You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/03/28 09:54:05 UTC

[flink] branch release-1.14 updated (df0363d -> 6504711)

This is an automated email from the ASF dual-hosted git repository.

leonard pushed a change to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from df0363d  [FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus
     new 96c64a1  [hotfix][connector/common] Fix typo of variable in SourceOperator
     new 6504711  [FLINK-26018][connector/common] Create per-split output on split addition in SourceOperator

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-connectors/flink-connector-base/pom.xml      |   8 ++
 .../base/source/reader/SourceReaderBase.java       |   3 +
 .../source/reader/splitreader/SplitReader.java     |   5 +
 .../base/source/reader/SourceReaderBaseTest.java   | 140 ++++++++++++++++++++-
 .../connector/kafka/source/KafkaSourceITCase.java  |  63 ++++++++++
 .../streaming/api/operators/SourceOperator.java    |  55 ++++++--
 .../source/SourceOperatorEventTimeTest.java        | 121 +++++++-----------
 .../operators/source/TestingSourceOperator.java    |  57 +++++++++
 8 files changed, 363 insertions(+), 89 deletions(-)

[flink] 02/02: [FLINK-26018][connector/common] Create per-split output on split addition in SourceOperator

Posted by le...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 65047113eceefd377df7017b6b8d553dda9a8758
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Fri Feb 11 18:19:10 2022 +0800

    [FLINK-26018][connector/common] Create per-split output on split addition in SourceOperator
    
    This change prevents the watermark from being pushed forward by records of the first split in the first fetch when multiple splits are assigned to the source operator.
---
 flink-connectors/flink-connector-base/pom.xml      |   8 ++
 .../base/source/reader/SourceReaderBase.java       |   3 +
 .../source/reader/splitreader/SplitReader.java     |   5 +
 .../base/source/reader/SourceReaderBaseTest.java   | 140 ++++++++++++++++++++-
 .../connector/kafka/source/KafkaSourceITCase.java  |  63 ++++++++++
 .../streaming/api/operators/SourceOperator.java    |  47 +++++--
 .../source/SourceOperatorEventTimeTest.java        | 121 +++++++-----------
 .../operators/source/TestingSourceOperator.java    |  57 +++++++++
 8 files changed, 359 insertions(+), 85 deletions(-)

diff --git a/flink-connectors/flink-connector-base/pom.xml b/flink-connectors/flink-connector-base/pom.xml
index e9d8d7d..fc72977 100644
--- a/flink-connectors/flink-connector-base/pom.xml
+++ b/flink-connectors/flink-connector-base/pom.xml
@@ -65,5 +65,13 @@
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 416dbb2..18d49f3 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -324,6 +324,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
         SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> mainOutput) {
             if (sourceOutput == null) {
+                // The split output should already have been created when the AddSplitEvent was
+                // processed in SourceOperator, so this call normally just returns the previously
+                // created output.
                 sourceOutput = mainOutput.createOutputForSplit(splitId);
             }
             return sourceOutput;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index 4f2ff6a..550cb95 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -50,6 +50,11 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
     /**
      * Handle the split changes. This call should be non-blocking.
      *
+     * <p>For the consistency of internal state in SourceReaderBase, if an invalid split is added to
+     * the reader (for example, a split without any records), it should be put back into {@link
+     * RecordsWithSplitIds} as a finished split so that SourceReaderBase can clean up the resources
+     * created for it.
+     *
      * @param splitsChanges the split changes that the SplitReader needs to handle.
      */
     void handleSplitsChanges(SplitsChange<SplitT> splitsChanges);
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 07353c2..71ce413 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
@@ -37,11 +41,23 @@ import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
 import org.apache.flink.core.io.InputStatus;
-
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import org.hamcrest.MatcherAssert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -50,6 +66,8 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
+import static org.apache.flink.streaming.api.operators.source.TestingSourceOperator.createTestOperator;
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -58,6 +76,8 @@ import static org.junit.Assert.assertTrue;
 /** A unit test class for {@link SourceReaderBase}. */
 public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBaseTest.class);
+
     @Rule public ExpectedException expectedException = ExpectedException.none();
 
     @Test
@@ -235,6 +255,91 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
                 InputStatus.MORE_AVAILABLE, sourceReader.pollNext(new TestingReaderOutput<>()));
     }
 
+    @Test
+    public void testPerSplitWatermarkWithEmitBeforeSplitAddition() throws Exception {
+        testPerSplitWatermark(true);
+    }
+
+    @Test
+    public void testPerSplitWatermarkWithoutEmitBeforeSplitAddition() throws Exception {
+        testPerSplitWatermark(false);
+    }
+
+    private void testPerSplitWatermark(boolean emitRecordBeforeSplitAddition) throws Exception {
+        MockSplitReader mockSplitReader =
+                MockSplitReader.newBuilder()
+                        .setNumRecordsPerSplitPerFetch(3)
+                        .setBlockingFetch(true)
+                        .build();
+
+        MockSourceReader reader =
+                new MockSourceReader(
+                        new FutureCompletingBlockingQueue<>(),
+                        () -> mockSplitReader,
+                        new Configuration(),
+                        new TestingReaderContext());
+
+        SourceOperator<Integer, MockSourceSplit> sourceOperator =
+                createTestOperator(
+                        reader,
+                        WatermarkStrategy.forGenerator(
+                                (context) -> new OnEventWatermarkGenerator()),
+                        true);
+
+        MockSourceSplit splitA = new MockSourceSplit(0, 0, 3);
+        splitA.addRecord(100);
+        splitA.addRecord(200);
+        splitA.addRecord(300);
+
+        MockSourceSplit splitB = new MockSourceSplit(1, 0, 3);
+        splitB.addRecord(150);
+        splitB.addRecord(250);
+        splitB.addRecord(350);
+
+        WatermarkCollectingDataOutput output = new WatermarkCollectingDataOutput();
+
+        if (emitRecordBeforeSplitAddition) {
+            sourceOperator.emitNext(output);
+        }
+
+        AddSplitEvent<MockSourceSplit> addSplitsEvent =
+                new AddSplitEvent<>(Arrays.asList(splitA, splitB), new MockSourceSplitSerializer());
+        sourceOperator.handleOperatorEvent(addSplitsEvent);
+
+        // First 3 records from split A should not generate any watermarks
+        CommonTestUtils.waitUtil(
+                () -> {
+                    try {
+                        sourceOperator.emitNext(output);
+                    } catch (Exception e) {
+                        LOG.warn("Exception caught at emitting records", e);
+                        return false;
+                    }
+                    return output.numRecords == 3;
+                },
+                Duration.ofSeconds(10),
+                String.format(
+                        "%d out of 3 records are received within timeout", output.numRecords));
+        assertTrue(output.watermarks.isEmpty());
+
+        CommonTestUtils.waitUtil(
+                () -> {
+                    try {
+                        sourceOperator.emitNext(output);
+                    } catch (Exception e) {
+                        LOG.warn("Exception caught at emitting records", e);
+                        return false;
+                    }
+                    return output.numRecords == 6;
+                },
+                Duration.ofSeconds(10),
+                String.format(
+                        "%d out of 6 records are received within timeout", output.numRecords));
+
+        assertEquals(3, output.watermarks.size());
+        MatcherAssert.assertThat(output.watermarks, contains(150L, 250L, 300L));
+    }
+
     // ---------------- helper methods -----------------
 
     @Override
@@ -375,4 +480,37 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
             }
         }
     }
+
+    private static class OnEventWatermarkGenerator implements WatermarkGenerator<Integer> {
+
+        @Override
+        public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
+            output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(event));
+        }
+
+        @Override
+        public void onPeriodicEmit(WatermarkOutput output) {}
+    }
+
+    private static class WatermarkCollectingDataOutput
+            implements PushingAsyncDataInput.DataOutput<Integer> {
+        int numRecords = 0;
+        final List<Long> watermarks = new ArrayList<>();
+
+        @Override
+        public void emitRecord(StreamRecord<Integer> streamRecord) {
+            numRecords++;
+        }
+
+        @Override
+        public void emitWatermark(Watermark watermark) throws Exception {
+            watermarks.add(watermark.getTimestamp());
+        }
+
+        @Override
+        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}
+
+        @Override
+        public void emitLatencyMarker(LatencyMarker latencyMarker) {}
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index cf8e501..1f740b5 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -20,6 +20,9 @@ package org.apache.flink.connector.kafka.source;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -37,6 +40,7 @@ import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
 import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -68,10 +72,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Unite test class for {@link KafkaSource}. */
 public class KafkaSourceITCase {
@@ -252,6 +258,52 @@ public class KafkaSourceITCase {
                             "testBasicReadWithoutGroupId");
             executeAndVerify(env, stream);
         }
+
+        @Test
+        public void testPerPartitionWatermark() throws Throwable {
+            String watermarkTopic = "watermarkTestTopic-" + UUID.randomUUID();
+            KafkaSourceTestEnv.createTestTopic(watermarkTopic, 2, 1);
+            List<ProducerRecord<String, Integer>> records =
+                    Arrays.asList(
+                            new ProducerRecord<>(watermarkTopic, 0, 100L, null, 100),
+                            new ProducerRecord<>(watermarkTopic, 0, 200L, null, 200),
+                            new ProducerRecord<>(watermarkTopic, 0, 300L, null, 300),
+                            new ProducerRecord<>(watermarkTopic, 1, 150L, null, 150),
+                            new ProducerRecord<>(watermarkTopic, 1, 250L, null, 250),
+                            new ProducerRecord<>(watermarkTopic, 1, 350L, null, 350));
+            KafkaSourceTestEnv.produceToKafka(records);
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setTopics(watermarkTopic)
+                            .setGroupId("watermark-test")
+                            .setDeserializer(new TestingKafkaRecordDeserializationSchema(false))
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+            env.fromSource(
+                            source,
+                            WatermarkStrategy.forGenerator(
+                                    (context) -> new OnEventWatermarkGenerator()),
+                            "testPerPartitionWatermark")
+                    .process(
+                            new ProcessFunction<PartitionAndValue, Long>() {
+                                @Override
+                                public void processElement(
+                                        PartitionAndValue value,
+                                        ProcessFunction<PartitionAndValue, Long>.Context ctx,
+                                        Collector<Long> out) {
+                                    assertTrue(
+                                            ctx.timestamp()
+                                                    >= ctx.timerService().currentWatermark(),
+                                            "Event time should never behind watermark "
+                                                    + "because of per-split watermark multiplexing logic");
+                                }
+                            });
+            env.execute();
+        }
     }
 
     /** Integration test based on connector testing framework. */
@@ -391,4 +443,15 @@ public class KafkaSourceITCase {
                     }
                 });
     }
+
+    private static class OnEventWatermarkGenerator
+            implements WatermarkGenerator<PartitionAndValue> {
+        @Override
+        public void onEvent(PartitionAndValue event, long eventTimestamp, WatermarkOutput output) {
+            output.emitWatermark(new Watermark(eventTimestamp));
+        }
+
+        @Override
+        public void onPeriodicEmit(WatermarkOutput output) {}
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 796defb..a89e66a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -62,6 +62,7 @@ import org.apache.flink.util.function.FunctionWithException;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
@@ -144,6 +145,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
     private final SourceOperatorAvailabilityHelper availabilityHelper =
             new SourceOperatorAvailabilityHelper();
 
+    private final List<SplitT> outputPendingSplits = new ArrayList<>();
+
     private enum OperatingMode {
         READING,
         OUTPUT_NOT_INITIALIZED,
@@ -356,10 +359,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
     private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Exception {
         switch (operatingMode) {
             case OUTPUT_NOT_INITIALIZED:
-                currentMainOutput = eventTimeLogic.createMainOutput(output);
-                initializeLatencyMarkerEmitter(output);
-                lastInvokedOutput = output;
-                this.operatingMode = OperatingMode.READING;
+                initializeMainOutput(output);
                 return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
             case SOURCE_STOPPED:
                 this.operatingMode = OperatingMode.DATA_FINISHED;
@@ -374,6 +374,21 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         }
     }
 
+    private void initializeMainOutput(DataOutput<OUT> output) {
+        currentMainOutput = eventTimeLogic.createMainOutput(output);
+        initializeLatencyMarkerEmitter(output);
+        lastInvokedOutput = output;
+        // Create per-split output for pending splits added before main output is initialized
+        createOutputForSplits(outputPendingSplits);
+        this.operatingMode = OperatingMode.READING;
+    }
+
+    private void createOutputForSplits(List<SplitT> newSplits) {
+        for (SplitT split : newSplits) {
+            currentMainOutput.createOutputForSplit(split.splitId());
+        }
+    }
+
     private void initializeLatencyMarkerEmitter(DataOutput<OUT> output) {
         long latencyTrackingInterval =
                 getExecutionConfig().isLatencyTrackingConfigured()
@@ -454,11 +469,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
     @SuppressWarnings("unchecked")
     public void handleOperatorEvent(OperatorEvent event) {
         if (event instanceof AddSplitEvent) {
-            try {
-                sourceReader.addSplits(((AddSplitEvent<SplitT>) event).splits(splitSerializer));
-            } catch (IOException e) {
-                throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
-            }
+            handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));
         } else if (event instanceof SourceEventWrapper) {
             sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
         } else if (event instanceof NoMoreSplitsEvent) {
@@ -468,6 +479,24 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         }
     }
 
+    private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
+        try {
+            List<SplitT> newSplits = event.splits(splitSerializer);
+            if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {
+                // Splits that arrive before the main output is initialized are stored in the
+                // pending list. Their outputs will be created once the main output has been
+                // initialized.
+                outputPendingSplits.addAll(newSplits);
+            } else {
+                // Create output directly for new splits if the main output is already initialized.
+                createOutputForSplits(newSplits);
+            }
+            sourceReader.addSplits(newSplits);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
+        }
+    }
+
     private void registerReader() {
         operatorEventGateway.sendEventToCoordinator(
                 new ReaderRegistrationEvent(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index c0a84f9..3b120a0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -18,30 +18,17 @@
 
 package org.apache.flink.streaming.api.operators.source;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.core.io.InputStatus;
-import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateInitializationContextImpl;
-import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.util.MockOutput;
-import org.apache.flink.streaming.util.MockStreamConfig;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
@@ -52,13 +39,13 @@ import org.junit.runners.Parameterized;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.streaming.api.operators.source.TestingSourceOperator.createTestOperator;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
@@ -159,6 +146,35 @@ public class SourceOperatorEventTimeTest {
                 result, new Watermark(100L), new Watermark(150L), new Watermark(200L));
     }
 
+    @Test
+    public void testCreatingPerSplitOutputOnSplitAddition() throws Exception {
+        final WatermarkStrategy<Integer> watermarkStrategy =
+                WatermarkStrategy.forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>());
+
+        InterpretingSourceReader reader =
+                new InterpretingSourceReader(
+                        // No watermark (no record from split 2, whose watermark is Long.MIN_VALUE)
+                        (output) -> output.createOutputForSplit("1").collect(0, 100L),
+                        (output) -> output.createOutputForSplit("1").collect(0, 200L),
+                        (output) -> output.createOutputForSplit("1").collect(0, 300L),
+                        // Emit watermark 150 (from the 1st record of split 2)
+                        (output) -> output.createOutputForSplit("2").collect(0, 150L),
+                        // Emit watermark 300 (from the 3rd record in split 1)
+                        (output) -> output.createOutputForSplit("2").collect(0, 400L));
+        SourceOperator<Integer, MockSourceSplit> sourceOperator =
+                createTestOperator(reader, watermarkStrategy, emitProgressiveWatermarks);
+
+        // Add two splits to SourceOperator. Output for two splits should be created during event
+        // handling.
+        sourceOperator.handleOperatorEvent(
+                new AddSplitEvent<>(
+                        Arrays.asList(new MockSourceSplit(1), new MockSourceSplit(2)),
+                        new MockSourceSplitSerializer()));
+
+        final List<Watermark> result = testSequenceOfWatermarks(sourceOperator);
+        assertWatermarksOrEmpty(result, new Watermark(150L), new Watermark(300L));
+    }
+
     // ------------------------------------------------------------------------
     //   test execution helpers
     // ------------------------------------------------------------------------
@@ -186,9 +202,18 @@ public class SourceOperatorEventTimeTest {
             final WatermarkStrategy<Integer> watermarkStrategy,
             final Consumer<ReaderOutput<Integer>>... actions)
             throws Exception {
+        final SourceReader<Integer, MockSourceSplit> reader = new InterpretingSourceReader(actions);
+        final SourceOperator<Integer, MockSourceSplit> sourceOperator =
+                createTestOperator(reader, watermarkStrategy, emitProgressiveWatermarks);
 
-        final List<Object> allEvents =
-                testSequenceOfEvents(emitProgressiveWatermarks, watermarkStrategy, actions);
+        return testSequenceOfWatermarks(sourceOperator);
+    }
+
+    @SuppressWarnings("FinalPrivateMethod")
+    private final List<Watermark> testSequenceOfWatermarks(
+            SourceOperator<Integer, MockSourceSplit> sourceOperator) throws Exception {
+
+        final List<Object> allEvents = testSequenceOfEvents(sourceOperator);
 
         return allEvents.stream()
                 .filter((evt) -> evt instanceof org.apache.flink.streaming.api.watermark.Watermark)
@@ -201,23 +226,13 @@ public class SourceOperatorEventTimeTest {
     }
 
     @SuppressWarnings("FinalPrivateMethod")
-    @SafeVarargs
     private final List<Object> testSequenceOfEvents(
-            final boolean emitProgressiveWatermarks,
-            final WatermarkStrategy<Integer> watermarkStrategy,
-            final Consumer<ReaderOutput<Integer>>... actions)
-            throws Exception {
+            final SourceOperator<Integer, MockSourceSplit> sourceOperator) throws Exception {
 
         final CollectingDataOutput<Integer> out = new CollectingDataOutput<>();
 
-        final TestProcessingTimeService timeService = new TestProcessingTimeService();
-        timeService.setCurrentTime(Integer.MAX_VALUE); // start somewhere that is not zero
-
-        final SourceReader<Integer, MockSourceSplit> reader = new InterpretingSourceReader(actions);
-
-        final SourceOperator<Integer, MockSourceSplit> sourceOperator =
-                createTestOperator(
-                        reader, watermarkStrategy, timeService, emitProgressiveWatermarks);
+        final TestProcessingTimeService timeService =
+                ((TestProcessingTimeService) sourceOperator.getProcessingTimeService());
 
         while (sourceOperator.emitNext(out) != DataInputStatus.END_OF_INPUT) {
             timeService.setCurrentTime(timeService.getCurrentProcessingTime() + 100);
@@ -227,50 +242,6 @@ public class SourceOperatorEventTimeTest {
     }
 
     // ------------------------------------------------------------------------
-    //   test setup helpers
-    // ------------------------------------------------------------------------
-
-    private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
-            SourceReader<T, MockSourceSplit> reader,
-            WatermarkStrategy<T> watermarkStrategy,
-            ProcessingTimeService timeService,
-            boolean emitProgressiveWatermarks)
-            throws Exception {
-
-        final OperatorStateStore operatorStateStore =
-                new MemoryStateBackend()
-                        .createOperatorStateBackend(
-                                new MockEnvironmentBuilder().build(),
-                                "test-operator",
-                                Collections.emptyList(),
-                                new CloseableRegistry());
-
-        final StateInitializationContext stateContext =
-                new StateInitializationContextImpl(null, operatorStateStore, null, null, null);
-
-        final SourceOperator<T, MockSourceSplit> sourceOperator =
-                new TestingSourceOperator<>(
-                        reader, watermarkStrategy, timeService, emitProgressiveWatermarks);
-
-        sourceOperator.setup(
-                new SourceOperatorStreamTask<Integer>(
-                        new StreamMockEnvironment(
-                                new Configuration(),
-                                new Configuration(),
-                                new ExecutionConfig(),
-                                1L,
-                                new MockInputSplitProvider(),
-                                1,
-                                new TestTaskStateManager())),
-                new MockStreamConfig(new Configuration(), 1),
-                new MockOutput<>(new ArrayList<>()));
-        sourceOperator.initializeState(stateContext);
-        sourceOperator.open();
-
-        return sourceOperator;
-    }
-
-    // ------------------------------------------------------------------------
     //   test mocks
     // ------------------------------------------------------------------------
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 7becd40..281b572 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -20,19 +20,34 @@ package org.apache.flink.streaming.api.operators.source;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
 import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 
+import java.util.ArrayList;
+import java.util.Collections;
+
 /** A SourceOperator extension to simplify test setup. */
 public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> {
 
@@ -117,4 +132,46 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit>
         cfg.setAutoWatermarkInterval(100);
         return cfg;
     }
+
+    /**
+     * Creates a {@link TestingSourceOperator} around the given reader, then sets it up, initializes
+     * its state and opens it, so the returned operator can be driven directly by a test.
+     *
+     * <p>The operator is wired to a fresh {@link TestProcessingTimeService} whose clock starts at
+     * {@code Integer.MAX_VALUE}, a mock {@link SourceOperatorStreamTask} backed by a {@link
+     * StreamMockEnvironment}, and an operator state store created from a {@link
+     * HashMapStateBackend}.
+     *
+     * @param reader the reader passed to the {@link TestingSourceOperator} constructor
+     * @param watermarkStrategy the watermark strategy passed to the {@link TestingSourceOperator}
+     *     constructor
+     * @param emitProgressiveWatermarks forwarded unchanged to the {@link TestingSourceOperator}
+     *     constructor
+     * @return the operator after {@code setup}, {@code initializeState} and {@code open} have run
+     * @throws Exception propagated from creating the operator state backend, initializing state,
+     *     or opening the operator
+     */
+    public static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
+            SourceReader<T, MockSourceSplit> reader,
+            WatermarkStrategy<T> watermarkStrategy,
+            boolean emitProgressiveWatermarks)
+            throws Exception {
+
+        final OperatorStateStore operatorStateStore =
+                new HashMapStateBackend()
+                        .createOperatorStateBackend(
+                                new MockEnvironmentBuilder().build(),
+                                "test-operator",
+                                Collections.emptyList(),
+                                new CloseableRegistry());
+
+        // Only the operator state store is supplied; every other context argument is null.
+        // NOTE(review): assumes the operator under test never touches those null parts — confirm.
+        final StateInitializationContext stateContext =
+                new StateInitializationContextImpl(null, operatorStateStore, null, null, null);
+
+        // Tests advance this manual clock themselves; callers presumably obtain it again via
+        // getProcessingTimeService() and cast it back to TestProcessingTimeService.
+        TestProcessingTimeService timeService = new TestProcessingTimeService();
+        timeService.setCurrentTime(Integer.MAX_VALUE); // start somewhere that is not zero
+
+        final SourceOperator<T, MockSourceSplit> sourceOperator =
+                new TestingSourceOperator<>(
+                        reader, watermarkStrategy, timeService, emitProgressiveWatermarks);
+
+        // NOTE(review): the mock task is typed <Integer> rather than <T>; this appears irrelevant
+        // for this mock setup — confirm.
+        sourceOperator.setup(
+                new SourceOperatorStreamTask<Integer>(
+                        new StreamMockEnvironment(
+                                new Configuration(),
+                                new Configuration(),
+                                new ExecutionConfig(),
+                                1L,
+                                new MockInputSplitProvider(),
+                                1,
+                                new TestTaskStateManager())),
+                new MockStreamConfig(new Configuration(), 1),
+                new MockOutput<>(new ArrayList<>()));
+        sourceOperator.initializeState(stateContext);
+        sourceOperator.open();
+
+        return sourceOperator;
+    }
 }

[flink] 01/02: [hotfix][connector/common] Fix typo of variable in SourceOperator

Posted by le...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 96c64a114e3565b96d88f29b598ae53360074b04
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Fri Feb 11 11:58:30 2022 +0800

    [hotfix][connector/common] Fix typo of variable in SourceOperator
    
    (cherry picked from commit c81fff6d27c6fc599e0f740f1850d63d5f078413)
---
 .../org/apache/flink/streaming/api/operators/SourceOperator.java  | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 0866a52..796defb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -153,7 +153,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
 
     private InternalSourceReaderMetricGroup sourceMetricGroup;
 
-    private @Nullable LatencyMarkerEmitter<OUT> latencyMarerEmitter;
+    private @Nullable LatencyMarkerEmitter<OUT> latencyMarkerEmitter;
 
     public SourceOperator(
             FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
@@ -309,8 +309,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         if (eventTimeLogic != null) {
             eventTimeLogic.stopPeriodicWatermarkEmits();
         }
-        if (latencyMarerEmitter != null) {
-            latencyMarerEmitter.close();
+        if (latencyMarkerEmitter != null) {
+            latencyMarkerEmitter.close();
         }
         super.finish();
 
@@ -384,7 +384,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
                                 .getConfiguration()
                                 .getLong(MetricOptions.LATENCY_INTERVAL);
         if (latencyTrackingInterval > 0) {
-            latencyMarerEmitter =
+            latencyMarkerEmitter =
                     new LatencyMarkerEmitter<>(
                             getProcessingTimeService(),
                             output::emitLatencyMarker,