You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/08/17 07:33:09 UTC

[flink] branch master updated (8d88ff5 -> 741aa43)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8d88ff5  [FLINK-10001][docs] Add documentation for job cluster deployment on Docker and K8s
     new dfbf32b  [hotfix][tests] Deduplicate code in ListCheckpointedTest
     new 99582b9  [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized
     new 741aa43  [hotfix][tests] Deduplicate code in AbstractStreamOperatorTestHarness constructor

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/FlinkKafkaProducer011ITCase.java         |  2 -
 .../api/checkpoint/ListCheckpointedTest.java       | 51 +++++++++++---------
 .../sink/TwoPhaseCommitSinkFunctionTest.java       | 15 +++++-
 .../util/AbstractStreamOperatorTestHarness.java    | 16 +++----
 .../AbstractStreamOperatorTestHarnessTest.java     | 55 ++++++++++++++++++++++
 5 files changed, 104 insertions(+), 35 deletions(-)
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java


[flink] 03/03: [hotfix][tests] Deduplicate code in AbstractStreamOperatorTestHarness constructor

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 741aa43e2c72dc93762ad8ab0ee6781bda181f05
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Aug 16 17:49:01 2018 +0200

    [hotfix][tests] Deduplicate code in AbstractStreamOperatorTestHarness constructor
---
 .../streaming/util/AbstractStreamOperatorTestHarness.java   | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 8ee8c03..0e120eb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -139,16 +139,9 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 			int subtaskIndex) throws Exception {
 		this(
 			operator,
-			new MockEnvironmentBuilder()
-				.setTaskName("MockTask")
-				.setMemorySize(3 * 1024 * 1024)
-				.setInputSplitProvider(new MockInputSplitProvider())
-				.setBufferSize(1024)
-				.setMaxParallelism(maxParallelism)
-				.setParallelism(parallelism)
-				.setSubtaskIndex(subtaskIndex)
-				.build(),
-			true,
+			maxParallelism,
+			parallelism,
+			subtaskIndex,
 			new OperatorID());
 	}
 


[flink] 01/03: [hotfix][tests] Deduplicate code in ListCheckpointedTest

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dfbf32ba66730388110316fe10e48edf28c83214
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Aug 16 14:20:51 2018 +0200

    [hotfix][tests] Deduplicate code in ListCheckpointedTest
---
 .../api/checkpoint/ListCheckpointedTest.java       | 23 +++++++---------------
 1 file changed, 7 insertions(+), 16 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index ff0b0fc..d6d7591 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -37,31 +37,22 @@ public class ListCheckpointedTest {
 
 	@Test
 	public void testUDFReturningNull() throws Exception {
-		TestUserFunction userFunction = new TestUserFunction(null);
-		AbstractStreamOperatorTestHarness<Integer> testHarness =
-				new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-		testHarness.open();
-		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.initializeState(snapshot);
-		Assert.assertTrue(userFunction.isRestored());
+		testUDF(new TestUserFunction(null));
 	}
 
 	@Test
 	public void testUDFReturningEmpty() throws Exception {
-		TestUserFunction userFunction = new TestUserFunction(Collections.<Integer>emptyList());
-		AbstractStreamOperatorTestHarness<Integer> testHarness =
-				new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-		testHarness.open();
-		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.initializeState(snapshot);
-		Assert.assertTrue(userFunction.isRestored());
+		testUDF(new TestUserFunction(Collections.<Integer>emptyList()));
 	}
 
 	@Test
 	public void testUDFReturningData() throws Exception {
-		TestUserFunction userFunction = new TestUserFunction(Arrays.asList(1, 2, 3));
+		testUDF(new TestUserFunction(Arrays.asList(1, 2, 3)));
+	}
+
+	private static void testUDF(TestUserFunction userFunction) throws Exception {
 		AbstractStreamOperatorTestHarness<Integer> testHarness =
-				new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
+			new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
 		testHarness.open();
 		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 		testHarness.initializeState(snapshot);


[flink] 02/03: [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 99582b998b4c8251ed0d1469c2d3f361cdd5b8bf
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Aug 16 13:58:42 2018 +0200

    [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized
    
    This is a change in tests only. Previously it was technically possible to first call `harness.open()`
    and then `harness.initializeState(fooBar)`. However, this was incorrect, since `open()` had already
    called `initializeState(null)`, which led to quirks. This commit adds a `checkState` which
    makes sure that `initializeState` is called only once.
---
 .../kafka/FlinkKafkaProducer011ITCase.java         |  2 -
 .../api/checkpoint/ListCheckpointedTest.java       | 30 +++++++++---
 .../sink/TwoPhaseCommitSinkFunctionTest.java       | 15 +++++-
 .../util/AbstractStreamOperatorTestHarness.java    |  3 ++
 .../AbstractStreamOperatorTestHarnessTest.java     | 55 ++++++++++++++++++++++
 5 files changed, 95 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 74c58ad..57b7e77 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -172,7 +172,6 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 
 		testHarness.setup();
 		testHarness.open();
-		testHarness.initializeState(null);
 		testHarness.processElement(42, 0);
 		testHarness.snapshot(0, 1);
 		testHarness.processElement(43, 2);
@@ -225,7 +224,6 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 
 		testHarness1.setup();
 		testHarness1.open();
-		testHarness1.initializeState(null);
 		testHarness1.processElement(42, 0);
 		testHarness1.snapshot(0, 1);
 		testHarness1.processElement(43, 2);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index d6d7591..644ab04 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -30,6 +30,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests for {@link ListCheckpointed}.
  */
@@ -51,12 +54,25 @@ public class ListCheckpointedTest {
 	}
 
 	private static void testUDF(TestUserFunction userFunction) throws Exception {
-		AbstractStreamOperatorTestHarness<Integer> testHarness =
-			new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
-		testHarness.open();
-		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.initializeState(snapshot);
-		Assert.assertTrue(userFunction.isRestored());
+		OperatorSubtaskState snapshot;
+		try (AbstractStreamOperatorTestHarness<Integer> testHarness = createTestHarness(userFunction)) {
+			testHarness.open();
+			snapshot = testHarness.snapshot(0L, 0L);
+			assertFalse(userFunction.isRestored());
+		}
+		try (AbstractStreamOperatorTestHarness<Integer> testHarness = createTestHarness(userFunction)) {
+			testHarness.initializeState(snapshot);
+			testHarness.open();
+			assertTrue(userFunction.isRestored());
+		}
+	}
+
+	private static AbstractStreamOperatorTestHarness<Integer> createTestHarness(TestUserFunction userFunction) throws Exception {
+		return new AbstractStreamOperatorTestHarness<>(
+			new StreamMap<>(userFunction),
+			1,
+			1,
+			0);
 	}
 
 	private static class TestUserFunction extends RichMapFunction<Integer, Integer> implements ListCheckpointed<Integer> {
@@ -86,7 +102,7 @@ public class ListCheckpointedTest {
 			if (null != expected) {
 				Assert.assertEquals(expected, state);
 			} else {
-				Assert.assertTrue(state.isEmpty());
+				assertTrue(state.isEmpty());
 			}
 			restored = true;
 		}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 166dc5a..2970b87 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -195,10 +195,14 @@ public class TwoPhaseCommitSinkFunctionTest {
 		final OperatorSubtaskState snapshot = harness.snapshot(0, 1);
 		harness.notifyOfCompletedCheckpoint(1);
 
+		throwException.set(true);
+
+		closeTestHarness();
+		setUpTestHarness();
+
 		final long transactionTimeout = 1000;
 		sinkFunction.setTransactionTimeout(transactionTimeout);
 		sinkFunction.ignoreFailuresAfterTransactionTimeout();
-		throwException.set(true);
 
 		try {
 			harness.initializeState(snapshot);
@@ -251,11 +255,20 @@ public class TwoPhaseCommitSinkFunctionTest {
 		final OperatorSubtaskState snapshot = harness.snapshot(0, 1);
 		final long elapsedTime = (long) ((double) transactionTimeout * warningRatio + 2);
 		clock.setEpochMilli(elapsedTime);
+
+		closeTestHarness();
+		setUpTestHarness();
+		sinkFunction.setTransactionTimeout(transactionTimeout);
+		sinkFunction.enableTransactionTimeoutWarnings(warningRatio);
+
 		harness.initializeState(snapshot);
+		harness.open();
 
 		final List<String> logMessages =
 			loggingEvents.stream().map(LoggingEvent::getRenderedMessage).collect(Collectors.toList());
 
+		closeTestHarness();
+
 		assertThat(
 			logMessages,
 			hasItem(containsString("has been open for 502 ms. " +
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 2f19ce2..8ee8c03 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -80,6 +80,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -359,6 +360,8 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		OperatorSubtaskState jmOperatorStateHandles,
 		OperatorSubtaskState tmOperatorStateHandles) throws Exception {
 
+		checkState(!initializeCalled, "TestHarness has already been initialized. Have you " +
+			"opened this harness before initializing it?");
 		if (!setupCalled) {
 			setup();
 		}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
new file mode 100644
index 0000000..65f62a3
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+
+/**
+ * Tests for {@link AbstractStreamOperatorTestHarness}.
+ */
+public class AbstractStreamOperatorTestHarnessTest extends TestLogger {
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	@Test
+	public void testInitializeAfterOpenning() throws Throwable {
+		expectedException.expect(IllegalStateException.class);
+		expectedException.expectMessage(containsString("TestHarness has already been initialized."));
+
+		AbstractStreamOperatorTestHarness<Integer> result;
+		result =
+			new AbstractStreamOperatorTestHarness<>(
+				new AbstractStreamOperator<Integer>() {
+				},
+				1,
+				1,
+				0);
+		result.setup();
+		result.open();
+		result.initializeState(new OperatorSubtaskState());
+	}
+}