You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:21 UTC
[21/22] flink git commit: [FLINK-6731] [tests] Activate strict
checkstyle for flink-tests
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index bda1679..fd4ecd4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -28,8 +28,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -41,6 +41,7 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -51,13 +52,14 @@ import java.util.List;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.flink.test.util.TestUtils.tryExecute;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* This verifies that checkpointing works correctly with event time windows.
*
- * <p>
- * This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows.
+ * <p>This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows.
*/
@SuppressWarnings("serial")
public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
@@ -68,7 +70,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
private static TestStreamEnvironment env;
-
@BeforeClass
public static void startTestCluster() {
Configuration config = new Configuration();
@@ -94,11 +95,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
@Test
public void testTumblingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 100;
- final int NUM_KEYS = 1;
+ final int numElementsPerKey = 3000;
+ final int windowSize = 100;
+ final int numKeys = 1;
FailingSource.reset();
-
+
try {
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -107,11 +108,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(NUM_KEYS,
- NUM_ELEMENTS_PER_KEY,
- NUM_ELEMENTS_PER_KEY / 3))
+ .addSource(new FailingSource(numKeys,
+ numElementsPerKey,
+ numElementsPerKey / 3))
.rebalance()
- .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .timeWindowAll(Time.of(windowSize, MILLISECONDS))
.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
private boolean open = false;
@@ -141,8 +142,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
}
})
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -154,10 +154,10 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
@Test
public void testSlidingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 1000;
- final int WINDOW_SLIDE = 100;
- final int NUM_KEYS = 1;
+ final int numElementsPerKey = 3000;
+ final int windowSize = 1000;
+ final int windowSlide = 100;
+ final int numKeys = 1;
FailingSource.reset();
try {
@@ -168,9 +168,9 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
.rebalance()
- .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+ .timeWindowAll(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
.apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
private boolean open = false;
@@ -200,8 +200,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
}
})
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
tryExecute(env, "Sliding Window Test");
}
@@ -213,9 +212,9 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
@Test
public void testPreAggregatedTumblingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 100;
- final int NUM_KEYS = 1;
+ final int numElementsPerKey = 3000;
+ final int windowSize = 100;
+ final int numKeys = 1;
FailingSource.reset();
try {
@@ -226,11 +225,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(NUM_KEYS,
- NUM_ELEMENTS_PER_KEY,
- NUM_ELEMENTS_PER_KEY / 3))
+ .addSource(new FailingSource(numKeys,
+ numElementsPerKey,
+ numElementsPerKey / 3))
.rebalance()
- .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .timeWindowAll(Time.of(windowSize, MILLISECONDS))
.reduce(
new ReduceFunction<Tuple2<Long, IntType>>() {
@@ -269,8 +268,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
}
}
})
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -282,9 +280,9 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
@Test
public void testPreAggregatedFoldingTumblingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 100;
- final int NUM_KEYS = 1;
+ final int numElementsPerKey = 3000;
+ final int windowSize = 100;
+ final int numKeys = 1;
FailingSource.reset();
try {
@@ -295,11 +293,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(NUM_KEYS,
- NUM_ELEMENTS_PER_KEY,
- NUM_ELEMENTS_PER_KEY / 3))
+ .addSource(new FailingSource(numKeys,
+ numElementsPerKey,
+ numElementsPerKey / 3))
.rebalance()
- .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS))
+ .timeWindowAll(Time.of(windowSize, MILLISECONDS))
.fold(new Tuple4<>(0L, 0L, 0L, new IntType(0)),
new FoldFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>>() {
@Override
@@ -337,8 +335,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
}
}
})
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -350,10 +347,10 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
@Test
public void testPreAggregatedSlidingTimeWindow() {
- final int NUM_ELEMENTS_PER_KEY = 3000;
- final int WINDOW_SIZE = 1000;
- final int WINDOW_SLIDE = 100;
- final int NUM_KEYS = 1;
+ final int numElementsPerKey = 3000;
+ final int windowSize = 1000;
+ final int windowSlide = 100;
+ final int numKeys = 1;
FailingSource.reset();
try {
@@ -364,12 +361,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(NUM_KEYS,
- NUM_ELEMENTS_PER_KEY,
- NUM_ELEMENTS_PER_KEY / 3))
+ .addSource(new FailingSource(numKeys,
+ numElementsPerKey,
+ numElementsPerKey / 3))
.rebalance()
- .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS),
- Time.of(WINDOW_SLIDE, MILLISECONDS))
+ .timeWindowAll(Time.of(windowSize, MILLISECONDS),
+ Time.of(windowSlide, MILLISECONDS))
.reduce(
new ReduceFunction<Tuple2<Long, IntType>>() {
@@ -408,8 +405,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
}
}
})
- .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -419,14 +415,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
}
}
-
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements ListCheckpointed<Integer>, CheckpointListener
- {
+ implements ListCheckpointed<Integer>, CheckpointListener {
private static volatile boolean failedBefore = false;
private final int numKeys;
@@ -467,8 +461,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
}
if (numElementsEmitted < numElementsToEmit &&
- (failedBefore || numElementsEmitted <= failureAfterNumElements))
- {
+ (failedBefore || numElementsEmitted <= failureAfterNumElements)) {
// the function failed before, or we are in the elements before the failure
synchronized (ctx.getCheckpointLock()) {
int next = numElementsEmitted++;
@@ -579,7 +572,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
-
Integer curr = windowCounts.get(value.f0);
if (curr != null) {
windowCounts.put(value.f0, curr + 1);
@@ -625,12 +617,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
// Utilities
// ------------------------------------------------------------------------
+ /**
+ * Custom boxed integer type.
+ */
public static class IntType {
public int value;
public IntType() {}
- public IntType(int value) { this.value = value; }
+ public IntType(int value) {
+ this.value = value;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
index 65fda09..030c1a3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
package org.apache.flink.test.checkpointing;
+/**
+ * Integration tests for file backend.
+ */
public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
public FileBackendEventTimeWindowCheckpointingITCase() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
index 352f9f7..dfb66cc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
package org.apache.flink.test.checkpointing;
+/**
+ * Integration tests for incremental RocksDB backend.
+ */
public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index 147d385..76a18c3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -60,11 +60,11 @@ import static org.junit.Assert.fail;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
- *
- * The test triggers a failure after a while and verifies that, after
+ *
+ * <p>The test triggers a failure after a while and verifies that, after
* completion, the state reflects the "exactly once" semantics.
- *
- * It is designed to check partitioned states.
+ *
+ * <p>It is designed to check partitioned states.
*/
@SuppressWarnings("serial")
public class KeyedStateCheckpointingITCase extends TestLogger {
@@ -197,7 +197,7 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
* A source that generates a sequence of integers and throttles down until a checkpoint
* has happened.
*/
- private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer>
+ private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer>
implements ListCheckpointed<Integer>, CheckpointListener {
private final int numElements;
@@ -363,7 +363,7 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
}
}
- public static class IdentityKeySelector<T> implements KeySelector<T, T> {
+ private static class IdentityKeySelector<T> implements KeySelector<T, T> {
@Override
public T getKey(T value) throws Exception {
@@ -375,6 +375,9 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
// data types
// ------------------------------------------------------------------------
+ /**
+ * Custom boxed long type that does not implement Serializable.
+ */
public static class NonSerializableLong {
public long value;
@@ -389,7 +392,7 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
@Override
public boolean equals(Object obj) {
- return this == obj ||
+ return this == obj ||
obj != null && obj.getClass() == getClass() && ((NonSerializableLong) obj).value == this.value;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
index 899b8d6..54a29ed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
package org.apache.flink.test.checkpointing;
+/**
+ * Integration tests for memory backend.
+ */
public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
public MemBackendEventTimeWindowCheckpointingITCase() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 264b22e..cad6693 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -55,7 +55,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
-import io.netty.util.internal.ConcurrentSet;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
@@ -85,6 +84,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Test savepoint rescaling.
+ */
@RunWith(Parameterized.class)
public class RescalingITCase extends TestLogger {
@@ -250,7 +252,6 @@ public class RescalingITCase extends TestLogger {
assertEquals(expectedResult2, actualResult2);
-
} finally {
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
@@ -502,7 +503,6 @@ public class RescalingITCase extends TestLogger {
testSavepointRescalingPartitionedOperatorState(true, OperatorCheckpointMethod.LIST_CHECKPOINTED);
}
-
/**
* Tests rescaling of partitioned operator state. More specifically, we test the mechanism with {@link ListCheckpointed}
* as it subsumes {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}.
@@ -522,11 +522,11 @@ public class RescalingITCase extends TestLogger {
if (checkpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION ||
checkpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST) {
- PartitionedStateSource.CHECK_CORRECT_SNAPSHOT = new int[counterSize];
- PartitionedStateSource.CHECK_CORRECT_RESTORE = new int[counterSize];
+ PartitionedStateSource.checkCorrectSnapshot = new int[counterSize];
+ PartitionedStateSource.checkCorrectRestore = new int[counterSize];
} else {
- PartitionedStateSourceListCheckpointed.CHECK_CORRECT_SNAPSHOT = new int[counterSize];
- PartitionedStateSourceListCheckpointed.CHECK_CORRECT_RESTORE = new int[counterSize];
+ PartitionedStateSourceListCheckpointed.checkCorrectSnapshot = new int[counterSize];
+ PartitionedStateSourceListCheckpointed.checkCorrectRestore = new int[counterSize];
}
try {
@@ -584,29 +584,29 @@ public class RescalingITCase extends TestLogger {
int sumAct = 0;
if (checkpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION) {
- for (int c : PartitionedStateSource.CHECK_CORRECT_SNAPSHOT) {
+ for (int c : PartitionedStateSource.checkCorrectSnapshot) {
sumExp += c;
}
- for (int c : PartitionedStateSource.CHECK_CORRECT_RESTORE) {
+ for (int c : PartitionedStateSource.checkCorrectRestore) {
sumAct += c;
}
} else if (checkpointMethod == OperatorCheckpointMethod.CHECKPOINTED_FUNCTION_BROADCAST) {
- for (int c : PartitionedStateSource.CHECK_CORRECT_SNAPSHOT) {
+ for (int c : PartitionedStateSource.checkCorrectSnapshot) {
sumExp += c;
}
- for (int c : PartitionedStateSource.CHECK_CORRECT_RESTORE) {
+ for (int c : PartitionedStateSource.checkCorrectRestore) {
sumAct += c;
}
sumExp *= parallelism2;
} else {
- for (int c : PartitionedStateSourceListCheckpointed.CHECK_CORRECT_SNAPSHOT) {
+ for (int c : PartitionedStateSourceListCheckpointed.checkCorrectSnapshot) {
sumExp += c;
}
- for (int c : PartitionedStateSourceListCheckpointed.CHECK_CORRECT_RESTORE) {
+ for (int c : PartitionedStateSourceListCheckpointed.checkCorrectRestore) {
sumAct += c;
}
}
@@ -777,8 +777,8 @@ public class RescalingITCase extends TestLogger {
if (counter < numberElements) {
synchronized (lock) {
for (int value = subtaskIndex;
- value < numberKeys;
- value += getRuntimeContext().getNumberOfParallelSubtasks()) {
+ value < numberKeys;
+ value += getRuntimeContext().getNumberOfParallelSubtasks()) {
ctx.collect(value);
}
@@ -943,13 +943,13 @@ public class RescalingITCase extends TestLogger {
private static final long serialVersionUID = -4357864582992546L;
private static final int NUM_PARTITIONS = 7;
- private static int[] CHECK_CORRECT_SNAPSHOT;
- private static int[] CHECK_CORRECT_RESTORE;
+ private static int[] checkCorrectSnapshot;
+ private static int[] checkCorrectRestore;
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = counter;
+ checkCorrectSnapshot[getRuntimeContext().getIndexOfThisSubtask()] = counter;
int div = counter / NUM_PARTITIONS;
int mod = counter % NUM_PARTITIONS;
@@ -971,7 +971,7 @@ public class RescalingITCase extends TestLogger {
for (Integer v : state) {
counter += v;
}
- CHECK_CORRECT_RESTORE[getRuntimeContext().getIndexOfThisSubtask()] = counter;
+ checkCorrectRestore[getRuntimeContext().getIndexOfThisSubtask()] = counter;
}
}
@@ -983,8 +983,8 @@ public class RescalingITCase extends TestLogger {
private transient ListState<Integer> counterPartitions;
private boolean broadcast;
- private static int[] CHECK_CORRECT_SNAPSHOT;
- private static int[] CHECK_CORRECT_RESTORE;
+ private static int[] checkCorrectSnapshot;
+ private static int[] checkCorrectRestore;
public PartitionedStateSource(boolean broadcast) {
this.broadcast = broadcast;
@@ -995,7 +995,7 @@ public class RescalingITCase extends TestLogger {
counterPartitions.clear();
- CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = counter;
+ checkCorrectSnapshot[getRuntimeContext().getIndexOfThisSubtask()] = counter;
int div = counter / NUM_PARTITIONS;
int mod = counter % NUM_PARTITIONS;
@@ -1027,7 +1027,7 @@ public class RescalingITCase extends TestLogger {
for (int v : counterPartitions.get()) {
counter += v;
}
- CHECK_CORRECT_RESTORE[getRuntimeContext().getIndexOfThisSubtask()] = counter;
+ checkCorrectRestore[getRuntimeContext().getIndexOfThisSubtask()] = counter;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
index da2bbc7..3873aff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,6 +18,9 @@
package org.apache.flink.test.checkpointing;
+/**
+ * Integration tests for fully synchronous RocksDB backend.
+ */
public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
public RocksDbBackendEventTimeWindowCheckpointingITCase() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 09dfa99..a3d45dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -18,11 +18,6 @@
package org.apache.flink.test.checkpointing;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.functions.MapFunction;
@@ -78,17 +73,18 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.FileNotFoundException;
@@ -103,6 +99,12 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -479,7 +481,7 @@ public class SavepointITCase extends TestLogger {
/**
* FLINK-5985
*
- * This test ensures we can restore from a savepoint under modifications to the job graph that only concern
+ * <p>This test ensures we can restore from a savepoint under modifications to the job graph that only concern
* stateless operators.
*/
@Test
@@ -736,17 +738,17 @@ public class SavepointITCase extends TestLogger {
}
private static final int ITER_TEST_PARALLELISM = 1;
- private static OneShotLatch[] ITER_TEST_SNAPSHOT_WAIT = new OneShotLatch[ITER_TEST_PARALLELISM];
- private static OneShotLatch[] ITER_TEST_RESTORE_WAIT = new OneShotLatch[ITER_TEST_PARALLELISM];
- private static int[] ITER_TEST_CHECKPOINT_VERIFY = new int[ITER_TEST_PARALLELISM];
+ private static OneShotLatch[] iterTestSnapshotWait = new OneShotLatch[ITER_TEST_PARALLELISM];
+ private static OneShotLatch[] iterTestRestoreWait = new OneShotLatch[ITER_TEST_PARALLELISM];
+ private static int[] iterTestCheckpointVerify = new int[ITER_TEST_PARALLELISM];
@Test
public void testSavepointForJobWithIteration() throws Exception {
for (int i = 0; i < ITER_TEST_PARALLELISM; ++i) {
- ITER_TEST_SNAPSHOT_WAIT[i] = new OneShotLatch();
- ITER_TEST_RESTORE_WAIT[i] = new OneShotLatch();
- ITER_TEST_CHECKPOINT_VERIFY[i] = 0;
+ iterTestSnapshotWait[i] = new OneShotLatch();
+ iterTestRestoreWait[i] = new OneShotLatch();
+ iterTestCheckpointVerify[i] = 0;
}
TemporaryFolder folder = new TemporaryFolder();
@@ -821,7 +823,7 @@ public class SavepointITCase extends TestLogger {
cluster.start();
cluster.submitJobDetached(jobGraph);
- for (OneShotLatch latch : ITER_TEST_SNAPSHOT_WAIT) {
+ for (OneShotLatch latch : iterTestSnapshotWait) {
latch.await();
}
savepointPath = cluster.triggerSavepoint(jobGraph.getJobID());
@@ -831,7 +833,7 @@ public class SavepointITCase extends TestLogger {
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
cluster.submitJobDetached(jobGraph);
- for (OneShotLatch latch : ITER_TEST_RESTORE_WAIT) {
+ for (OneShotLatch latch : iterTestRestoreWait) {
latch.await();
}
source.cancel();
@@ -883,7 +885,7 @@ public class SavepointITCase extends TestLogger {
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- ITER_TEST_CHECKPOINT_VERIFY[getRuntimeContext().getIndexOfThisSubtask()] = emittedCount;
+ iterTestCheckpointVerify[getRuntimeContext().getIndexOfThisSubtask()] = emittedCount;
return Collections.singletonList(emittedCount);
}
@@ -892,20 +894,20 @@ public class SavepointITCase extends TestLogger {
if (!state.isEmpty()) {
this.emittedCount = state.get(0);
}
- Assert.assertEquals(ITER_TEST_CHECKPOINT_VERIFY[getRuntimeContext().getIndexOfThisSubtask()], emittedCount);
- ITER_TEST_RESTORE_WAIT[getRuntimeContext().getIndexOfThisSubtask()].trigger();
+ Assert.assertEquals(iterTestCheckpointVerify[getRuntimeContext().getIndexOfThisSubtask()], emittedCount);
+ iterTestRestoreWait[getRuntimeContext().getIndexOfThisSubtask()].trigger();
}
}
- public static class DuplicateFilter extends RichFlatMapFunction<Integer, Integer> {
+ private static class DuplicateFilter extends RichFlatMapFunction<Integer, Integer> {
- static final ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false);
+ static final ValueStateDescriptor<Boolean> DESCRIPTOR = new ValueStateDescriptor<>("seen", Boolean.class, false);
private static final long serialVersionUID = 1L;
private ValueState<Boolean> operatorState;
@Override
public void open(Configuration configuration) {
- operatorState = this.getRuntimeContext().getState(descriptor);
+ operatorState = this.getRuntimeContext().getState(DESCRIPTOR);
}
@Override
@@ -916,7 +918,7 @@ public class SavepointITCase extends TestLogger {
}
if (30 == value) {
- ITER_TEST_SNAPSHOT_WAIT[getRuntimeContext().getIndexOfThisSubtask()].trigger();
+ iterTestSnapshotWait[getRuntimeContext().getIndexOfThisSubtask()].trigger();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
index 32d9e23..0fcfb8f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.Collector;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,11 +47,11 @@ import static org.junit.Assert.assertTrue;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
*
- * The test triggers a failure after a while and verifies that, after completion, the
+ * <p>The test triggers a failure after a while and verifies that, after completion, the
* state defined with either the {@link ValueState} or the {@link ListCheckpointed}
* interface reflects the "exactly once" semantics.
- *
- * The test throttles the input until at least two checkpoints are completed, to make sure that
+ *
+ * <p>The test throttles the input until at least two checkpoints are completed, to make sure that
* the recovery does not fall back to "square one" (which would naturally lead to correct
* results without testing the checkpointing).
*/
@@ -59,11 +60,10 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
private static final Logger LOG = LoggerFactory.getLogger(StateCheckpointedITCase.class);
- final long NUM_STRINGS = 10_000_000L;
+ static final long NUM_STRINGS = 10_000_000L;
/**
- * Runs the following program:
- *
+ * Runs the following program.
* <pre>
* [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
* </pre>
@@ -84,7 +84,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
stream
// first vertex, chained to the source
// this filter throttles the flow until at least one checkpoint
- // is complete, to make sure this program does not run without
+ // is complete, to make sure this program does not run without
.filter(new StringRichFilterFunction())
// -------------- seconds vertex - one-to-one connected ----------------
@@ -100,13 +100,13 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
@Override
public void postSubmit() {
-
+
//assertTrue("Test inconclusive: failure occurred before first checkpoint",
// OnceFailingAggregator.wasCheckpointedBeforeFailure);
- if(!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
+ if (!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
LOG.warn("Test inconclusive: failure occurred before first checkpoint");
}
-
+
long filterSum = 0;
for (long l : StringRichFilterFunction.counts) {
filterSum += l;
@@ -137,10 +137,9 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
// --------------------------------------------------------------------------------------------
// Custom Functions
// --------------------------------------------------------------------------------------------
-
- private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
- implements ListCheckpointed<Integer>
- {
+
+ private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+ implements ListCheckpointed<Integer> {
private final long numElements;
private int index;
@@ -157,9 +156,9 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
final Random rnd = new Random();
final StringBuilder stringBuilder = new StringBuilder();
-
+
final int step = getRuntimeContext().getNumberOfParallelSubtasks();
-
+
if (index == 0) {
index = getRuntimeContext().getIndexOfThisSubtask();
}
@@ -178,7 +177,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
}
}
}
-
+
@Override
public void cancel() {
isRunning = false;
@@ -209,11 +208,11 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
}
}
- private static class StringRichFilterFunction extends RichFilterFunction<String>
+ private static class StringRichFilterFunction extends RichFilterFunction<String>
implements ListCheckpointed<Long> {
- static final long[] counts = new long[PARALLELISM];
-
+ static long[] counts = new long[PARALLELISM];
+
private long count;
@Override
@@ -241,10 +240,10 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
}
}
- private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
+ private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
implements ListCheckpointed<Long> {
-
- static final long[] counts = new long[PARALLELISM];
+
+ static long[] counts = new long[PARALLELISM];
private long count;
@@ -272,12 +271,12 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
this.count = state.get(0);
}
}
-
- private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
+
+ private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
implements ListCheckpointed<Long> {
- static final long[] counts = new long[PARALLELISM];
-
+ static long[] counts = new long[PARALLELISM];
+
private long count;
@Override
@@ -304,25 +303,25 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
this.count = state.get(0);
}
}
-
- private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount>
+
+ private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount>
implements ListCheckpointed<HashMap<String, PrefixCount>>, CheckpointListener {
static boolean wasCheckpointedBeforeFailure = false;
-
+
private static volatile boolean hasFailed = false;
private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>();
-
+
private long failurePos;
private long count;
-
+
private boolean wasCheckpointed;
OnceFailingAggregator(long failurePos) {
this.failurePos = failurePos;
}
-
+
@Override
public void open(Configuration parameters) {
count = 0;
@@ -336,7 +335,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
hasFailed = true;
throw new Exception("Test Failure");
}
-
+
PrefixCount curr = aggregationMap.get(value.prefix);
if (curr == null) {
aggregationMap.put(value.prefix, value);
@@ -367,12 +366,12 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
}
}
- private static class ValidatingSink extends RichSinkFunction<PrefixCount>
+ private static class ValidatingSink extends RichSinkFunction<PrefixCount>
implements ListCheckpointed<HashMap<Character, Long>> {
@SuppressWarnings("unchecked")
private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM];
-
+
private HashMap<Character, Long> counts = new HashMap<Character, Long>();
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index d76d674..16d8b54 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.Test;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,12 +61,10 @@ import static org.junit.Assert.fail;
* checkpoints, that it is called at most once for any checkpoint id and that it is not
* called for a deliberately failed checkpoint.
*
- * <p>
- * The topology tested here includes a number of {@link OneInputStreamOperator}s and a
+ * <p>The topology tested here includes a number of {@link OneInputStreamOperator}s and a
* {@link TwoInputStreamOperator}.
*
- * <p>
- * Note that as a result of doing the checks on the task level there is no way to verify
+ * <p>Note that as a result of doing the checks on the task level there is no way to verify
* that the {@link CheckpointListener#notifyCheckpointComplete(long)} is called for every
* successfully completed checkpoint.
*/
@@ -79,8 +76,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
private static final int PARALLELISM = 4;
/**
- * Runs the following program:
- *
+ * Runs the following program.
* <pre>
* [ (source)->(filter) ] -> [ (co-map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
* </pre>
@@ -95,52 +91,52 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
final int numElements = 10000;
- final int numTaskTotal = PARALLELISM * 5;
+ final int numTaskTotal = PARALLELISM * 5;
DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal));
stream
// -------------- first vertex, chained to the src ----------------
.filter(new LongRichFilterFunction())
-
+
// -------------- second vertex, applying the co-map ----------------
.connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
-
+
// -------------- third vertex - the stateful one that also fails ----------------
.map(new IdentityMapFunction())
.startNewChain()
-
+
// -------------- fourth vertex - reducer and the sink ----------------
.keyBy(0)
.reduce(new OnceFailingReducer(numElements))
-
+
.addSink(new DiscardingSink<Tuple1<Long>>());
-
+
env.execute();
final long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
assertNotEquals(0L, failureCheckpointID);
-
+
List<List<Long>[]> allLists = Arrays.asList(
- GeneratingSourceFunction.completedCheckpoints,
- LongRichFilterFunction.completedCheckpoints,
- LeftIdentityCoRichFlatMapFunction.completedCheckpoints,
- IdentityMapFunction.completedCheckpoints,
- OnceFailingReducer.completedCheckpoints
+ GeneratingSourceFunction.COMPLETED_CHECKPOINTS,
+ LongRichFilterFunction.COMPLETED_CHECKPOINTS,
+ LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS,
+ IdentityMapFunction.COMPLETED_CHECKPOINTS,
+ OnceFailingReducer.COMPLETED_CHECKPOINTS
);
for (List<Long>[] parallelNotifications : allLists) {
for (List<Long> notifications : parallelNotifications) {
-
- assertTrue("No checkpoint notification was received.",
+
+ assertTrue("No checkpoint notification was received.",
notifications.size() > 0);
-
+
assertFalse("Failure checkpoint was marked as completed.",
notifications.contains(failureCheckpointID));
-
+
assertFalse("No checkpoint received after failure.",
notifications.get(notifications.size() - 1) == failureCheckpointID);
-
+
assertTrue("Checkpoint notification was received multiple times",
notifications.size() == new HashSet<Long>(notifications).size());
}
@@ -160,7 +156,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
}
return lists;
}
-
+
// --------------------------------------------------------------------------------------------
// Custom Functions
// --------------------------------------------------------------------------------------------
@@ -171,21 +167,21 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
*/
private static class GeneratingSourceFunction extends RichSourceFunction<Long>
implements ParallelSourceFunction<Long>, CheckpointListener, ListCheckpointed<Integer> {
-
- static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
-
+
+ static final List<Long>[] COMPLETED_CHECKPOINTS = createCheckpointLists(PARALLELISM);
+
static AtomicLong numPostFailureNotifications = new AtomicLong();
// operator behaviour
private final long numElements;
-
+
private final int notificationsToWaitFor;
private int index;
private int step;
private volatile boolean notificationAlready;
-
+
private volatile boolean isRunning = true;
GeneratingSourceFunction(long numElements, int notificationsToWaitFor) {
@@ -198,8 +194,9 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
step = getRuntimeContext().getNumberOfParallelSubtasks();
// if index has been restored, it is not 0 any more
- if (index == 0)
+ if (index == 0) {
index = getRuntimeContext().getIndexOfThisSubtask();
+ }
}
@Override
@@ -214,7 +211,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
ctx.collect(result);
}
}
-
+
// if the program goes fast and no notifications come through, we
// wait until all tasks had a chance to see a notification
while (isRunning && numPostFailureNotifications.get() < notificationsToWaitFor) {
@@ -244,7 +241,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
public void notifyCheckpointComplete(long checkpointId) {
// record the ID of the completed checkpoint
int partition = getRuntimeContext().getIndexOfThisSubtask();
- completedCheckpoints[partition].add(checkpointId);
+ COMPLETED_CHECKPOINTS[partition].add(checkpointId);
// if this is the first time we get a notification since the failure,
// tell the source function
@@ -262,7 +259,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
private static class IdentityMapFunction extends RichMapFunction<Long, Tuple1<Long>>
implements CheckpointListener {
- static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
+ static final List<Long>[] COMPLETED_CHECKPOINTS = createCheckpointLists(PARALLELISM);
private volatile boolean notificationAlready;
@@ -275,7 +272,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
public void notifyCheckpointComplete(long checkpointId) {
// record the ID of the completed checkpoint
int partition = getRuntimeContext().getIndexOfThisSubtask();
- completedCheckpoints[partition].add(checkpointId);
+ COMPLETED_CHECKPOINTS[partition].add(checkpointId);
// if this is the first time we get a notification since the failure,
// tell the source function
@@ -293,10 +290,10 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
*/
private static class LongRichFilterFunction extends RichFilterFunction<Long> implements CheckpointListener {
- static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
-
+ static final List<Long>[] COMPLETED_CHECKPOINTS = createCheckpointLists(PARALLELISM);
+
private volatile boolean notificationAlready;
-
+
@Override
public boolean filter(Long value) {
return value < 100;
@@ -306,8 +303,8 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
public void notifyCheckpointComplete(long checkpointId) {
// record the ID of the completed checkpoint
int partition = getRuntimeContext().getIndexOfThisSubtask();
- completedCheckpoints[partition].add(checkpointId);
-
+ COMPLETED_CHECKPOINTS[partition].add(checkpointId);
+
// if this is the first time we get a notification since the failure,
// tell the source function
if (OnceFailingReducer.hasFailed && !notificationAlready) {
@@ -325,10 +322,10 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long>
implements CheckpointListener {
- static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
+ static final List<Long>[] COMPLETED_CHECKPOINTS = createCheckpointLists(PARALLELISM);
private volatile boolean notificationAlready;
-
+
@Override
public void flatMap1(Long value, Collector<Long> out) {
out.collect(value);
@@ -343,7 +340,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
public void notifyCheckpointComplete(long checkpointId) {
// record the ID of the completed checkpoint
int partition = getRuntimeContext().getIndexOfThisSubtask();
- completedCheckpoints[partition].add(checkpointId);
+ COMPLETED_CHECKPOINTS[partition].add(checkpointId);
// if this is the first time we get a notification since the failure,
// tell the source function
@@ -357,16 +354,15 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
/**
* Reducer that causes one failure between seeing 40% to 70% of the records.
*/
- private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>>
- implements ListCheckpointed<Long>, CheckpointListener
- {
+ private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>>
+ implements ListCheckpointed<Long>, CheckpointListener {
static volatile boolean hasFailed = false;
static volatile long failureCheckpointID;
- static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
-
+ static final List<Long>[] COMPLETED_CHECKPOINTS = createCheckpointLists(PARALLELISM);
+
private final long failurePos;
-
+
private volatile long count;
private volatile boolean notificationAlready;
@@ -381,7 +377,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
if (count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) {
LOG.info(">>>>>>>>>>>>>>>>> Reached failing position <<<<<<<<<<<<<<<<<<<<<");
}
-
+
value1.f0 += value2.f0;
return value1;
}
@@ -409,7 +405,7 @@ public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTes
public void notifyCheckpointComplete(long checkpointId) {
// record the ID of the completed checkpoint
int partition = getRuntimeContext().getIndexOfThisSubtask();
- completedCheckpoints[partition].add(checkpointId);
+ COMPLETED_CHECKPOINTS[partition].add(checkpointId);
// if this is the first time we get a notification since the failure,
// tell the source function
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index aae04c9..616e794 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -41,18 +41,17 @@ import static org.junit.Assert.assertEquals;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
- *
- * The test triggers a failure after a while and verifies that, after completion, the
+ *
+ * <p>The test triggers a failure after a while and verifies that, after completion, the
* state defined with the {@link ListCheckpointed} interface reflects the "exactly once" semantics.
*/
@SuppressWarnings("serial")
public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
- final long NUM_STRINGS = 10_000_000L;
+ static final long NUM_STRINGS = 10_000_000L;
/**
- * Runs the following program:
- *
+ * Runs the following program.
* <pre>
* [ (source)->(filter) ]-s->[ (map) ] -> [ (map) ] -> [ (groupBy/count)->(sink) ]
* </pre>
@@ -101,7 +100,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
}
long reduceInputCount = 0;
- for(long l: OnceFailingPrefixCounter.counts){
+ for (long l: OnceFailingPrefixCounter.counts){
reduceInputCount += l;
}
@@ -118,12 +117,12 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
// --------------------------------------------------------------------------------------------
// Custom Functions
// --------------------------------------------------------------------------------------------
-
+
private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
implements ParallelSourceFunction<String>, ListCheckpointed<Integer> {
private final long numElements;
-
+
private final Random rnd = new Random();
private final StringBuilder stringBuilder = new StringBuilder();
@@ -132,14 +131,13 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
private volatile boolean isRunning = true;
- static final long[] counts = new long[PARALLELISM];
+ static long[] counts = new long[PARALLELISM];
@Override
public void close() throws IOException {
counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
}
-
StringGeneratingSourceFunction(long numElements) {
this.numElements = numElements;
}
@@ -200,11 +198,11 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
this.index = state.get(0);
}
}
-
+
private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements ListCheckpointed<Long> {
private long count;
- static final long[] counts = new long[PARALLELISM];
+ static long[] counts = new long[PARALLELISM];
@Override
public PrefixCount map(PrefixCount value) throws Exception {
@@ -234,16 +232,16 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
/**
* This function uses simultaneously the key/value state and is checkpointed.
*/
- private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount>
+ private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount>
implements ListCheckpointed<Long> {
-
+
private static Map<String, Long> prefixCounts = new ConcurrentHashMap<String, Long>();
- static final long[] counts = new long[PARALLELISM];
+ static long[] counts = new long[PARALLELISM];
private static volatile boolean hasFailed = false;
private final long numElements;
-
+
private long failurePos;
private long count;
@@ -253,7 +251,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
OnceFailingPrefixCounter(long numElements) {
this.numElements = numElements;
}
-
+
@Override
public void open(Configuration parameters) throws IOException {
long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
@@ -261,10 +259,10 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
count = 0;
-
+
pCount = getRuntimeContext().getState(new ValueStateDescriptor<>("pCount", Long.class, 0L));
}
-
+
@Override
public void close() throws IOException {
counts[getRuntimeContext().getIndexOfThisSubtask()] = inputCount;
@@ -278,7 +276,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
throw new Exception("Test Failure");
}
inputCount++;
-
+
long currentPrefixCount = pCount.value() + value.count;
pCount.update(currentPrefixCount);
prefixCounts.put(value.prefix, currentPrefixCount);
@@ -301,8 +299,8 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
}
private static class StringRichFilterFunction extends RichFilterFunction<String> implements ListCheckpointed<Long> {
-
- static final long[] counts = new long[PARALLELISM];
+
+ static long[] counts = new long[PARALLELISM];
private long count;
@@ -333,8 +331,8 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
implements ListCheckpointed<Long> {
-
- static final long[] counts = new long[PARALLELISM];
+
+ static long[] counts = new long[PARALLELISM];
private long count;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 5f56def..5d902ff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -38,7 +38,7 @@ import java.io.Serializable;
import static org.junit.Assert.fail;
/**
- * Test base for fault tolerant streaming programs
+ * Test base for fault tolerant streaming programs.
*/
public abstract class StreamFaultToleranceTestBase extends TestLogger {
@@ -55,7 +55,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-
+
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
@@ -82,12 +82,12 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
* Implementations are expected to assemble the test topology in this function
* using the provided {@link StreamExecutionEnvironment}.
*/
- abstract public void testProgram(StreamExecutionEnvironment env);
+ public abstract void testProgram(StreamExecutionEnvironment env);
/**
* Implementations are expected to provide test here to verify the correct behavior.
*/
- abstract public void postSubmit() throws Exception ;
+ public abstract void postSubmit() throws Exception;
/**
* Runs the following program the test program defined in {@link #testProgram(StreamExecutionEnvironment)}
@@ -118,6 +118,9 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
// Frequently used utilities
// --------------------------------------------------------------------------------------------
+ /**
+ * POJO storing prefix, value, and count.
+ */
@SuppressWarnings("serial")
public static class PrefixCount implements Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
index 0a89ab9..1cf5829 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
@@ -15,10 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.test.checkpointing;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+
import org.junit.Assert;
import org.junit.Test;
@@ -27,6 +29,9 @@ import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
+/**
+ * Test the {@link TimestampedFileInputSplit} for Continuous File Processing.
+ */
public class TimestampedFileInputSplitTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index a219b68..f19d690 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.checkpointing;
-import com.google.common.collect.EvictingQueue;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -33,6 +32,8 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+
+import com.google.common.collect.EvictingQueue;
import org.junit.Assert;
import java.util.Collections;
@@ -45,15 +46,14 @@ import java.util.Random;
* of {@link AbstractUdfStreamOperator} is correctly restored in case of recovery from
* a failure.
*
- * <p>
- * The topology currently tests the proper behaviour of the {@link StreamGroupedReduce}
+ * <p>The topology currently tests the proper behaviour of the {@link StreamGroupedReduce}
* and the {@link StreamGroupedFold} operators.
*/
@SuppressWarnings("serial")
public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTestBase {
- final private static long NUM_INPUT = 500_000L;
- final private static int NUM_OUTPUT = 1_000;
+ private static final long NUM_INPUT = 500_000L;
+ private static final int NUM_OUTPUT = 1_000;
/**
* Assembles a stream of a grouping field and some long data. Applies reduce functions
@@ -66,7 +66,6 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
KeyedStream<Tuple2<Integer, Long>, Tuple> stream = env.addSource(new StatefulMultipleSequence())
.keyBy(0);
-
stream
// testing built-in aggregate
.min(1)
@@ -184,7 +183,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
* Mapper that causes one failure between seeing 40% to 70% of the records.
*/
private static class OnceFailingIdentityMapFunction
- extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>
+ extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>
implements ListCheckpointed<Long> {
private static volatile boolean hasFailed = false;
@@ -223,7 +222,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
@Override
public void restoreState(List<Long> state) throws Exception {
- if(!state.isEmpty()) {
+ if (!state.isEmpty()) {
count = state.get(0);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 7004f75..7ec4f86 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -27,8 +27,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -52,7 +53,6 @@ import java.util.HashMap;
import java.util.List;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -78,7 +78,6 @@ public class WindowCheckpointingITCase extends TestLogger {
private static TestStreamEnvironment env;
-
@BeforeClass
public static void startTestCluster() {
Configuration config = new Configuration();
@@ -103,9 +102,9 @@ public class WindowCheckpointingITCase extends TestLogger {
@Test
public void testTumblingProcessingTimeWindow() {
- final int NUM_ELEMENTS = 3000;
+ final int numElements = 3000;
FailingSource.reset();
-
+
try {
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(timeCharacteristic);
@@ -115,7 +114,7 @@ public class WindowCheckpointingITCase extends TestLogger {
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(NUM_ELEMENTS, NUM_ELEMENTS / 3))
+ .addSource(new FailingSource(numElements, numElements / 3))
.rebalance()
.keyBy(0)
.timeWindow(Time.of(100, MILLISECONDS))
@@ -145,8 +144,7 @@ public class WindowCheckpointingITCase extends TestLogger {
}
}
})
- .addSink(new ValidatingSink(NUM_ELEMENTS, 1)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numElements, 1)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -158,7 +156,7 @@ public class WindowCheckpointingITCase extends TestLogger {
@Test
public void testSlidingProcessingTimeWindow() {
- final int NUM_ELEMENTS = 3000;
+ final int numElements = 3000;
FailingSource.reset();
try {
@@ -170,7 +168,7 @@ public class WindowCheckpointingITCase extends TestLogger {
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(NUM_ELEMENTS, NUM_ELEMENTS / 3))
+ .addSource(new FailingSource(numElements, numElements / 3))
.rebalance()
.keyBy(0)
.timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS))
@@ -200,8 +198,7 @@ public class WindowCheckpointingITCase extends TestLogger {
}
}
})
- .addSink(new ValidatingSink(NUM_ELEMENTS, 3)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numElements, 3)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -213,7 +210,7 @@ public class WindowCheckpointingITCase extends TestLogger {
@Test
public void testAggregatingTumblingProcessingTimeWindow() {
- final int NUM_ELEMENTS = 3000;
+ final int numElements = 3000;
FailingSource.reset();
try {
@@ -225,8 +222,8 @@ public class WindowCheckpointingITCase extends TestLogger {
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(NUM_ELEMENTS, NUM_ELEMENTS / 3))
- .map(new MapFunction<Tuple2<Long,IntType>, Tuple2<Long,IntType>>() {
+ .addSource(new FailingSource(numElements, numElements / 3))
+ .map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
@Override
public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
value.f1.value = 1;
@@ -245,8 +242,7 @@ public class WindowCheckpointingITCase extends TestLogger {
return new Tuple2<>(a.f0, new IntType(1));
}
})
- .addSink(new ValidatingSink(NUM_ELEMENTS, 1)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numElements, 1)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -258,7 +254,7 @@ public class WindowCheckpointingITCase extends TestLogger {
@Test
public void testAggregatingSlidingProcessingTimeWindow() {
- final int NUM_ELEMENTS = 3000;
+ final int numElements = 3000;
FailingSource.reset();
try {
@@ -270,8 +266,8 @@ public class WindowCheckpointingITCase extends TestLogger {
env.getConfig().disableSysoutLogging();
env
- .addSource(new FailingSource(NUM_ELEMENTS, NUM_ELEMENTS / 3))
- .map(new MapFunction<Tuple2<Long,IntType>, Tuple2<Long,IntType>>() {
+ .addSource(new FailingSource(numElements, numElements / 3))
+ .map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() {
@Override
public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) {
value.f1.value = 1;
@@ -289,8 +285,7 @@ public class WindowCheckpointingITCase extends TestLogger {
return new Tuple2<>(a.f0, new IntType(1));
}
})
- .addSink(new ValidatingSink(NUM_ELEMENTS, 3)).setParallelism(1);
-
+ .addSink(new ValidatingSink(numElements, 3)).setParallelism(1);
tryExecute(env, "Tumbling Window Test");
}
@@ -305,8 +300,7 @@ public class WindowCheckpointingITCase extends TestLogger {
// ------------------------------------------------------------------------
private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements ListCheckpointed<Integer>, CheckpointListener
- {
+ implements ListCheckpointed<Integer>, CheckpointListener {
private static volatile boolean failedBefore = false;
private final int numElementsToEmit;
@@ -470,12 +464,17 @@ public class WindowCheckpointingITCase extends TestLogger {
// Utilities
// ------------------------------------------------------------------------
+ /**
+ * POJO with int value.
+ */
public static class IntType {
public int value;
public IntType() {}
- public IntType(int value) { this.value = value; }
+ public IntType(int value) {
+ this.value = value;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index e4004c7..21be7ba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -18,21 +18,14 @@
package org.apache.flink.test.checkpointing.utils;
-import java.io.File;
-import java.net.URI;
-import java.net.URL;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -42,12 +35,21 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.TestBaseUtils;
+
+import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -57,6 +59,9 @@ import scala.concurrent.duration.FiniteDuration;
import static junit.framework.Assert.fail;
+/**
+ * Test savepoint migration.
+ */
public class SavepointMigrationTestBase extends TestBaseUtils {
@Rule
@@ -120,7 +125,6 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
// Submit the job
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-
JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
LOG.info("Submitted job {} and waiting...", jobSubmissionResult.getJobID());
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
index 4221670..da6e035 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
+
import org.junit.Ignore;
import org.junit.Test;
@@ -133,11 +134,10 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
}
-
@Test
public void testSavepointRestoreFromFlink11() throws Exception {
- final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+ final int expectedSuccessfulChecks = 21;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -165,13 +165,13 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
restoreAndExecute(
env,
getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
- new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+ new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, expectedSuccessfulChecks));
}
@Test
public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception {
- final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+ final int expectedSuccessfulChecks = 21;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -199,13 +199,13 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
restoreAndExecute(
env,
getResourceFilename("stateful-udf-migration-itcase-flink1.1-rocksdb-savepoint"),
- new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+ new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, expectedSuccessfulChecks));
}
private static class LegacyCheckpointedSource
implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
- public static String CHECKPOINTED_STRING = "Here be dragons!";
+ public static String checkpointedString = "Here be dragons!";
private static final long serialVersionUID = 1L;
@@ -237,12 +237,12 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
@Override
public void restoreState(String state) throws Exception {
- assertEquals(CHECKPOINTED_STRING, state);
+ assertEquals(checkpointedString, state);
}
@Override
public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return CHECKPOINTED_STRING;
+ return checkpointedString;
}
}
@@ -271,7 +271,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
@Override
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
- assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
+ assertEquals(LegacyCheckpointedSource.checkpointedString, restoredState);
getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
synchronized (ctx.getCheckpointLock()) {
@@ -296,12 +296,12 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
}
}
- public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ private static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
implements Checkpointed<Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
- public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+ public static Tuple2<String, Long> checkpointedTuple =
new Tuple2<>("hello", 42L);
@Override
@@ -315,11 +315,11 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
@Override
public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return CHECKPOINTED_TUPLE;
+ return checkpointedTuple;
}
}
- public static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ private static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
implements CheckpointedRestoring<Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
@@ -337,7 +337,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
out.collect(value);
- assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+ assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
}
@@ -348,13 +348,13 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
}
}
- public static class LegacyCheckpointedFlatMapWithKeyedState
+ private static class LegacyCheckpointedFlatMapWithKeyedState
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
implements Checkpointed<Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
- public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+ public static Tuple2<String, Long> checkpointedTuple =
new Tuple2<>("hello", 42L);
private final ValueStateDescriptor<Long> stateDescriptor =
@@ -373,11 +373,11 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
@Override
public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return CHECKPOINTED_TUPLE;
+ return checkpointedTuple;
}
}
- public static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ private static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
implements CheckpointedRestoring<Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
@@ -404,7 +404,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
}
assertEquals(value.f1, state.value());
- assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+ assertEquals(LegacyCheckpointedFlatMap.checkpointedTuple, restoredState);
getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
}
@@ -414,7 +414,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
}
}
- public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ private static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -429,7 +429,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
}
}
- public static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ private static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -457,7 +457,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
}
}
- public static class CheckpointedUdfOperator
+ private static class CheckpointedUdfOperator
extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -488,7 +488,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
// checkpointId,
// timestamp);
//
-// out.writeUTF(CHECKPOINTED_STRING);
+// out.writeUTF(checkpointedString);
//
// result.setOperatorState(out.closeAndGetHandle());
//
@@ -496,7 +496,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
// }
}
- public static class RestoringCheckingUdfOperator
+ private static class RestoringCheckingUdfOperator
extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -535,7 +535,7 @@ public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigratio
}
}
- public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
+ private static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
private static final long serialVersionUID = 1L;
private final String accumulatorName;