You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/04 11:30:18 UTC

[flink] branch master updated (61119b8 -> 83a204e)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 61119b8  [FLINK-13073][table-blink-runtime] BinaryRow in Blink runtime has wrong FIRST_BYTE_ZERO mask
     new 42d671c  [hotfix][test] Drop InputGateConcurrentTest
     new de91684  [hotfix][test] Drop unused closedChannels field in MockInputGate
     new 76eb11f  [FLINK-13016][network] Fix StreamTaskNetworkInput#isAvailable
     new 6caa61b  [hotfix][operator] Deduplicate StreamTask code
     new de44130  [hotfix][operator] Rename StreamInputProcessor to StreamOneInputProcessor
     new 83a204e  [hotfix][operator] Deduplicate code by introducing StreamInputProcessor interface

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/network/buffer/BufferBuilderTestUtils.java  |   4 +
 .../network/partition/InputGateConcurrentTest.java | 278 ---------------------
 .../streaming/runtime/io/InputProcessorUtil.java   |   2 +-
 .../streaming/runtime/io/StreamInputProcessor.java | 210 +---------------
 ...Processor.java => StreamOneInputProcessor.java} |  30 +--
 .../runtime/io/StreamTaskNetworkInput.java         |   3 +
 .../runtime/io/StreamTwoInputProcessor.java        |   6 +-
 .../io/StreamTwoInputSelectableProcessor.java      |  12 +-
 .../runtime/tasks/OneInputStreamTask.java          |  24 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  22 +-
 .../tasks/TwoInputSelectableStreamTask.java        |  21 --
 .../runtime/tasks/TwoInputStreamTask.java          |  21 --
 .../flink/streaming/runtime/io/MockInputGate.java  |  18 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     | 101 ++++++++
 .../tasks/StreamTaskCancellationBarrierTest.java   |  11 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   4 -
 .../streaming/runtime/tasks/StreamTaskTest.java    |  25 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   4 -
 .../runtime/tasks/SynchronousCheckpointTest.java   |  14 +-
 .../tasks/TaskCheckpointingBehaviourTest.java      |   3 -
 .../flink/streaming/util/MockStreamTask.java       |   3 -
 .../jobmaster/JobMasterStopWithSavepointIT.java    |  32 +--
 22 files changed, 187 insertions(+), 661 deletions(-)
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
 copy flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/{StreamInputProcessor.java => StreamOneInputProcessor.java} (97%)
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java


[flink] 01/06: [hotfix][test] Drop InputGateConcurrentTest

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 42d671c11153e67588f2c80d4d2953c53b6b067d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jul 1 14:44:38 2019 +0200

    [hotfix][test] Drop InputGateConcurrentTest
    
    Those tests are mostly superseded by StreamNetworkThroughputBenchmarkTest
    (except of mixed local/remote workload)
---
 .../network/partition/InputGateConcurrentTest.java | 278 ---------------------
 1 file changed, 278 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
deleted file mode 100644
index c7c93ef..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultPartitionManager;
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
-import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-
-/**
- * Concurrency tests for input gates.
- */
-public class InputGateConcurrentTest {
-
-	@Test
-	public void testConsumptionWithLocalChannels() throws Exception {
-		final int numberOfChannels = 11;
-		final int buffersPerChannel = 1000;
-
-		final ResultPartition resultPartition = mock(ResultPartition.class);
-
-		final PipelinedSubpartition[] partitions = new PipelinedSubpartition[numberOfChannels];
-		final Source[] sources = new Source[numberOfChannels];
-
-		final ResultPartitionManager resultPartitionManager = createResultPartitionManager(partitions);
-
-		final SingleInputGate gate = createSingleInputGate(numberOfChannels);
-
-		for (int i = 0; i < numberOfChannels; i++) {
-			createLocalInputChannel(gate, i, resultPartitionManager);
-			partitions[i] = new PipelinedSubpartition(0, resultPartition);
-			sources[i] = new PipelinedSubpartitionSource(partitions[i]);
-		}
-
-		ProducerThread producer = new ProducerThread(sources, numberOfChannels * buffersPerChannel, 4, 10);
-		ConsumerThread consumer = new ConsumerThread(gate, numberOfChannels * buffersPerChannel);
-		producer.start();
-		consumer.start();
-
-		// the 'sync()' call checks for exceptions and failed assertions
-		producer.sync();
-		consumer.sync();
-	}
-
-	@Test
-	public void testConsumptionWithRemoteChannels() throws Exception {
-		final int numberOfChannels = 11;
-		final int buffersPerChannel = 1000;
-
-		final ConnectionManager connManager = createDummyConnectionManager();
-		final Source[] sources = new Source[numberOfChannels];
-
-		final SingleInputGate gate = createSingleInputGate(numberOfChannels);
-
-		for (int i = 0; i < numberOfChannels; i++) {
-			RemoteInputChannel channel = createRemoteInputChannel(gate, i, connManager);
-			sources[i] = new RemoteChannelSource(channel);
-		}
-
-		ProducerThread producer = new ProducerThread(sources, numberOfChannels * buffersPerChannel, 4, 10);
-		ConsumerThread consumer = new ConsumerThread(gate, numberOfChannels * buffersPerChannel);
-		producer.start();
-		consumer.start();
-
-		// the 'sync()' call checks for exceptions and failed assertions
-		producer.sync();
-		consumer.sync();
-	}
-
-	@Test
-	public void testConsumptionWithMixedChannels() throws Exception {
-		final int numberOfChannels = 61;
-		final int numLocalChannels = 20;
-		final int buffersPerChannel = 1000;
-
-		// fill the local/remote decision
-		List<Boolean> localOrRemote = new ArrayList<>(numberOfChannels);
-		for (int i = 0; i < numberOfChannels; i++) {
-			localOrRemote.add(i < numLocalChannels);
-		}
-		Collections.shuffle(localOrRemote);
-
-		final ConnectionManager connManager = createDummyConnectionManager();
-		final ResultPartition resultPartition = mock(ResultPartition.class);
-
-		final PipelinedSubpartition[] localPartitions = new PipelinedSubpartition[numLocalChannels];
-		final ResultPartitionManager resultPartitionManager = createResultPartitionManager(localPartitions);
-
-		final Source[] sources = new Source[numberOfChannels];
-
-		final SingleInputGate gate = createSingleInputGate(numberOfChannels);
-
-		for (int i = 0, local = 0; i < numberOfChannels; i++) {
-			if (localOrRemote.get(i)) {
-				// local channel
-				PipelinedSubpartition psp = new PipelinedSubpartition(0, resultPartition);
-				localPartitions[local++] = psp;
-				sources[i] = new PipelinedSubpartitionSource(psp);
-
-				createLocalInputChannel(gate, i, resultPartitionManager);
-			}
-			else {
-				//remote channel
-				RemoteInputChannel channel = createRemoteInputChannel(gate, i, connManager);
-				sources[i] = new RemoteChannelSource(channel);
-			}
-		}
-
-		ProducerThread producer = new ProducerThread(sources, numberOfChannels * buffersPerChannel, 4, 10);
-		ConsumerThread consumer = new ConsumerThread(gate, numberOfChannels * buffersPerChannel);
-		producer.start();
-		consumer.start();
-
-		// the 'sync()' call checks for exceptions and failed assertions
-		producer.sync();
-		consumer.sync();
-	}
-
-	// ------------------------------------------------------------------------
-	//  testing threads
-	// ------------------------------------------------------------------------
-
-	private abstract static class Source {
-
-		abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
-
-		abstract void flush();
-	}
-
-	private static class PipelinedSubpartitionSource extends Source {
-
-		final PipelinedSubpartition partition;
-
-		PipelinedSubpartitionSource(PipelinedSubpartition partition) {
-			this.partition = partition;
-		}
-
-		@Override
-		void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
-			partition.add(bufferConsumer);
-		}
-
-		@Override
-		void flush() {
-			partition.flush();
-		}
-	}
-
-	private static class RemoteChannelSource extends Source {
-
-		final RemoteInputChannel channel;
-		private int seq = 0;
-
-		RemoteChannelSource(RemoteInputChannel channel) {
-			this.channel = channel;
-		}
-
-		@Override
-		void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
-			try {
-				Buffer buffer = bufferConsumer.build();
-				checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented");
-				channel.onBuffer(buffer, seq++, -1);
-			}
-			finally {
-				bufferConsumer.close();
-			}
-		}
-
-		@Override
-		void flush() {
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  testing threads
-	// ------------------------------------------------------------------------
-
-	private static class ProducerThread extends CheckedThread {
-
-		private final Random rnd = new Random();
-		private final Source[] sources;
-		private final int numTotal;
-		private final int maxChunk;
-		private final int yieldAfter;
-
-		ProducerThread(Source[] sources, int numTotal, int maxChunk, int yieldAfter) {
-			super("producer");
-			this.sources = sources;
-			this.numTotal = numTotal;
-			this.maxChunk = maxChunk;
-			this.yieldAfter = yieldAfter;
-		}
-
-		@Override
-		public void go() throws Exception {
-			final BufferConsumer bufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(100);
-			int nextYield = numTotal - yieldAfter;
-
-			for (int i = numTotal; i > 0;) {
-				final int nextChannel = rnd.nextInt(sources.length);
-				final int chunk = Math.min(i, rnd.nextInt(maxChunk) + 1);
-
-				final Source next = sources[nextChannel];
-
-				for (int k = chunk; k > 0; --k) {
-					next.addBufferConsumer(bufferConsumer.copy());
-				}
-
-				i -= chunk;
-
-				if (i <= nextYield) {
-					nextYield -= yieldAfter;
-					//noinspection CallToThreadYield
-					Thread.yield();
-				}
-			}
-
-			for (Source source : sources) {
-				source.flush();
-			}
-		}
-	}
-
-	private static class ConsumerThread extends CheckedThread {
-
-		private final SingleInputGate gate;
-		private final int numBuffers;
-
-		ConsumerThread(SingleInputGate gate, int numBuffers) {
-			super("consumer");
-			this.gate = gate;
-			this.numBuffers = numBuffers;
-		}
-
-		@Override
-		public void go() throws Exception {
-			for (int i = numBuffers; i > 0; --i) {
-				assertNotNull(gate.getNext());
-			}
-		}
-	}
-}


[flink] 02/06: [hotfix][test] Drop unused closedChannels field in MockInputGate

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit de9168423ecc322e3d52d3ce584757612fef9693
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jun 27 12:28:47 2019 +0200

    [hotfix][test] Drop unused closedChannels field in MockInputGate
---
 .../test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 5f95e17..65984f4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -39,8 +39,6 @@ public class MockInputGate extends InputGate {
 
 	private final boolean[] closed;
 
-	private int closedChannels;
-
 	MockInputGate(int numberOfChannels, List<BufferOrEvent> bufferOrEvents) {
 		this.numberOfChannels = numberOfChannels;
 		this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents);
@@ -77,7 +75,6 @@ public class MockInputGate extends InputGate {
 		}
 		if (next.isEvent() && next.getEvent() instanceof EndOfPartitionEvent) {
 			closed[channelIdx] = true;
-			closedChannels++;
 		}
 		return Optional.of(next);
 	}


[flink] 06/06: [hotfix][operator] Deduplicate code by introducing StreamInputProcessor interface

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 83a204e6823a88ce8bb8d59217c7b24d01603862
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jun 21 13:24:55 2019 +0200

    [hotfix][operator] Deduplicate code by introducing StreamInputProcessor interface
---
 .../streaming/runtime/io/StreamInputProcessor.java | 34 ++++++++++++++++++++++
 .../runtime/io/StreamOneInputProcessor.java        | 26 +++++++++--------
 .../runtime/io/StreamTwoInputProcessor.java        |  6 ++--
 .../io/StreamTwoInputSelectableProcessor.java      | 12 ++++----
 .../runtime/tasks/OneInputStreamTask.java          | 16 ----------
 .../flink/streaming/runtime/tasks/StreamTask.java  | 21 +++++++++++--
 .../tasks/TwoInputSelectableStreamTask.java        | 16 ----------
 .../runtime/tasks/TwoInputStreamTask.java          | 16 ----------
 8 files changed, 77 insertions(+), 70 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
new file mode 100644
index 0000000..0b263d0
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Closeable;
+
+/**
+ * Interface for processing records by {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
+ */
+@Internal
+public interface StreamInputProcessor extends Closeable {
+	/**
+	 * @return true if {@link StreamTaskInput} is finished.
+	 */
+	boolean processInput() throws Exception;
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
index b71aaa3..4be6d54 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
@@ -62,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * @param <IN> The type of the record that can be read with this record reader.
  */
 @Internal
-public class StreamOneInputProcessor<IN> {
+public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamOneInputProcessor.class);
 
@@ -128,6 +128,7 @@ public class StreamOneInputProcessor<IN> {
 		this.operatorChain = checkNotNull(operatorChain);
 	}
 
+	@Override
 	public boolean processInput() throws Exception {
 		initializeNumRecordsIn();
 
@@ -143,6 +144,16 @@ public class StreamOneInputProcessor<IN> {
 		return true;
 	}
 
+	private boolean checkFinished() throws Exception {
+		boolean isFinished = input.isFinished();
+		if (isFinished) {
+			synchronized (lock) {
+				operatorChain.endInput(1);
+			}
+		}
+		return isFinished;
+	}
+
 	private void processElement(StreamElement recordOrMark, int channel) throws Exception {
 		if (recordOrMark.isRecord()) {
 			// now we can do the actual processing
@@ -180,17 +191,8 @@ public class StreamOneInputProcessor<IN> {
 		}
 	}
 
-	private boolean checkFinished() throws Exception {
-		boolean isFinished = input.isFinished();
-		if (isFinished) {
-			synchronized (lock) {
-				operatorChain.endInput(1);
-			}
-		}
-		return isFinished;
-	}
-
-	public void cleanup() throws Exception {
+	@Override
+	public void close() throws IOException {
 		input.close();
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index f989e57..00bdab9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -75,7 +75,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <IN2> The type of the records that arrive on the second input
  */
 @Internal
-public class StreamTwoInputProcessor<IN1, IN2> {
+public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProcessor {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
 
@@ -206,6 +206,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		this.finishedChannels2 = new BitSet();
 	}
 
+	@Override
 	public boolean processInput() throws Exception {
 		if (isFinished) {
 			return false;
@@ -347,7 +348,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		}
 	}
 
-	public void cleanup() throws IOException {
+	@Override
+	public void close() throws IOException {
 		// clear the buffers first. this part should not ever fail
 		for (RecordDeserializer<?> deserializer : recordDeserializers) {
 			Buffer buffer = deserializer.getCurrentBuffer();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
index d5172ac..65464db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -60,7 +60,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * @param <IN2> The type of the records that arrive on the second input
  */
 @Internal
-public class StreamTwoInputSelectableProcessor<IN1, IN2> {
+public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements StreamInputProcessor {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class);
 
@@ -159,6 +159,7 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
 
 	}
 
+	@Override
 	public boolean processInput() throws Exception {
 		if (!isPrepared) {
 			// the preparations here are not placed in the constructor because all work in it
@@ -192,17 +193,18 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
 		return !checkFinished();
 	}
 
-	public void cleanup() throws Exception {
-		Exception ex = null;
+	@Override
+	public void close() throws IOException {
+		IOException ex = null;
 		try {
 			input1.close();
-		} catch (Exception e) {
+		} catch (IOException e) {
 			ex = ExceptionUtils.firstOrSuppressed(e, ex);
 		}
 
 		try {
 			input2.close();
-		} catch (Exception e) {
+		} catch (IOException e) {
 			ex = ExceptionUtils.firstOrSuppressed(e, ex);
 		}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index c8c51bb..82c35e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -37,8 +37,6 @@ import javax.annotation.Nullable;
 @Internal
 public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
 
-	private StreamOneInputProcessor<IN> inputProcessor;
-
 	private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();
 
 	/**
@@ -96,18 +94,4 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 		// wrap watermark gauge since registered metrics must be unique
 		getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
 	}
-
-	@Override
-	protected void performDefaultAction(ActionContext context) throws Exception {
-		if (!inputProcessor.processInput()) {
-			context.allActionsCompleted();
-		}
-	}
-
-	@Override
-	protected void cleanup() throws Exception {
-		if (inputProcessor != null) {
-			inputProcessor.cleanup();
-		}
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d333b8d..8c72277 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -142,6 +143,12 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	 */
 	private final Object lock = new Object();
 
+	/**
+	 * The input processor. Initialized in {@link #init()} method.
+	 */
+	@Nullable
+	protected StreamInputProcessor inputProcessor;
+
 	/** the head operator that consumes the input streams of this task. */
 	protected OP headOperator;
 
@@ -232,11 +239,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	protected abstract void init() throws Exception;
 
-	protected abstract void cleanup() throws Exception;
-
 	protected void cancelTask() throws Exception {
 	}
 
+	protected void cleanup() throws Exception {
+		if (inputProcessor != null) {
+			inputProcessor.close();
+		}
+	}
+
 	/**
 	 * This method implements the default action of the task (e.g. processing one event from the input). Implementations
 	 * should (in general) be non-blocking.
@@ -244,7 +255,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	 * @param context context object for collaborative interaction between the action and the stream task.
 	 * @throws Exception on any problems in the action.
 	 */
-	protected abstract void performDefaultAction(ActionContext context) throws Exception;
+	protected void performDefaultAction(ActionContext context) throws Exception {
+		if (!inputProcessor.processInput()) {
+			context.allActionsCompleted();
+		}
+	}
 
 	/**
 	 * Runs the stream-tasks main processing loop.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
index 6c11abf..aab761d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
@@ -34,8 +34,6 @@ import java.util.Collection;
 @Internal
 public class TwoInputSelectableStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTask<IN1, IN2, OUT> {
 
-	private StreamTwoInputSelectableProcessor<IN1, IN2> inputProcessor;
-
 	public TwoInputSelectableStreamTask(Environment env) {
 		super(env);
 	}
@@ -62,18 +60,4 @@ public class TwoInputSelectableStreamTask<IN1, IN2, OUT> extends AbstractTwoInpu
 			getTaskNameWithSubtaskAndId(),
 			operatorChain);
 	}
-
-	@Override
-	protected void performDefaultAction(ActionContext context) throws Exception {
-		if (!inputProcessor.processInput()) {
-			context.allActionsCompleted();
-		}
-	}
-
-	@Override
-	protected void cleanup() throws Exception {
-		if (inputProcessor != null) {
-			inputProcessor.cleanup();
-		}
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 569cf16..307f303 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -33,8 +33,6 @@ import java.util.Collection;
 @Internal
 public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTask<IN1, IN2, OUT> {
 
-	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
-
 	public TwoInputStreamTask(Environment env) {
 		super(env);
 	}
@@ -62,18 +60,4 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTas
 			getTaskNameWithSubtaskAndId(),
 			operatorChain);
 	}
-
-	@Override
-	protected void performDefaultAction(ActionContext context) throws Exception {
-		if (!inputProcessor.processInput()) {
-			context.allActionsCompleted();
-		}
-	}
-
-	@Override
-	protected void cleanup() throws Exception {
-		if (inputProcessor != null) {
-			inputProcessor.cleanup();
-		}
-	}
 }


[flink] 05/06: [hotfix][operator] Rename StreamInputProcessor to StreamOneInputProcessor

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit de44130b7d275f6e8860c8866fb6cd56caf05175
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jun 25 15:05:14 2019 +0200

    [hotfix][operator] Rename StreamInputProcessor to StreamOneInputProcessor
    
    This is done in order to free `StreamInputProcessor` name as a base interface of
    all of the processors.
---
 .../org/apache/flink/streaming/runtime/io/InputProcessorUtil.java   | 2 +-
 .../io/{StreamInputProcessor.java => StreamOneInputProcessor.java}  | 6 +++---
 .../apache/flink/streaming/runtime/tasks/OneInputStreamTask.java    | 6 +++---
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 4f2a07c..80c0396 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -34,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Utility for creating {@link CheckpointedInputGate} based on checkpoint mode
- * for {@link StreamInputProcessor} and {@link StreamTwoInputProcessor}.
+ * for {@link StreamOneInputProcessor} and {@link StreamTwoInputProcessor}.
  */
 @Internal
 public class InputProcessorUtil {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
similarity index 98%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
index d6fcad2..b71aaa3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
@@ -62,9 +62,9 @@ import static org.apache.flink.util.Preconditions.checkState;
  * @param <IN> The type of the record that can be read with this record reader.
  */
 @Internal
-public class StreamInputProcessor<IN> {
+public class StreamOneInputProcessor<IN> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);
+	private static final Logger LOG = LoggerFactory.getLogger(StreamOneInputProcessor.class);
 
 	private final StreamTaskInput input;
 
@@ -87,7 +87,7 @@ public class StreamInputProcessor<IN> {
 	private Counter numRecordsIn;
 
 	@SuppressWarnings("unchecked")
-	public StreamInputProcessor(
+	public StreamOneInputProcessor(
 			InputGate[] inputGates,
 			TypeSerializer<IN> inputSerializer,
 			StreamTask<?, ?> checkpointedTask,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index e0b328c..c8c51bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
 import javax.annotation.Nullable;
@@ -37,7 +37,7 @@ import javax.annotation.Nullable;
 @Internal
 public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
 
-	private StreamInputProcessor<IN> inputProcessor;
+	private StreamOneInputProcessor<IN> inputProcessor;
 
 	private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();
 
@@ -77,7 +77,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 		if (numberOfInputs > 0) {
 			InputGate[] inputGates = getEnvironment().getAllInputGates();
 
-			inputProcessor = new StreamInputProcessor<>(
+			inputProcessor = new StreamOneInputProcessor<>(
 					inputGates,
 					inSerializer,
 					this,


[flink] 04/06: [hotfix][operator] Deduplicate StreamTask code

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6caa61b13e9a6bfeaa27e7be2c97ff099730a5e3
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jun 21 13:10:04 2019 +0200

    [hotfix][operator] Deduplicate StreamTask code
---
 .../runtime/tasks/OneInputStreamTask.java          |  4 ---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  3 +-
 .../tasks/TwoInputSelectableStreamTask.java        |  5 ----
 .../runtime/tasks/TwoInputStreamTask.java          |  5 ----
 .../tasks/StreamTaskCancellationBarrierTest.java   | 11 ++------
 .../runtime/tasks/StreamTaskTerminationTest.java   |  4 ---
 .../streaming/runtime/tasks/StreamTaskTest.java    | 25 ++---------------
 .../runtime/tasks/SynchronousCheckpointITCase.java |  4 ---
 .../runtime/tasks/SynchronousCheckpointTest.java   | 14 ++--------
 .../tasks/TaskCheckpointingBehaviourTest.java      |  3 --
 .../flink/streaming/util/MockStreamTask.java       |  3 --
 .../jobmaster/JobMasterStopWithSavepointIT.java    | 32 +---------------------
 12 files changed, 11 insertions(+), 102 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index f787571..e0b328c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -110,8 +110,4 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 			inputProcessor.cleanup();
 		}
 	}
-
-	@Override
-	protected void cancelTask() {
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e5f56b2..d333b8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -234,7 +234,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	protected abstract void cleanup() throws Exception;
 
-	protected abstract void cancelTask() throws Exception;
+	protected void cancelTask() throws Exception {
+	}
 
 	/**
 	 * This method implements the default action of the task (e.g. processing one event from the input). Implementations
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
index cde5a5a..6c11abf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
@@ -76,9 +76,4 @@ public class TwoInputSelectableStreamTask<IN1, IN2, OUT> extends AbstractTwoInpu
 			inputProcessor.cleanup();
 		}
 	}
-
-	@Override
-	protected void cancelTask() {
-
-	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 4670c0b..569cf16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -76,9 +76,4 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTas
 			inputProcessor.cleanup();
 		}
 	}
-
-	@Override
-	protected void cancelTask() {
-
-	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 456aea5..21427e0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase;
 import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 
 import org.junit.Test;
 
@@ -173,7 +174,7 @@ public class StreamTaskCancellationBarrierTest {
 	//  test tasks / functions
 	// ------------------------------------------------------------------------
 
-	private static class InitBlockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
+	private static class InitBlockingTask extends NoOpStreamTask<String, AbstractStreamOperator<String>> {
 
 		private final Object lock = new Object();
 		private volatile boolean running = true;
@@ -192,14 +193,6 @@ public class StreamTaskCancellationBarrierTest {
 		}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
-			context.allActionsCompleted();
-		}
-
-		@Override
-		protected void cleanup() throws Exception {}
-
-		@Override
 		protected void cancelTask() throws Exception {
 			running = false;
 			synchronized (lock) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index db3d657..72e8a19 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -241,10 +241,6 @@ public class StreamTaskTerminationTest extends TestLogger {
 			// wait until all async checkpoint threads are terminated, so that no more exceptions can be reported
 			Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, TimeUnit.SECONDS));
 		}
-
-		@Override
-		protected void cancelTask() {
-		}
 	}
 
 	private static class NoOpStreamOperator<T> extends AbstractStreamOperator<T> {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 46b5389..4ed23c7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -805,9 +805,6 @@ public class StreamTaskTest extends TestLogger {
 
 		@Override
 		protected void cleanup() throws Exception {}
-
-		@Override
-		protected void cancelTask() throws Exception {}
 	}
 
 	private static class BlockingCloseStreamOperator extends AbstractStreamOperator<Void> {
@@ -1007,9 +1004,6 @@ public class StreamTaskTest extends TestLogger {
 		@Override
 		protected void cleanup() throws Exception {}
 
-		@Override
-		protected void cancelTask() throws Exception {}
-
 		void finishInput() {
 			this.inputFinished = true;
 		}
@@ -1045,9 +1039,6 @@ public class StreamTaskTest extends TestLogger {
 		protected void cleanup() throws Exception {}
 
 		@Override
-		protected void cancelTask() throws Exception {}
-
-		@Override
 		public StreamTaskStateInitializer createStreamTaskStateInitializer() {
 			final StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer();
 			return (operatorID, operatorClassName, keyContext, keySerializer, closeableRegistry, metricGroup) -> {
@@ -1212,12 +1203,12 @@ public class StreamTaskTest extends TestLogger {
 	/**
 	 * A task that register a processing time service callback.
 	 */
-	public static class TimeServiceTask extends StreamTask<String, AbstractStreamOperator<String>> {
+	public static class TimeServiceTask extends NoOpStreamTask<String, AbstractStreamOperator<String>> {
 
 		private final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>());
 
 		public TimeServiceTask(Environment env) {
-			super(env, null);
+			super(env);
 		}
 
 		public List<ClassLoader> getClassLoaders() {
@@ -1238,17 +1229,7 @@ public class StreamTaskTest extends TestLogger {
 		@Override
 		protected void performDefaultAction(ActionContext context) throws Exception {
 			syncLatch.await();
-			context.allActionsCompleted();
-		}
-
-		@Override
-		protected void cleanup() throws Exception {
-
-		}
-
-		@Override
-		protected void cancelTask() throws Exception {
-
+			super.performDefaultAction(context);
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 00860b6..bac9d43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -164,10 +164,6 @@ public class SynchronousCheckpointITCase {
 		}
 
 		@Override
-		protected void cancelTask() {
-		}
-
-		@Override
 		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
 			eventQueue.put(Event.PRE_TRIGGER_CHECKPOINT);
 			boolean result = super.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index f06cfd2..8b71423 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -155,7 +156,7 @@ public class SynchronousCheckpointTest {
 		return new StreamTaskUnderTest(environment, runningLatch, execLatch);
 	}
 
-	private static class StreamTaskUnderTest extends StreamTask {
+	private static class StreamTaskUnderTest extends NoOpStreamTask {
 
 		private final OneShotLatch runningLatch;
 		private final OneShotLatch execLatch;
@@ -170,19 +171,10 @@ public class SynchronousCheckpointTest {
 		}
 
 		@Override
-		protected void init() {}
-
-		@Override
 		protected void performDefaultAction(ActionContext context) throws Exception {
 			runningLatch.trigger();
 			execLatch.await();
-			context.allActionsCompleted();
+			super.performDefaultAction(context);
 		}
-
-		@Override
-		protected void cleanup() {}
-
-		@Override
-		protected void cancelTask() {}
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 589b64c..e40e23d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -490,8 +490,5 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 
 		@Override
 		protected void cleanup() {}
-
-		@Override
-		protected void cancelTask() {}
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index 835d924..37e7328 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -89,9 +89,6 @@ public class MockStreamTask extends StreamTask {
 	protected void cleanup() { }
 
 	@Override
-	protected void cancelTask() { }
-
-	@Override
 	public String getName() {
 		return name;
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 1e383f8..d54ec1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import org.junit.Assume;
@@ -354,35 +355,4 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 			finishLatch.trigger();
 		}
 	}
-
-	/**
-	 * A {@link StreamTask} that does nothing.
-	 * This exists only to avoid having to implement all abstract methods in the subclasses above.
-	 */
-	public static class NoOpStreamTask extends StreamTask {
-
-		NoOpStreamTask(final Environment env) {
-			super(env);
-		}
-
-		@Override
-		protected void init() {
-
-		}
-
-		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
-			context.allActionsCompleted();
-		}
-
-		@Override
-		protected void cleanup() {
-
-		}
-
-		@Override
-		protected void cancelTask() throws Exception {
-
-		}
-	}
 }


[flink] 03/06: [FLINK-13016][network] Fix StreamTaskNetworkInput#isAvailable

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 76eb11fdc4561e15c7f3ad614b416cd2a84bf50e
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jun 27 13:39:32 2019 +0200

    [FLINK-13016][network] Fix StreamTaskNetworkInput#isAvailable
    
    Before this method was ignoring data/records buffered in the currentRecordDeserializer
---
 .../io/network/buffer/BufferBuilderTestUtils.java  |   4 +
 .../runtime/io/StreamTaskNetworkInput.java         |   3 +
 .../flink/streaming/runtime/io/MockInputGate.java  |  17 +++-
 .../runtime/io/StreamTaskNetworkInputTest.java     | 101 +++++++++++++++++++++
 4 files changed, 123 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
index 7a68368..7696e08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -112,4 +112,8 @@ public class BufferBuilderTestUtils {
 		final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(size);
 		return new NetworkBuffer(seg, MemorySegment::free, true, size);
 	}
+
+	public static BufferBuilder createEmptyBufferBuilder(int bufferSize) {
+		return new BufferBuilder(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), FreeingBufferRecycler.INSTANCE);
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index 8c37141..b10e892 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -148,6 +148,9 @@ public final class StreamTaskNetworkInput implements StreamTaskInput {
 
 	@Override
 	public CompletableFuture<?> isAvailable() {
+		if (currentRecordDeserializer != null) {
+			return AVAILABLE;
+		}
 		return checkpointedInputGate.isAvailable();
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 65984f4..8cb6848 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -39,10 +39,20 @@ public class MockInputGate extends InputGate {
 
 	private final boolean[] closed;
 
-	MockInputGate(int numberOfChannels, List<BufferOrEvent> bufferOrEvents) {
+	private final boolean finishAfterLastBuffer;
+
+	public MockInputGate(int numberOfChannels, List<BufferOrEvent> bufferOrEvents) {
+		this(numberOfChannels, bufferOrEvents, true);
+	}
+
+	public MockInputGate(
+			int numberOfChannels,
+			List<BufferOrEvent> bufferOrEvents,
+			boolean finishAfterLastBuffer) {
 		this.numberOfChannels = numberOfChannels;
 		this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents);
 		this.closed = new boolean[numberOfChannels];
+		this.finishAfterLastBuffer = finishAfterLastBuffer;
 
 		isAvailable = AVAILABLE;
 	}
@@ -58,12 +68,15 @@ public class MockInputGate extends InputGate {
 
 	@Override
 	public boolean isFinished() {
-		return bufferOrEvents.isEmpty();
+		return finishAfterLastBuffer && bufferOrEvents.isEmpty();
 	}
 
 	@Override
 	public Optional<BufferOrEvent> getNext() {
 		BufferOrEvent next = bufferOrEvents.poll();
+		if (!finishAfterLastBuffer && bufferOrEvents.isEmpty()) {
+			resetIsAvailable();
+		}
 		if (next == null) {
 			return Optional.empty();
 		}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
new file mode 100644
index 0000000..202b699
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StreamTaskNetworkInput}.
+ */
+public class StreamTaskNetworkInputTest {
+
+	private static final int PAGE_SIZE = 1000;
+
+	private final IOManager ioManager = new IOManagerAsync();
+
+	@After
+	public void tearDown() throws Exception {
+		ioManager.close();
+	}
+
+	@Test
+	public void testIsAvailableWithBufferedDataInDeserializer() throws Exception {
+		BufferBuilder bufferBuilder = BufferBuilderTestUtils.createEmptyBufferBuilder(PAGE_SIZE);
+
+		serializeRecord(42L, bufferBuilder);
+		serializeRecord(44L, bufferBuilder);
+
+		Buffer buffer = bufferBuilder.createBufferConsumer().build();
+
+		List<BufferOrEvent> buffers = Collections.singletonList(new BufferOrEvent(buffer, 0, false));
+
+		StreamTaskNetworkInput input = new StreamTaskNetworkInput(
+			new CheckpointedInputGate(
+				new MockInputGate(1, buffers, false),
+				new EmptyBufferStorage(),
+				new CheckpointBarrierTracker(1)),
+			LongSerializer.INSTANCE,
+			ioManager,
+			0);
+
+		assertHasNextElement(input);
+		assertHasNextElement(input);
+	}
+
+	private void serializeRecord(long value, BufferBuilder bufferBuilder) throws IOException {
+		RecordSerializer<SerializationDelegate<StreamElement>> serializer = new SpanningRecordSerializer<>();
+		SerializationDelegate<StreamElement> serializationDelegate =
+			new SerializationDelegate<>(
+				new StreamElementSerializer<>(LongSerializer.INSTANCE));
+		serializationDelegate.setInstance(new StreamRecord<>(value));
+		serializer.serializeRecord(serializationDelegate);
+
+		assertFalse(serializer.copyToBufferBuilder(bufferBuilder).isFullBuffer());
+	}
+
+	private static void assertHasNextElement(StreamTaskNetworkInput input) throws Exception {
+		assertTrue(input.isAvailable().isDone());
+		StreamElement element = input.pollNextNullable();
+		assertNotNull(element);
+		assertTrue(element.isRecord());
+	}
+}