You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/11 12:02:47 UTC
flink git commit: [FLINK-5679] [tests] Refactor
PartitionedStateCheckpointingITCase
Repository: flink
Updated Branches:
refs/heads/release-1.3 b412e530d -> 52d069504
[FLINK-5679] [tests] Refactor PartitionedStateCheckpointingITCase
- Massively speeds up the test by using fewer test elements and better coordination
of source throttling and checkpointing.
- Makes the test compatible with Windows by using proper URI encoding
- Drops use of deprecated ValueState constructors
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/52d06950
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/52d06950
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/52d06950
Branch: refs/heads/release-1.3
Commit: 52d069504b0360fd8a4f0977bbb0c4bc84fd8c01
Parents: b412e53
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 11 10:55:22 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 11 12:44:51 2017 +0200
----------------------------------------------------------------------
.../KeyedStateCheckpointingITCase.java | 401 +++++++++++++++++++
.../PartitionedStateCheckpointingITCase.java | 308 --------------
2 files changed, 401 insertions(+), 308 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/52d06950/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
new file mode 100644
index 0000000..147d385
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * A simple test that runs a streaming topology with checkpointing enabled.
+ *
+ * The test triggers a failure after a while and verifies that, after
+ * completion, the state reflects the "exactly once" semantics.
+ *
+ * It is designed to check partitioned states.
+ */
+@SuppressWarnings("serial")
+public class KeyedStateCheckpointingITCase extends TestLogger {
+
+ protected static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
+
+ protected static final int NUM_STRINGS = 10_000;
+ protected static final int NUM_KEYS = 40;
+
+ protected static final int NUM_TASK_MANAGERS = 2;
+ protected static final int NUM_TASK_SLOTS = 2;
+ protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+
+ // ------------------------------------------------------------------------
+
+ private static LocalFlinkMiniCluster cluster;
+
+ @BeforeClass
+ public static void startCluster() throws Exception {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+ config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+
+ cluster = new LocalFlinkMiniCluster(config, false);
+ cluster.start();
+ }
+
+ @AfterClass
+ public static void stopCluster() throws Exception{
+ if (cluster != null) {
+ cluster.stop();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Rule
+ public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Test
+ public void testWithMemoryBackendSync() throws Exception {
+ MemoryStateBackend syncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
+ testProgramWithBackend(syncMemBackend);
+ }
+
+ @Test
+ public void testWithMemoryBackendAsync() throws Exception {
+ MemoryStateBackend asyncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
+ testProgramWithBackend(asyncMemBackend);
+ }
+
+ @Test
+ public void testWithFsBackendSync() throws Exception {
+ FsStateBackend syncFsBackend = new FsStateBackend(tmpFolder.newFolder().toURI().toString(), false);
+ testProgramWithBackend(syncFsBackend);
+ }
+
+ @Test
+ public void testWithFsBackendAsync() throws Exception {
+ FsStateBackend asyncFsBackend = new FsStateBackend(tmpFolder.newFolder().toURI().toString(), true);
+ testProgramWithBackend(asyncFsBackend);
+ }
+
+ @Test
+ public void testWithRocksDbBackendFull() throws Exception {
+ RocksDBStateBackend fullRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), false);
+ fullRocksDbBackend.setDbStoragePath(tmpFolder.newFolder().getAbsolutePath());
+
+ testProgramWithBackend(fullRocksDbBackend);
+ }
+
+ @Test
+ public void testWithRocksDbBackendIncremental() throws Exception {
+ RocksDBStateBackend incRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
+ incRocksDbBackend.setDbStoragePath(tmpFolder.newFolder().getAbsolutePath());
+
+ testProgramWithBackend(incRocksDbBackend);
+ }
+
+ // ------------------------------------------------------------------------
+
+ protected void testProgramWithBackend(AbstractStateBackend stateBackend) throws Exception {
+ assertEquals("Broken test setup", 0, (NUM_STRINGS / 2) % NUM_KEYS);
+
+ final StreamExecutionEnvironment env = new TestStreamEnvironment(cluster, PARALLELISM);
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(500);
+ env.getConfig().disableSysoutLogging();
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+ env.setStateBackend(stateBackend);
+
+ // compute when (randomly) the failure should happen
+ final int failurePosMin = (int) (0.6 * NUM_STRINGS / PARALLELISM);
+ final int failurePosMax = (int) (0.8 * NUM_STRINGS / PARALLELISM);
+ final int failurePos = (new Random().nextInt(failurePosMax - failurePosMin) + failurePosMin);
+
+ final DataStream<Integer> stream1 = env.addSource(
+ new IntGeneratingSourceFunction(NUM_STRINGS / 2, NUM_STRINGS / 4));
+
+ final DataStream<Integer> stream2 = env.addSource(
+ new IntGeneratingSourceFunction(NUM_STRINGS / 2, NUM_STRINGS / 4));
+
+ stream1.union(stream2)
+ .keyBy(new IdentityKeySelector<Integer>())
+ .map(new OnceFailingPartitionedSum(failurePos))
+ .keyBy(0)
+ .addSink(new CounterSink());
+
+ env.execute();
+
+ // verify that we counted exactly right
+ assertEquals(NUM_KEYS, CounterSink.ALL_COUNTS.size());
+ assertEquals(NUM_KEYS, OnceFailingPartitionedSum.ALL_SUMS.size());
+
+ for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.ALL_SUMS.entrySet()) {
+ assertEquals((long) sum.getKey() * NUM_STRINGS / NUM_KEYS, sum.getValue().longValue());
+ }
+ for (long count : CounterSink.ALL_COUNTS.values()) {
+ assertEquals(NUM_STRINGS / NUM_KEYS, count);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Functions
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * A source that generates a sequence of integers and throttles down until a checkpoint
+ * has happened.
+ */
+ private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer>
+ implements ListCheckpointed<Integer>, CheckpointListener {
+
+ private final int numElements;
+ private final int checkpointLatestAt;
+
+ private int lastEmitted = -1;
+
+ private boolean checkpointHappened;
+
+ private volatile boolean isRunning = true;
+
+ IntGeneratingSourceFunction(int numElements, int checkpointLatestAt) {
+ this.numElements = numElements;
+ this.checkpointLatestAt = checkpointLatestAt;
+ }
+
+ @Override
+ public void run(SourceContext<Integer> ctx) throws Exception {
+ final Object lockingObject = ctx.getCheckpointLock();
+ final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+ int nextElement = lastEmitted >= 0 ? lastEmitted + step :
+ getRuntimeContext().getIndexOfThisSubtask();
+
+ while (isRunning && nextElement < numElements) {
+
+ // throttle / block if we are still waiting for the checkpoint
+ if (!checkpointHappened) {
+ if (nextElement < checkpointLatestAt) {
+ // only throttle
+ Thread.sleep(1);
+ } else {
+ // hard block
+ synchronized (this) {
+ while (!checkpointHappened) {
+ this.wait();
+ }
+ }
+ }
+ }
+
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (lockingObject) {
+ ctx.collect(nextElement % NUM_KEYS);
+ lastEmitted = nextElement;
+ }
+
+ nextElement += step;
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ @Override
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(lastEmitted);
+ }
+
+ @Override
+ public void restoreState(List<Integer> state) throws Exception {
+ assertEquals("Test failed due to unexpected recovered state size", 1, state.size());
+ lastEmitted = state.get(0);
+ checkpointHappened = true;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ synchronized (this) {
+ checkpointHappened = true;
+ this.notifyAll();
+ }
+ }
+ }
+
+ private static class OnceFailingPartitionedSum
+ extends RichMapFunction<Integer, Tuple2<Integer, Long>>
+ implements ListCheckpointed<Integer> {
+
+ private static final Map<Integer, Long> ALL_SUMS = new ConcurrentHashMap<>();
+
+ private final int failurePos;
+ private int count;
+
+ private boolean shouldFail = true;
+
+ private transient ValueState<Long> sum;
+
+ OnceFailingPartitionedSum(int failurePos) {
+ this.failurePos = failurePos;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws IOException {
+ sum = getRuntimeContext().getState(new ValueStateDescriptor<>("my_state", Long.class));
+ }
+
+ @Override
+ public Tuple2<Integer, Long> map(Integer value) throws Exception {
+ if (shouldFail && count++ >= failurePos) {
+ shouldFail = false;
+ throw new Exception("Test Failure");
+ }
+
+ Long oldSum = sum.value();
+ long currentSum = (oldSum == null ? 0L : oldSum) + value;
+
+ sum.update(currentSum);
+ ALL_SUMS.put(value, currentSum);
+ return new Tuple2<>(value, currentSum);
+ }
+
+ @Override
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(count);
+ }
+
+ @Override
+ public void restoreState(List<Integer> state) throws Exception {
+ assertEquals("Test failed due to unexpected recovered state size", 1, state.size());
+ count = state.get(0);
+ shouldFail = false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (shouldFail) {
+ fail("Test ineffective: Function cleanly finished without ever failing.");
+ }
+ }
+ }
+
+ private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> {
+
+ private static final Map<Integer, Long> ALL_COUNTS = new ConcurrentHashMap<>();
+
+ private transient ValueState<NonSerializableLong> aCounts;
+ private transient ValueState<Long> bCounts;
+
+ @Override
+ public void open(Configuration parameters) throws IOException {
+ aCounts = getRuntimeContext().getState(new ValueStateDescriptor<>("a", NonSerializableLong.class));
+ bCounts = getRuntimeContext().getState(new ValueStateDescriptor<>("b", Long.class));
+ }
+
+ @Override
+ public void invoke(Tuple2<Integer, Long> value) throws Exception {
+ final NonSerializableLong acRaw = aCounts.value();
+ final Long bcRaw = bCounts.value();
+
+ final long ac = acRaw == null ? 0L : acRaw.value;
+ final long bc = bcRaw == null ? 0L : bcRaw;
+
+ assertEquals(ac, bc);
+
+ long currentCount = ac + 1;
+ aCounts.update(NonSerializableLong.of(currentCount));
+ bCounts.update(currentCount);
+
+ ALL_COUNTS.put(value.f0, currentCount);
+ }
+ }
+
+ public static class IdentityKeySelector<T> implements KeySelector<T, T> {
+
+ @Override
+ public T getKey(T value) throws Exception {
+ return value;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // data types
+ // ------------------------------------------------------------------------
+
+ public static class NonSerializableLong {
+
+ public long value;
+
+ private NonSerializableLong(long value) {
+ this.value = value;
+ }
+
+ public static NonSerializableLong of(long value) {
+ return new NonSerializableLong(value);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return this == obj ||
+ obj != null && obj.getClass() == getClass() && ((NonSerializableLong) obj).value == this.value;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (value ^ (value >>> 32));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/52d06950/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
deleted file mode 100644
index 517c82b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * A simple test that runs a streaming topology with checkpointing enabled.
- *
- * The test triggers a failure after a while and verifies that, after
- * completion, the state reflects the "exactly once" semantics.
- *
- * It is designed to check partitioned states.
- */
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTestBase {
-
- private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
-
- final long NUM_STRINGS = 10_000_000L;
- final static int NUM_KEYS = 40;
-
- @Parameterized.Parameters
- public static Collection<AbstractStateBackend> parameters() throws IOException {
- TemporaryFolder tempFolder = new TemporaryFolder();
- tempFolder.create();
-
- MemoryStateBackend syncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
- MemoryStateBackend asyncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
-
- FsStateBackend syncFsBackend = new FsStateBackend("file://" + tempFolder.newFolder().getAbsolutePath(), false);
- FsStateBackend asyncFsBackend = new FsStateBackend("file://" + tempFolder.newFolder().getAbsolutePath(), true);
-
- RocksDBStateBackend fullRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), false);
- fullRocksDbBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
-
- RocksDBStateBackend incRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
- incRocksDbBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath());
-
- return Arrays.asList(
- syncMemBackend,
- asyncMemBackend,
- syncFsBackend,
- asyncFsBackend,
- fullRocksDbBackend,
- incRocksDbBackend);
- }
-
- @Parameterized.Parameter
- public AbstractStateBackend stateBackend;
-
- @Override
- public void testProgram(StreamExecutionEnvironment env) {
- assertTrue("Broken test setup", (NUM_STRINGS/2) % NUM_KEYS == 0);
-
- env.setStateBackend(stateBackend);
-
- DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
- DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
-
- stream1.union(stream2)
- .keyBy(new IdentityKeySelector<Integer>())
- .map(new OnceFailingPartitionedSum(NUM_STRINGS))
- .keyBy(0)
- .addSink(new CounterSink());
- }
-
- @Override
- public void postSubmit() {
- // verify that we counted exactly right
- for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.allSums.entrySet()) {
- assertEquals(new Long(sum.getKey() * NUM_STRINGS / NUM_KEYS), sum.getValue());
- }
- for (Long count : CounterSink.allCounts.values()) {
- assertEquals(new Long(NUM_STRINGS / NUM_KEYS), count);
- }
-
- assertEquals(NUM_KEYS, CounterSink.allCounts.size());
- assertEquals(NUM_KEYS, OnceFailingPartitionedSum.allSums.size());
- }
-
- // --------------------------------------------------------------------------------------------
- // Custom Functions
- // --------------------------------------------------------------------------------------------
-
- private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer>
- implements ListCheckpointed<Integer> {
-
- private final long numElements;
-
- private int index;
- private int step;
-
- private volatile boolean isRunning = true;
-
- static final long[] counts = new long[PARALLELISM];
-
- @Override
- public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
- }
-
- IntGeneratingSourceFunction(long numElements) {
- this.numElements = numElements;
- }
-
- @Override
- public void open(Configuration parameters) throws IOException {
- step = getRuntimeContext().getNumberOfParallelSubtasks();
- if (index == 0) {
- index = getRuntimeContext().getIndexOfThisSubtask();
- }
- }
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- final Object lockingObject = ctx.getCheckpointLock();
-
- while (isRunning && index < numElements) {
-
- synchronized (lockingObject) {
- index += step;
- ctx.collect(index % NUM_KEYS);
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- @Override
- public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.index);
- }
-
- @Override
- public void restoreState(List<Integer> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- this.index = state.get(0);
- }
- }
-
- private static class OnceFailingPartitionedSum extends RichMapFunction<Integer, Tuple2<Integer, Long>> {
-
- private static Map<Integer, Long> allSums = new ConcurrentHashMap<Integer, Long>();
-
- private static volatile boolean hasFailed = false;
-
- private final long numElements;
-
- private long failurePos;
- private long count;
-
- private ValueState<Long> sum;
-
- OnceFailingPartitionedSum(long numElements) {
- this.numElements = numElements;
- this.hasFailed = false;
- }
-
- @Override
- public void open(Configuration parameters) throws IOException {
- long failurePosMin = (long) (0.6 * numElements / getRuntimeContext()
- .getNumberOfParallelSubtasks());
- long failurePosMax = (long) (0.8 * numElements / getRuntimeContext()
- .getNumberOfParallelSubtasks());
-
- failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
- count = 0;
- sum = getRuntimeContext().getState(
- new ValueStateDescriptor<>("my_state", Long.class, 0L));
- }
-
- @Override
- public Tuple2<Integer, Long> map(Integer value) throws Exception {
- count++;
-
- if (!hasFailed && count >= failurePos) {
- hasFailed = true;
- throw new Exception("Test Failure");
- }
-
- long currentSum = sum.value() + value;
- sum.update(currentSum);
- allSums.put(value, currentSum);
- return new Tuple2<Integer, Long>(value, currentSum);
- }
- }
-
- private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> {
-
- private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>();
-
- private ValueState<NonSerializableLong> aCounts;
- private ValueState<Long> bCounts;
-
- @Override
- public void open(Configuration parameters) throws IOException {
-
- aCounts = getRuntimeContext().getState(
- new ValueStateDescriptor<>("a", NonSerializableLong.class, NonSerializableLong.of(0L)));
-
- bCounts = getRuntimeContext().getState(
- new ValueStateDescriptor<>("b", Long.class, 0L));
- }
-
- @Override
- public void invoke(Tuple2<Integer, Long> value) throws Exception {
- long ac = aCounts.value().value;
- long bc = bCounts.value();
- assertEquals(ac, bc);
-
- long currentCount = ac + 1;
- aCounts.update(NonSerializableLong.of(currentCount));
- bCounts.update(currentCount);
-
- allCounts.put(value.f0, currentCount);
- }
- }
-
- public static class NonSerializableLong {
- public Long value;
-
- private NonSerializableLong(long value) {
- this.value = value;
- }
-
- public static NonSerializableLong of(long value) {
- return new NonSerializableLong(value);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- NonSerializableLong that = (NonSerializableLong) o;
-
- return value.equals(that.value);
-
- }
-
- @Override
- public int hashCode() {
- return value.hashCode();
- }
- }
-
- public static class IdentityKeySelector<T> implements KeySelector<T, T> {
-
- @Override
- public T getKey(T value) throws Exception {
- return value;
- }
-
- }
-}