You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/08/17 07:33:11 UTC
[flink] 02/03: [FLINK-10159][tests] Fail
TestHarness.initializeState if harness has already been initialized
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 99582b998b4c8251ed0d1469c2d3f361cdd5b8bf
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Aug 16 13:58:42 2018 +0200
[FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized
This is a change in tests only. Previously it was technically possible to call first `harness.open()`
followed by `harness.initializeState(fooBar)`. However this was incorrect, since `open()` was already
calling `initializeState(null)`, which was leading to quirks. This commit adds a `checkState` which
makes sure that `initializeState` is called only once.
---
.../kafka/FlinkKafkaProducer011ITCase.java | 2 -
.../api/checkpoint/ListCheckpointedTest.java | 30 +++++++++---
.../sink/TwoPhaseCommitSinkFunctionTest.java | 15 +++++-
.../util/AbstractStreamOperatorTestHarness.java | 3 ++
.../AbstractStreamOperatorTestHarnessTest.java | 55 ++++++++++++++++++++++
5 files changed, 95 insertions(+), 10 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 74c58ad..57b7e77 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -172,7 +172,6 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
testHarness.setup();
testHarness.open();
- testHarness.initializeState(null);
testHarness.processElement(42, 0);
testHarness.snapshot(0, 1);
testHarness.processElement(43, 2);
@@ -225,7 +224,6 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
testHarness1.setup();
testHarness1.open();
- testHarness1.initializeState(null);
testHarness1.processElement(42, 0);
testHarness1.snapshot(0, 1);
testHarness1.processElement(43, 2);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index d6d7591..644ab04 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -30,6 +30,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
/**
* Tests for {@link ListCheckpointed}.
*/
@@ -51,12 +54,25 @@ public class ListCheckpointedTest {
}
private static void testUDF(TestUserFunction userFunction) throws Exception {
- AbstractStreamOperatorTestHarness<Integer> testHarness =
- new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
- testHarness.open();
- OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
- testHarness.initializeState(snapshot);
- Assert.assertTrue(userFunction.isRestored());
+ OperatorSubtaskState snapshot;
+ try (AbstractStreamOperatorTestHarness<Integer> testHarness = createTestHarness(userFunction)) {
+ testHarness.open();
+ snapshot = testHarness.snapshot(0L, 0L);
+ assertFalse(userFunction.isRestored());
+ }
+ try (AbstractStreamOperatorTestHarness<Integer> testHarness = createTestHarness(userFunction)) {
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+ assertTrue(userFunction.isRestored());
+ }
+ }
+
+ private static AbstractStreamOperatorTestHarness<Integer> createTestHarness(TestUserFunction userFunction) throws Exception {
+ return new AbstractStreamOperatorTestHarness<>(
+ new StreamMap<>(userFunction),
+ 1,
+ 1,
+ 0);
}
private static class TestUserFunction extends RichMapFunction<Integer, Integer> implements ListCheckpointed<Integer> {
@@ -86,7 +102,7 @@ public class ListCheckpointedTest {
if (null != expected) {
Assert.assertEquals(expected, state);
} else {
- Assert.assertTrue(state.isEmpty());
+ assertTrue(state.isEmpty());
}
restored = true;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 166dc5a..2970b87 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -195,10 +195,14 @@ public class TwoPhaseCommitSinkFunctionTest {
final OperatorSubtaskState snapshot = harness.snapshot(0, 1);
harness.notifyOfCompletedCheckpoint(1);
+ throwException.set(true);
+
+ closeTestHarness();
+ setUpTestHarness();
+
final long transactionTimeout = 1000;
sinkFunction.setTransactionTimeout(transactionTimeout);
sinkFunction.ignoreFailuresAfterTransactionTimeout();
- throwException.set(true);
try {
harness.initializeState(snapshot);
@@ -251,11 +255,20 @@ public class TwoPhaseCommitSinkFunctionTest {
final OperatorSubtaskState snapshot = harness.snapshot(0, 1);
final long elapsedTime = (long) ((double) transactionTimeout * warningRatio + 2);
clock.setEpochMilli(elapsedTime);
+
+ closeTestHarness();
+ setUpTestHarness();
+ sinkFunction.setTransactionTimeout(transactionTimeout);
+ sinkFunction.enableTransactionTimeoutWarnings(warningRatio);
+
harness.initializeState(snapshot);
+ harness.open();
final List<String> logMessages =
loggingEvents.stream().map(LoggingEvent::getRenderedMessage).collect(Collectors.toList());
+ closeTestHarness();
+
assertThat(
logMessages,
hasItem(containsString("has been open for 502 ms. " +
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 2f19ce2..8ee8c03 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -80,6 +80,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
+import static org.apache.flink.util.Preconditions.checkState;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -359,6 +360,8 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
OperatorSubtaskState jmOperatorStateHandles,
OperatorSubtaskState tmOperatorStateHandles) throws Exception {
+ checkState(!initializeCalled, "TestHarness has already been initialized. Have you " +
+ "opened this harness before initializing it?");
if (!setupCalled) {
setup();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
new file mode 100644
index 0000000..65f62a3
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+
+/**
+ * Tests for {@link AbstractStreamOperatorTestHarness}.
+ */
+public class AbstractStreamOperatorTestHarnessTest extends TestLogger {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testInitializeAfterOpenning() throws Throwable {
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage(containsString("TestHarness has already been initialized."));
+
+ AbstractStreamOperatorTestHarness<Integer> result;
+ result =
+ new AbstractStreamOperatorTestHarness<>(
+ new AbstractStreamOperator<Integer>() {
+ },
+ 1,
+ 1,
+ 0);
+ result.setup();
+ result.open();
+ result.initializeState(new OperatorSubtaskState());
+ }
+}