You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/08/17 07:33:11 UTC

[flink] 02/03: [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 99582b998b4c8251ed0d1469c2d3f361cdd5b8bf
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Aug 16 13:58:42 2018 +0200

    [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized
    
    This is a change in tests only. Previously it was technically possible to call first `harness.open()`
    followed by `harness.initializeState(fooBar)`. However this was incorrect, since `open()` was already
    calling `initializeState(null)`, which was leading to quirks. This commit adds a `checkState` which
    makes sure that `initializeState` is called only once.
---
 .../kafka/FlinkKafkaProducer011ITCase.java         |  2 -
 .../api/checkpoint/ListCheckpointedTest.java       | 30 +++++++++---
 .../sink/TwoPhaseCommitSinkFunctionTest.java       | 15 +++++-
 .../util/AbstractStreamOperatorTestHarness.java    |  3 ++
 .../AbstractStreamOperatorTestHarnessTest.java     | 55 ++++++++++++++++++++++
 5 files changed, 95 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 74c58ad..57b7e77 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -172,7 +172,6 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 
 		testHarness.setup();
 		testHarness.open();
-		testHarness.initializeState(null);
 		testHarness.processElement(42, 0);
 		testHarness.snapshot(0, 1);
 		testHarness.processElement(43, 2);
@@ -225,7 +224,6 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 
 		testHarness1.setup();
 		testHarness1.open();
-		testHarness1.initializeState(null);
 		testHarness1.processElement(42, 0);
 		testHarness1.snapshot(0, 1);
 		testHarness1.processElement(43, 2);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index d6d7591..644ab04 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -30,6 +30,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests for {@link ListCheckpointed}.
  */
@@ -51,12 +54,25 @@ public class ListCheckpointedTest {
 	}
 
 	private static void testUDF(TestUserFunction userFunction) throws Exception {
-		AbstractStreamOperatorTestHarness<Integer> testHarness =
-			new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-		testHarness.open();
-		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.initializeState(snapshot);
-		Assert.assertTrue(userFunction.isRestored());
+		OperatorSubtaskState snapshot;
+		try (AbstractStreamOperatorTestHarness<Integer> testHarness = createTestHarness(userFunction)) {
+			testHarness.open();
+			snapshot = testHarness.snapshot(0L, 0L);
+			assertFalse(userFunction.isRestored());
+		}
+		try (AbstractStreamOperatorTestHarness<Integer> testHarness = createTestHarness(userFunction)) {
+			testHarness.initializeState(snapshot);
+			testHarness.open();
+			assertTrue(userFunction.isRestored());
+		}
+	}
+
+	private static AbstractStreamOperatorTestHarness<Integer> createTestHarness(TestUserFunction userFunction) throws Exception {
+		return new AbstractStreamOperatorTestHarness<>(
+			new StreamMap<>(userFunction),
+			1,
+			1,
+			0);
 	}
 
 	private static class TestUserFunction extends RichMapFunction<Integer, Integer> implements ListCheckpointed<Integer> {
@@ -86,7 +102,7 @@ public class ListCheckpointedTest {
 			if (null != expected) {
 				Assert.assertEquals(expected, state);
 			} else {
-				Assert.assertTrue(state.isEmpty());
+				assertTrue(state.isEmpty());
 			}
 			restored = true;
 		}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 166dc5a..2970b87 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -195,10 +195,14 @@ public class TwoPhaseCommitSinkFunctionTest {
 		final OperatorSubtaskState snapshot = harness.snapshot(0, 1);
 		harness.notifyOfCompletedCheckpoint(1);
 
+		throwException.set(true);
+
+		closeTestHarness();
+		setUpTestHarness();
+
 		final long transactionTimeout = 1000;
 		sinkFunction.setTransactionTimeout(transactionTimeout);
 		sinkFunction.ignoreFailuresAfterTransactionTimeout();
-		throwException.set(true);
 
 		try {
 			harness.initializeState(snapshot);
@@ -251,11 +255,20 @@ public class TwoPhaseCommitSinkFunctionTest {
 		final OperatorSubtaskState snapshot = harness.snapshot(0, 1);
 		final long elapsedTime = (long) ((double) transactionTimeout * warningRatio + 2);
 		clock.setEpochMilli(elapsedTime);
+
+		closeTestHarness();
+		setUpTestHarness();
+		sinkFunction.setTransactionTimeout(transactionTimeout);
+		sinkFunction.enableTransactionTimeoutWarnings(warningRatio);
+
 		harness.initializeState(snapshot);
+		harness.open();
 
 		final List<String> logMessages =
 			loggingEvents.stream().map(LoggingEvent::getRenderedMessage).collect(Collectors.toList());
 
+		closeTestHarness();
+
 		assertThat(
 			logMessages,
 			hasItem(containsString("has been open for 502 ms. " +
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 2f19ce2..8ee8c03 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -80,6 +80,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -359,6 +360,8 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		OperatorSubtaskState jmOperatorStateHandles,
 		OperatorSubtaskState tmOperatorStateHandles) throws Exception {
 
+		checkState(!initializeCalled, "TestHarness has already been initialized. Have you " +
+			"opened this harness before initializing it?");
 		if (!setupCalled) {
 			setup();
 		}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
new file mode 100644
index 0000000..65f62a3
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+
+/**
+ * Tests for {@link AbstractStreamOperatorTestHarness}.
+ */
+public class AbstractStreamOperatorTestHarnessTest extends TestLogger {
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	@Test
+	public void testInitializeAfterOpenning() throws Throwable {
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage(containsString("TestHarness has already been initialized."));
+
+		AbstractStreamOperatorTestHarness<Integer> result;
+		result =
+			new AbstractStreamOperatorTestHarness<>(
+				new AbstractStreamOperator<Integer>() {
+				},
+				1,
+				1,
+				0);
+		result.setup();
+		result.open();
+		result.initializeState(new OperatorSubtaskState());
+	}
+}