You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/16 04:46:05 UTC

[flink] branch master updated (677867a -> 91932cc)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 677867a  [FLINK-17450][sql-parser][hive] Implement function & catalog DDLs for Hive dialect
     new 9207b81  [FLINK-17727][task] don't create output stream with no channel state (unaligned checkpoints)
     new 91932cc  [FLINK-17727][e2e] Re-enable "[FLINK-17467][task][e2e] Modify existing upgrade test to verify aligned savepoints" after a fix

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tests/StatefulStreamJobUpgradeTestProgram.java | 113 +++++++++++++++++++--
 .../channel/ChannelStateCheckpointWriter.java      |   7 ++
 .../channel/ChannelStateCheckpointWriterTest.java  |  23 +++++
 3 files changed, 132 insertions(+), 11 deletions(-)


[flink] 02/02: [FLINK-17727][e2e] Re-enable "[FLINK-17467][task][e2e] Modify existing upgrade test to verify aligned savepoints" after a fix

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91932ccc0592f969dc7d264f2adaecfa0c65d3da
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri May 15 14:51:17 2020 +0200

    [FLINK-17727][e2e] Re-enable "[FLINK-17467][task][e2e] Modify existing upgrade test to verify aligned savepoints" after a fix
    
    This reverts commit 3af17562eb791e3f486c38dbd94dc3328309b262.
---
 .../tests/StatefulStreamJobUpgradeTestProgram.java | 113 +++++++++++++++++++--
 1 file changed, 102 insertions(+), 11 deletions(-)

diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
index 3cec4b8..aa94578 100644
--- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.tests;
 
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.api.java.utils.ParameterTool;
@@ -73,7 +74,17 @@ public class StatefulStreamJobUpgradeTestProgram {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		setupEnvironment(env, pt);
+		env.getCheckpointConfig().enableUnalignedCheckpoints();
 
+		if (isOriginalJobVariant(pt)) {
+			executeOriginalVariant(env, pt);
+		}
+		else {
+			executeUpgradedVariant(env, pt);
+		}
+	}
+
+	private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
 		KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
 			.name("EventSource")
 			.uid("EventSource")
@@ -83,11 +94,34 @@ public class StatefulStreamJobUpgradeTestProgram {
 		List<TypeSerializer<ComplexPayload>> stateSer =
 			Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
 
-		KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
-			applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
+		KeyedStream<Event, Integer> afterStatefulOperations =
+			applyOriginalStatefulOperations(source, stateSer, Collections.emptyList());
+
+		afterStatefulOperations
+			.flatMap(createSemanticsCheckMapper(pt))
+			.name("SemanticsCheckMapper")
+			.addSink(new PrintSinkFunction<>());
+
+		env.execute("General purpose test job");
+	}
+
+	private static void executeUpgradedVariant(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
+		KeyedStream<UpgradedEvent, Integer> source = env.addSource(createEventSource(pt))
+			.name("EventSource")
+			.uid("EventSource")
+			.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+			.map(new UpgradeEvent())
+			.keyBy(UpgradedEvent::getKey);
+
+		List<TypeSerializer<ComplexPayload>> stateSer =
+			Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
+
+		KeyedStream<UpgradedEvent, Integer> afterStatefulOperations =
 			applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
 
 		afterStatefulOperations
+			.map(new DowngradeEvent())
+			.keyBy(Event::getKey)
 			.flatMap(createSemanticsCheckMapper(pt))
 			.name("SemanticsCheckMapper")
 			.addSink(new PrintSinkFunction<>());
@@ -115,15 +149,6 @@ public class StatefulStreamJobUpgradeTestProgram {
 		return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer, stateClass);
 	}
 
-	private static KeyedStream<Event, Integer> applyUpgradedStatefulOperations(
-			KeyedStream<Event, Integer> source,
-			List<TypeSerializer<ComplexPayload>> stateSer,
-			List<Class<ComplexPayload>> stateClass) {
-		source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer, stateClass);
-		source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer, stateClass);
-		return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer, stateClass);
-	}
-
 	private static KeyedStream<Event, Integer> applyTestStatefulOperator(
 			String name,
 			JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
@@ -138,6 +163,29 @@ public class StatefulStreamJobUpgradeTestProgram {
 			.keyBy(Event::getKey);
 	}
 
+	private static KeyedStream<UpgradedEvent, Integer> applyUpgradedStatefulOperations(
+			KeyedStream<UpgradedEvent, Integer> source,
+			List<TypeSerializer<ComplexPayload>> stateSer,
+			List<Class<ComplexPayload>> stateClass) {
+		source = applyUpgradedTestStatefulOperator("stateMap2", simpleUpgradedStateUpdate("stateMap2"), source, stateSer, stateClass);
+		source = applyUpgradedTestStatefulOperator("stateMap1", lastUpgradedStateUpdate("stateMap1"), source, stateSer, stateClass);
+		return applyUpgradedTestStatefulOperator("stateMap3", simpleUpgradedStateUpdate("stateMap3"), source, stateSer, stateClass);
+	}
+
+	private static KeyedStream<UpgradedEvent, Integer> applyUpgradedTestStatefulOperator(
+			String name,
+			JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> stateFunc,
+			KeyedStream<UpgradedEvent, Integer> source,
+			List<TypeSerializer<ComplexPayload>> stateSer,
+			List<Class<ComplexPayload>> stateClass) {
+		return source
+			.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
+			.name(name)
+			.uid(name)
+			.returns(UpgradedEvent.class)
+			.keyBy(UpgradedEvent::getKey);
+	}
+
 	private static JoinFunction<Event, ComplexPayload, ComplexPayload> simpleStateUpdate(String strPayload) {
 		return (Event first, ComplexPayload second) -> {
 			verifyState(strPayload, second);
@@ -145,6 +193,13 @@ public class StatefulStreamJobUpgradeTestProgram {
 		};
 	}
 
+	private static JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> simpleUpgradedStateUpdate(String strPayload) {
+		return (UpgradedEvent first, ComplexPayload second) -> {
+			verifyState(strPayload, second);
+			return new ComplexPayload(first.event, strPayload);
+		};
+	}
+
 	private static JoinFunction<Event, ComplexPayload, ComplexPayload> lastStateUpdate(String strPayload) {
 		return (Event first, ComplexPayload second) -> {
 			verifyState(strPayload, second);
@@ -153,9 +208,45 @@ public class StatefulStreamJobUpgradeTestProgram {
 		};
 	}
 
+	private static JoinFunction<UpgradedEvent, ComplexPayload, ComplexPayload> lastUpgradedStateUpdate(String strPayload) {
+		return (UpgradedEvent first, ComplexPayload second) -> {
+			verifyState(strPayload, second);
+			boolean isLastEvent = second != null && first.event.getEventTime() <= second.getEventTime();
+			return isLastEvent ? second : new ComplexPayload(first.event, strPayload);
+		};
+	}
+
 	private static void verifyState(String strPayload, ComplexPayload state) {
 		if (state != null && !state.getStrPayload().equals(strPayload)) {
 			System.out.println("State is set or restored incorrectly");
 		}
 	}
+
+	private static class UpgradeEvent implements MapFunction<Event, UpgradedEvent> {
+		@Override
+		public UpgradedEvent map(Event event) {
+			return new UpgradedEvent(event);
+		}
+	}
+
+	private static class UpgradedEvent {
+		public final Event event;
+		public final String randomPayload;
+
+		public UpgradedEvent(Event event) {
+			this.event = event;
+			this.randomPayload = event.getPayload().toUpperCase();
+		}
+
+		public int getKey() {
+			return event.getKey();
+		}
+	}
+
+	private static class DowngradeEvent implements MapFunction<UpgradedEvent, Event> {
+		@Override
+		public Event map(UpgradedEvent value) throws Exception {
+			return value.event;
+		}
+	}
 }


[flink] 01/02: [FLINK-17727][task] don't create output stream with no channel state (unaligned checkpoints)

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9207b81cb15d413c07f55d619667ab78f7725a9a
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri May 15 14:49:51 2020 +0200

    [FLINK-17727][task] don't create output stream with no channel state (unaligned checkpoints)
---
 .../channel/ChannelStateCheckpointWriter.java      |  7 +++++++
 .../channel/ChannelStateCheckpointWriterTest.java  | 23 ++++++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
index ef04857..6a2b68d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
 
+import static java.util.Collections.emptyList;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -150,6 +151,12 @@ class ChannelStateCheckpointWriter {
 	}
 
 	private void finishWriteAndResult() throws IOException {
+		if (inputChannelOffsets.isEmpty() && resultSubpartitionOffsets.isEmpty()) {
+			dataStream.close();
+			result.inputChannelStateHandles.complete(emptyList());
+			result.resultSubpartitionStateHandles.complete(emptyList());
+			return;
+		}
 		dataStream.flush();
 		StreamStateHandle underlying = checkpointStream.closeAndGetHandle();
 		complete(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
index 196c948..1d501f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelSta
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -37,6 +38,7 @@ import java.util.Random;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * {@link ChannelStateCheckpointWriter} test.
@@ -47,6 +49,27 @@ public class ChannelStateCheckpointWriterTest {
 	private final Random random = new Random();
 
 	@Test
+	public void testEmptyState() throws Exception {
+		MemoryCheckpointOutputStream stream = new MemoryCheckpointOutputStream(1000) {
+			@Override
+			public StreamStateHandle closeAndGetHandle() {
+				fail("closeAndGetHandle shouldn't be called for empty channel state");
+				return null;
+			}
+		};
+		ChannelStateCheckpointWriter writer = new ChannelStateCheckpointWriter(
+				1L,
+				new ChannelStateWriteResult(),
+				stream,
+				new ChannelStateSerializerImpl(),
+				NO_OP_RUNNABLE
+		);
+		writer.completeOutput();
+		writer.completeInput();
+		assertTrue(stream.isClosed());
+	}
+
+	@Test
 	public void testRecyclingBuffers() throws Exception {
 		ChannelStateCheckpointWriter writer = createWriter(new ChannelStateWriteResult());
 		NetworkBuffer buffer = new NetworkBuffer(HeapMemorySegment.FACTORY.allocateUnpooledSegment(10, null), FreeingBufferRecycler.INSTANCE);