Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/06 08:53:54 UTC

[GitHub] [flink] alpreu opened a new pull request, #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

alpreu opened a new pull request, #19655:
URL: https://github.com/apache/flink/pull/19655

   ## What is the purpose of the change
   
   This pull request introduces a new file-based upsert sink for testing internal components.
   
   
   ## Brief change log
   
  - Created a file-based upsert test sink
  - The SinkWriter stores records as key-value pairs in a map; on every flush the map is written/appended to the file and cleared. Subsequent flushes therefore only write out the delta of records received since the previous flush.
  - Created a `BinaryFileUtil` class that provides simple reading from Files/Streams written by the Sink. It offers both low-level (ByteBuffer with the serialized records) and high-level (actual records via a DeserializationSchema) access; see the usage sketch below.
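
   A minimal usage sketch of the builder API introduced here (the `Tuple2` schemas and the output path are illustrative placeholders, not code from this PR):

   ```java
   // Hypothetical example: the key/value schemas and the output file are stand-ins.
   File outputFile = new File("/tmp/upsert-records.out");
   SerializationSchema<Tuple2<String, String>> keySchema = element -> element.f0.getBytes();
   SerializationSchema<Tuple2<String, String>> valueSchema = element -> element.f1.getBytes();

   UpsertTestSink<Tuple2<String, String>> sink =
           UpsertTestSink.<Tuple2<String, String>>builder()
                   .setOutputFile(outputFile)
                   .setKeySerializationSchema(keySchema)
                   .setValueSerializationSchema(valueSchema)
                   .build();
   ```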
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - Added Unit tests and ITCases for the introduced components
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / *no*)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (*yes* / no)
     - The serializers: (yes / *no* / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / *no* / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / *no* / don't know)
     - The S3 file system connector: (yes / *no* / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (*yes* / no)
     - If yes, how is the feature documented? (not applicable / docs / *JavaDocs* / not documented)
   




[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r877948231


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSink.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.connector.upserttest.sink.UpsertTestSinkWriter.MAGIC_BYTE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Flink Sink to upsert test data into a file. This Sink is intended for testing internal
+ * functionality and is **not** production-ready.
+ *
+ * <p>Please note that the UpsertTestSink needs to run with a parallelism of 1 to function
+ * correctly. There is currently no support for using multiple writers at once.
+ *
+ * @param <IN> type of records written to the file
+ * @see UpsertTestSinkBuilder on how to construct an UpsertTestSink
+ */
+@PublicEvolving
+public class UpsertTestSink<IN> implements Sink<IN> {
+
+    private final File outputFile;
+    private final SerializationSchema<IN> keySerializationSchema;
+    private final SerializationSchema<IN> valueSerializationSchema;
+
+    UpsertTestSink(
+            File outputFile,
+            SerializationSchema<IN> keySerializationSchema,
+            SerializationSchema<IN> valueSerializationSchema) {
+        this.outputFile = checkNotNull(outputFile);
+        this.keySerializationSchema = checkNotNull(keySerializationSchema);
+        this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
+    }
+
+    /**
+     * Creates a {@link UpsertTestSinkBuilder} to construct a new {@link UpsertTestSink}.
+     *
+     * @param <IN> type of incoming records
+     * @return {@link UpsertTestSinkBuilder}
+     */
+    public static <IN> UpsertTestSinkBuilder<IN> builder() {
+        return new UpsertTestSinkBuilder<>();
+    }
+
+    @Internal
+    @Override
+    public SinkWriter<IN> createWriter(InitContext context) {
+        return new UpsertTestSinkWriter<>(
+                outputFile, keySerializationSchema, valueSerializationSchema);
+    }
+
+    /////////////////////////////////////////////////////////////
+    // Utilities
+    /////////////////////////////////////////////////////////////
+
+    /**
+     * Returns the total number of records written using the {@link UpsertTestSinkWriter} that can
+     * be read from the given InputStream.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @return the number of records
+     * @throws IOException if the stream cannot be read
+     */
+    public static int getNumberOfRecords(BufferedInputStream bis) throws IOException {
+        Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records = readRecords(bis);
+        return records.size();
+    }
+
+    /**
+     * Returns the total number of records written using the {@link UpsertTestSinkWriter} to the
+     * given File.
+     *
+     * @param file The File to read from
+     * @return the number of records
+     * @throws IOException if the file cannot be read
+     */
+    public static int getNumberOfRecords(File file) throws IOException {
+        checkNotNull(file);
+        try (FileInputStream fs = new FileInputStream(file);
+                BufferedInputStream bis = new BufferedInputStream(fs)) {
+            return getNumberOfRecords(bis);
+        }
+    }
+
+    /**
+     * Reads records that were written using the {@link UpsertTestSinkWriter} from the given
+     * InputStream and converts them using the provided {@link DeserializationSchema}s.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @param keyDeserializationSchema The key's DeserializationSchema
+     * @param valueDeserializationSchema The value's DeserializationSchema
+     * @return Map containing the deserialized key-value pairs
+     * @throws IOException if reading or deserialization fails
+     */
+    public static <K, V> Map<K, V> readRecords(
+            BufferedInputStream bis,
+            DeserializationSchema<K> keyDeserializationSchema,
+            DeserializationSchema<V> valueDeserializationSchema)
+            throws IOException {
+        checkNotNull(bis);
+        Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> bytesMap = readRecords(bis);
+        Map<K, V> typedMap = new HashMap<>(bytesMap.size());
+
+        for (Map.Entry<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> entry :
+                bytesMap.entrySet()) {
+            K key = keyDeserializationSchema.deserialize(entry.getKey().array());
+            V value = valueDeserializationSchema.deserialize(entry.getValue().array());
+            typedMap.put(key, value);
+        }
+        return typedMap;
+    }
+
+    /**
+     * Reads records that were written using the {@link UpsertTestSinkWriter} from the given File
+     * and converts them using the provided {@link DeserializationSchema}s.
+     *
+     * @param file The File to read from
+     * @param keyDeserializationSchema The key's DeserializationSchema
+     * @param valueDeserializationSchema The value's DeserializationSchema
+     * @return Map containing the deserialized key-value pairs
+     * @throws IOException if the file cannot be read or deserialization fails
+     */
+    public static <K, V> Map<K, V> readRecords(
+            File file,
+            DeserializationSchema<K> keyDeserializationSchema,
+            DeserializationSchema<V> valueDeserializationSchema)
+            throws IOException {
+        checkNotNull(file);
+        try (FileInputStream fs = new FileInputStream(file);
+                BufferedInputStream bis = new BufferedInputStream(fs)) {
+            return readRecords(bis, keyDeserializationSchema, valueDeserializationSchema);
+        }
+    }
+
+    /**
+     * Reads records that were written using the {@link UpsertTestSinkWriter} from the given
+     * InputStream.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @return Map containing the read ImmutableByteArrayWrapper key-value pairs
+     * @throws IOException if the stream cannot be read
+     */
+    private static Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> readRecords(
+            BufferedInputStream bis) throws IOException {
+        checkNotNull(bis);
+        Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records = new HashMap<>();
+        int magicByte;
+        while ((magicByte = bis.read()) != -1) {

Review Comment:
   We are reading individual records from the file, so we need to iterate until none are left
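
   For context, a rough sketch of what such a loop could look like, assuming each record is framed as a magic byte followed by a length-prefixed key and value (the exact framing is defined by the writer and is not shown in this excerpt):

   ```java
   // Illustrative sketch only; the real framing in UpsertTestSinkWriter may differ.
   private static Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> readRecords(
           BufferedInputStream bis) throws IOException {
       Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records = new HashMap<>();
       DataInputStream in = new DataInputStream(bis); // java.io.DataInputStream
       int magicByte;
       // Iterate until no records are left in the stream.
       while ((magicByte = in.read()) != -1) {
           if (magicByte != MAGIC_BYTE) {
               throw new IOException("Malformed record, expected magic byte");
           }
           byte[] key = new byte[in.readInt()]; // assumed length-prefixed key
           in.readFully(key);
           byte[] value = new byte[in.readInt()]; // assumed length-prefixed value
           in.readFully(value);
           records.put(new ImmutableByteArrayWrapper(key), new ImmutableByteArrayWrapper(value));
       }
       return records;
   }
   ```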





[GitHub] [flink] alpreu commented on pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on PR #19655:
URL: https://github.com/apache/flink/pull/19655#issuecomment-1136886116

   Following @twalthr's remarks, I just pushed a change that replaces the 'flink-table-planner' dependency with 'flink-table-planner-loader' and 'flink-table-runtime'




[GitHub] [flink] alpreu commented on pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on PR #19655:
URL: https://github.com/apache/flink/pull/19655#issuecomment-1132500442

   @flinkbot run azure




[GitHub] [flink] fapaul commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r868907018


##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Collection of utility methods for tests using the {@link UpsertTestSink}. */
+@Internal
+public class BinaryFileUtil {
+    private static final byte MAGIC_BYTE = 13;
+
+    /**
+     * Reads records that were written using the {@link BinaryFileUtil#writeRecords} method from
+     * the given InputStream.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @return Map containing the read ByteBuffer key-value pairs
+     * @throws IOException if the stream cannot be read
+     */
+    public static Map<ByteBuffer, ByteBuffer> readRecords(BufferedInputStream bis)
+            throws IOException {
+        checkNotNull(bis);
+        Map<ByteBuffer, ByteBuffer> records = new HashMap<>();

Review Comment:
   You are right, in the current implementation everything is safe. My main concern is future changes, because once someone changes the position (e.g. by reading) before putting the `ByteBuffer` into the map, the hash changes.
   
   I'd propose two options: either you build your own simple wrapper that uses a similar hashCode but where the position is always zero, or you make a big warning in the SinkWriter that the ByteBuffers should never be touched.
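
   For illustration, a minimal sketch of the wrapper option (the PR later adds a class called `ImmutableByteArrayWrapper`; this version only demonstrates the idea):

   ```java
   // Sketch: byte[] wrapper with content-based equality, so the hash cannot
   // drift with a buffer position the way ByteBuffer's can.
   final class ImmutableByteArrayWrapper {
       private final byte[] bytes;

       ImmutableByteArrayWrapper(byte[] bytes) {
           this.bytes = bytes.clone(); // defensive copy keeps the wrapper immutable
       }

       byte[] array() {
           return bytes.clone();
       }

       @Override
       public boolean equals(Object o) {
           return this == o
                   || (o instanceof ImmutableByteArrayWrapper
                           && java.util.Arrays.equals(bytes, ((ImmutableByteArrayWrapper) o).bytes));
       }

       @Override
       public int hashCode() {
           return java.util.Arrays.hashCode(bytes);
       }
   }
   ```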





[GitHub] [flink] AHeise commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
AHeise commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r870283039


##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Collection of utility methods for tests using the {@link UpsertTestSink}. */
+@Internal
+public class BinaryFileUtil {

Review Comment:
   Can't we just use TypeInformation and its TypeSerializer/Deserializer? Imho the user doesn't need to supply any special code and we can just use what's already there.
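
   For reference, roughly what that suggestion could look like (a sketch assuming the Flink 1.x `TypeSerializer` API; not code from this PR):

   ```java
   import org.apache.flink.api.common.ExecutionConfig;
   import org.apache.flink.api.common.typeinfo.TypeHint;
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.core.memory.DataOutputSerializer;

   import java.io.IOException;

   // Sketch: deriving serialization from TypeInformation instead of a user-supplied schema.
   class TypeInfoSerializationSketch {
       static byte[] serialize(Tuple2<String, String> record) throws IOException {
           TypeSerializer<Tuple2<String, String>> serializer =
                   TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})
                           .createSerializer(new ExecutionConfig());
           DataOutputSerializer out = new DataOutputSerializer(64);
           serializer.serialize(record, out);
           return out.getCopyOfBuffer();
       }
   }
   ```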





[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r868882875


##########
flink-connectors/flink-connector-upsert-test/src/test/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriterITCase.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UpsertTestSinkWriter}. */
+@ExtendWith(TestLoggerExtension.class)
+class UpsertTestSinkWriterITCase {
+
+    @Test
+    public void testWrite(@TempDir File tempDir) throws Exception {

Review Comment:
   I had the misconception that the tests would then share the directory, but this is only the case if it's declared as static. I'll change it
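
   For reference, the JUnit 5 behavior in question (a sketch, not code from this PR):

   ```java
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.io.TempDir;

   import java.io.File;

   class TempDirScopeSketch {
       // A static @TempDir is created once and shared by all tests in the class.
       @TempDir static File sharedTempDir;

       // A parameter (or non-static field) receives a fresh directory per test method.
       @Test
       void testWrite(@TempDir File freshTempDir) {
           // ...
       }
   }
   ```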





[GitHub] [flink] XComp commented on pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #19655:
URL: https://github.com/apache/flink/pull/19655#issuecomment-1142008650

   Thanks for the contribution, @alpreu. I'm going to merge the PR based on the successful CI run and the approval of @fapaul.




[GitHub] [flink] XComp merged pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
XComp merged PR #19655:
URL: https://github.com/apache/flink/pull/19655




[GitHub] [flink] fapaul commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r867894179


##########
flink-connectors/flink-connector-upsert-test/pom.xml:
##########
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-connectors</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.16-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-connector-upsert-test</artifactId>
+	<name>Flink : Connectors : Upsert Test</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java-bridge</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- ArchUnit test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-architecture-tests-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>

Review Comment:
   Why do you need to build a test-jar?



##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/table/UpsertTestDynamicTableSink.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.upserttest.sink.UpsertTestSink;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.io.File;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link UpsertTestSink} from a logical
+ * description.
+ */
+@Internal

Review Comment:
   Nit: Theoretically, for package-private classes the annotation is not necessary, but it is not a bad idea to keep it anyway.



##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Collection of utility methods for tests using the {@link UpsertTestSink}. */
+@Internal
+public class BinaryFileUtil {
+    private static final byte MAGIC_BYTE = 13;
+
+    /**
+     * Reads records that were written using the {@link BinaryFileUtil#writeRecords} method from
+     * the given InputStream.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @return Map containing the read ByteBuffer key-value pairs
+     * @throws IOException if the stream cannot be read
+     */
+    public static Map<ByteBuffer, ByteBuffer> readRecords(BufferedInputStream bis)
+            throws IOException {
+        checkNotNull(bis);
+        Map<ByteBuffer, ByteBuffer> records = new HashMap<>();

Review Comment:
   I am afraid using `ByteBuffer`s as keys does not work. I looked at the code of `ByteBuffer#hashCode` and the implementation is completely independent of the actual content of the buffer. The hash is calculated from the contained metadata information, which might easily lead to unwanted collisions.



##########
flink-connectors/flink-connector-upsert-test/src/test/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriterITCase.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UpsertTestSinkWriter}. */
+@ExtendWith(TestLoggerExtension.class)
+class UpsertTestSinkWriterITCase {
+
+    @Test
+    public void testWrite(@TempDir File tempDir) throws Exception {

Review Comment:
   Why did you pass the `@TempDir` as a parameter instead of defining it on the class level as an instance variable? Is there a benefit?



##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkBuilder.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import java.io.File;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Builder to construct {@link UpsertTestSink}.
+ *
+ * <p>The following example shows the minimum setup to create an UpsertTestSink that writes {@code
+ * Tuple2<String, String>} values to a file.
+ *
+ * <pre>{@code
+ * UpsertTestSink<Tuple2<String, String>> sink = UpsertTestSink
+ *     .<Tuple2<String, String>>builder()
+ *     .setOutputFile(MY_OUTPUT_FILE)
+ *     .setKeySerializationSchema(MY_KEY_SERIALIZER)
+ *     .setValueSerializationSchema(MY_VALUE_SERIALIZER)
+ *     .build();
+ * }</pre>
+ *
+ * @param <IN> type of the records written to the file
+ */
+@PublicEvolving
+public class UpsertTestSinkBuilder<IN> {
+
+    private File outputFile;
+    private SerializationSchema<IN> keySerializationSchema;
+    private SerializationSchema<IN> valueSerializationSchema;
+
+    /**
+     * Sets the output {@link File} to write to.
+     *
+     * @param outputFile the File to write to
+     * @return {@link UpsertTestSinkBuilder}
+     */
+    public UpsertTestSinkBuilder<IN> setOutputFile(File outputFile) {
+        this.outputFile = checkNotNull(outputFile);
+        return this;
+    }
+
+    /**
+     * Sets the key {@link SerializationSchema} that transforms incoming records to byte[].
+     *
+     * @param keySerializationSchema the SerializationSchema for serializing record keys
+     * @return {@link UpsertTestSinkBuilder}
+     */
+    public UpsertTestSinkBuilder<IN> setKeySerializationSchema(
+            SerializationSchema<IN> keySerializationSchema) {
+        this.keySerializationSchema = checkNotNull(keySerializationSchema);
+        return this;
+    }
+
+    /**
+     * Sets the value {@link SerializationSchema} that transforms incoming records to byte[].
+     *
+     * @param valueSerializationSchema the SerializationSchema for serializing record values
+     * @return {@link UpsertTestSinkBuilder}
+     */
+    public UpsertTestSinkBuilder<IN> setValueSerializationSchema(
+            SerializationSchema<IN> valueSerializationSchema) {
+        this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
+        return this;
+    }
+
+    /**
+     * Constructs the {@link UpsertTestSink} with the configured properties.
+     *
+     * @return {@link UpsertTestSink}
+     */
+    public UpsertTestSink<IN> build() {
+        return new UpsertTestSink<>(outputFile, keySerializationSchema, valueSerializationSchema);

Review Comment:
   Nit: Please also validate that the necessary fields of the builder are not null.
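
   A minimal version of that validation could look like this (sketch; the error messages are illustrative):

   ```java
   public UpsertTestSink<IN> build() {
       checkNotNull(outputFile, "outputFile must be set");
       checkNotNull(keySerializationSchema, "keySerializationSchema must be set");
       checkNotNull(valueSerializationSchema, "valueSerializationSchema must be set");
       return new UpsertTestSink<>(outputFile, keySerializationSchema, valueSerializationSchema);
   }
   ```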



##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriter.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible for writing records into a file in an upsert fashion. On every
+ * checkpoint, each key-value pair currently in the map is written to the file.
+ *
+ * @param <IN> The type of the input elements.
+ */
+class UpsertTestSinkWriter<IN> implements SinkWriter<IN> {
+    private static final Logger LOG = LoggerFactory.getLogger(UpsertTestSinkWriter.class);
+
+    private final SerializationSchema<IN> keySerializationSchema;
+    private final SerializationSchema<IN> valueSerializationSchema;
+    private final Map<ByteBuffer, ByteBuffer> records = new HashMap<>();
+    private final BufferedOutputStream bufferedOutputStream;
+
+    UpsertTestSinkWriter(
+            File outputFile,
+            SerializationSchema<IN> keySerializationSchema,
+            SerializationSchema<IN> valueSerializationSchema) {
+        this.keySerializationSchema = checkNotNull(keySerializationSchema);
+        this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
+        checkNotNull(outputFile);
+        try {
+            this.bufferedOutputStream =
+                    new BufferedOutputStream(new FileOutputStream(outputFile, true));
+        } catch (FileNotFoundException e) {
+            throw new FlinkRuntimeException("Could not find file", e);
+        }
+    }
+
+    @Override
+    public void write(IN element, Context context) {
+        byte[] key = keySerializationSchema.serialize(element);
+        byte[] value = valueSerializationSchema.serialize(element);
+        records.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException {
+        LOG.debug("Flushing records, endOfInput={}", endOfInput);

Review Comment:
   Nit: Remove log statement?



##########
flink-connectors/flink-connector-upsert-test/src/test/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriterITCase.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UpsertTestSinkWriter}. */
+@ExtendWith(TestLoggerExtension.class)
+class UpsertTestSinkWriterITCase {
+
+    @Test
+    public void testWrite(@TempDir File tempDir) throws Exception {
+        File outputFile = new File(tempDir, "records.out");
+        SerializationSchema<Tuple2<String, String>> keySerializationSchema =
+                element -> element.f0.getBytes();
+        SerializationSchema<Tuple2<String, String>> valueSerializationSchema =
+                element -> element.f1.getBytes();
+
+        try (final UpsertTestSinkWriter<Tuple2<String, String>> writer =
+                new UpsertTestSinkWriter<>(
+                        outputFile, keySerializationSchema, valueSerializationSchema)) {
+            for (int i = 0; i < 10; i++) {
+                writer.write(Tuple2.of("Key #" + i, "Value #" + i), null);
+            }
+        }
+
+        DeserializationSchema<String> keyDeserializationSchema = new StringDeserializationSchema();
+        DeserializationSchema<String> valueDeserializationSchema =
+                new StringDeserializationSchema();
+
+        Map<String, String> resultMap =
+                BinaryFileUtil.readRecords(
+                        outputFile, keyDeserializationSchema, valueDeserializationSchema);
+
+        for (int i = 0; i < 10; i++) {
+            assertThat(resultMap).containsEntry("Key #" + i, "Value #" + i);
+        }
+    }
+
+    @Test
+    public void testWriteOnCheckpoint(@TempDir File tempDir) throws Exception {
+        File outputFile = new File(tempDir, "records.out");
+        SerializationSchema<Tuple2<String, String>> keySerializationSchema =
+                element -> element.f0.getBytes();
+        SerializationSchema<Tuple2<String, String>> valueSerializationSchema =
+                element -> element.f1.getBytes();
+
+        DeserializationSchema<String> keyDeserializationSchema = new StringDeserializationSchema();
+        DeserializationSchema<String> valueDeserializationSchema =
+                new StringDeserializationSchema();
+
+        try (final UpsertTestSinkWriter<Tuple2<String, String>> writer =
+                new UpsertTestSinkWriter<>(
+                        outputFile, keySerializationSchema, valueSerializationSchema)) {
+            for (int i = 0; i < 10; i++) {
+                writer.write(Tuple2.of("Key #" + i, "Value #" + i), null);
+            }
+            writer.flush(false);
+
+            Map<String, String> resultMap =
+                    BinaryFileUtil.readRecords(
+                            outputFile, keyDeserializationSchema, valueDeserializationSchema);
+
+            // after flushing the file should contain records
+            for (int i = 0; i < 10; i++) {
+                assertThat(resultMap).containsEntry("Key #" + i, "Value #" + i);
+            }
+
+            for (int i = 0; i < 5; i++) {
+                writer.write(Tuple2.of("Key #" + i, "Value #" + i * 10), null);
+            }
+        }
+
+        Map<String, String> resultMap =
+                BinaryFileUtil.readRecords(
+                        outputFile, keyDeserializationSchema, valueDeserializationSchema);
+
+        for (int i = 0; i < 10; i++) {
+            if (i < 5) {
+                assertThat(resultMap).containsEntry("Key #" + i, "Value #" + i * 10);
+            } else {
+                assertThat(resultMap).containsEntry("Key #" + i, "Value #" + i);
+            }
+        }
+    }
+
+    private static class StringDeserializationSchema extends AbstractDeserializationSchema<String> {

Review Comment:
   Nit: Why not use `SimpleStringSchema`?



##########
flink-connectors/flink-connector-upsert-test/src/test/java/org/apache/flink/connector/upserttest/table/UpsertTestDynamicTableSinkITCase.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.table;
+
+import org.apache.flink.connector.upserttest.sink.BinaryFileUtil;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Tests for {@link UpsertTestDynamicTableSink}. */
+@ExtendWith(TestLoggerExtension.class)
+class UpsertTestDynamicTableSinkITCase {

Review Comment:
   Please use the `MiniClusterExtension` to prevent weird submission bugs. I also wonder why the ArchUnit tests pass. I guess they are missing in this module completely ;)
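
   For reference, a typical registration (sketch; the concrete TaskManager/slot counts are illustrative):

   ```java
   import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
   import org.apache.flink.test.junit5.MiniClusterExtension;

   import org.junit.jupiter.api.extension.RegisterExtension;

   class UpsertTestDynamicTableSinkITCaseSketch {
       @RegisterExtension
       static final MiniClusterExtension MINI_CLUSTER =
               new MiniClusterExtension(
                       new MiniClusterResourceConfiguration.Builder()
                               .setNumberTaskManagers(1)
                               .setNumberSlotsPerTaskManager(2)
                               .build());
   }
   ```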



##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Collection uf utility methods for tests using the {@link UpsertTestSink}. */
+@Internal
+public class BinaryFileUtil {

Review Comment:
   I think this should be a `SimpleVersionedSerializer` since that is the main entry point for these kinds of operations in the Flink code base. For reading/writing many records you can take a look at https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
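
   For illustration, the shape of that interface applied to raw key/value bytes (a sketch of the suggestion, not the PR's eventual implementation):

   ```java
   import org.apache.flink.core.io.SimpleVersionedSerializer;

   import java.io.IOException;

   // Sketch: a versioned serializer for raw byte[] payloads.
   class BytesSerializerSketch implements SimpleVersionedSerializer<byte[]> {
       @Override
       public int getVersion() {
           return 1;
       }

       @Override
       public byte[] serialize(byte[] obj) {
           return obj; // payload is already in serialized form
       }

       @Override
       public byte[] deserialize(int version, byte[] serialized) throws IOException {
           if (version != 1) {
               throw new IOException("Unknown version: " + version);
           }
           return serialized;
       }
   }
   ```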



##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/table/UpsertTestDynamicTableSink.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.upserttest.sink.UpsertTestSink;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.io.File;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link UpsertTestSink} from a logical
+ * description.
+ */
+@Internal
+class UpsertTestDynamicTableSink implements DynamicTableSink {
+
+    private final DataType physicalRowDataType;
+    private final EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat;
+    private final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat;
+    private final String outputFilePath;
+
+    UpsertTestDynamicTableSink(
+            DataType physicalRowDataType,
+            EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
+            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            String outputFilePath) {
+        this.physicalRowDataType = checkNotNull(physicalRowDataType);
+        this.keyEncodingFormat = checkNotNull(keyEncodingFormat);
+        this.valueEncodingFormat = checkNotNull(valueEncodingFormat);
+        this.outputFilePath = checkNotNull(outputFilePath);
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+        for (RowKind kind : requestedMode.getContainedKinds()) {
+            if (kind != RowKind.UPDATE_BEFORE) {
+                builder.addContainedKind(kind);
+            }
+        }
+        return builder.build();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final File outputFile = new File(outputFilePath);
+        final SerializationSchema<RowData> keySerialization =
+                keyEncodingFormat.createRuntimeEncoder(context, physicalRowDataType);
+        final SerializationSchema<RowData> valueSerialization =
+                valueEncodingFormat.createRuntimeEncoder(context, physicalRowDataType);
+
+        final UpsertTestSink<RowData> sink =
+                UpsertTestSink.<RowData>builder()
+                        .setOutputFile(outputFile)
+                        .setKeySerializationSchema(keySerialization)
+                        .setValueSerializationSchema(valueSerialization)
+                        .build();
+        return SinkV2Provider.of(sink, 1);
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new UpsertTestDynamicTableSink(
+                physicalRowDataType, keyEncodingFormat, valueEncodingFormat, outputFilePath);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        UpsertTestDynamicTableSink that = (UpsertTestDynamicTableSink) o;
+        return Objects.equals(physicalRowDataType, that.physicalRowDataType)
+                && Objects.equals(keyEncodingFormat, that.keyEncodingFormat)
+                && Objects.equals(valueEncodingFormat, that.valueEncodingFormat)
+                && Objects.equals(outputFilePath, that.outputFilePath);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalRowDataType, keyEncodingFormat, valueEncodingFormat, outputFilePath);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "UpsertFileSink";

Review Comment:
   Nit: 
   ```suggestion
           return "UpsertTestFileSink";
   ```



##########
flink-connectors/flink-connector-upsert-test/src/test/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriterITCase.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UpsertTestSinkWriter}. */
+@ExtendWith(TestLoggerExtension.class)
+class UpsertTestSinkWriterITCase {
+
+    @Test
+    public void testWrite(@TempDir File tempDir) throws Exception {
+        File outputFile = new File(tempDir, "records.out");
+        SerializationSchema<Tuple2<String, String>> keySerializationSchema =
+                element -> element.f0.getBytes();
+        SerializationSchema<Tuple2<String, String>> valueSerializationSchema =
+                element -> element.f1.getBytes();
+
+        try (final UpsertTestSinkWriter<Tuple2<String, String>> writer =
+                new UpsertTestSinkWriter<>(
+                        outputFile, keySerializationSchema, valueSerializationSchema)) {
+            for (int i = 0; i < 10; i++) {
+                writer.write(Tuple2.of("Key #" + i, "Value #" + i), null);
+            }
+        }
+
+        DeserializationSchema<String> keyDeserializationSchema = new StringDeserializationSchema();
+        DeserializationSchema<String> valueDeserializationSchema =
+                new StringDeserializationSchema();
+
+        Map<String, String> resultMap =
+                BinaryFileUtil.readRecords(
+                        outputFile, keyDeserializationSchema, valueDeserializationSchema);
+
+        for (int i = 0; i < 10; i++) {
+            assertThat(resultMap).containsEntry("Key #" + i, "Value #" + i);
+        }
+    }
+
+    @Test
+    public void testWriteOnCheckpoint(@TempDir File tempDir) throws Exception {

Review Comment:
   Can you extract the common code for both tests? In the end, the setup and assertions are equal; the only difference is that one scenario verifies that all records are flushed on close, while the other verifies the behavior on flush. It might also make sense to have only one parameterized test based on the operation of the `UpsertTestSinkWriter`.
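   For illustration, a minimal sketch of such a parameterized test (the `WriterOperation` enum is hypothetical, and `createSinkWriter`, `writeTestData`, and `testRecordPresence` stand for extracted helper methods, not code from this revision of the PR):

   ```java
   enum WriterOperation { CLOSE, FLUSH }

   @ParameterizedTest
   @EnumSource(WriterOperation.class)
   void testRecordsArePersisted(WriterOperation operation, @TempDir File tempDir) throws Exception {
       File outputFile = new File(tempDir, "records.out");
       UpsertTestSinkWriter<Tuple2<String, String>> writer = createSinkWriter(outputFile);
       List<Tuple2<String, String>> expectedRecords = writeTestData(writer);

       if (operation == WriterOperation.CLOSE) {
           writer.close(); // closing flushes all buffered records
       } else {
           writer.flush(false); // an explicit flush, as performed on checkpoint
       }

       testRecordPresence(outputFile, expectedRecords);
       // writer cleanup as in a tearDown() method
   }
   ```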





[GitHub] [flink] fapaul commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r878166302


##########
flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java:
##########
@@ -46,6 +62,24 @@ public void testTypeInfoTestCoverage() {
                         .map(Class::getName)
                         .collect(Collectors.toSet());
 
+        //  type info whitelist for TypeInformationTestBase test coverage

Review Comment:
   Nit: Can you also link the corresponding ticket number in the comment?



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestFileUtil.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Collection of utility methods for reading and writing files from the {@link UpsertTestSink}. */
+@Internal
+public class UpsertTestFileUtil {
+    static final byte MAGIC_BYTE = 13;
+
+    private UpsertTestFileUtil() {}
+
+    static void writeRecords(

Review Comment:
   Nit: I'd make this method public as well to be consistent with the other methods





[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r877946191


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/ImmutableByteArrayWrapper.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is a slim wrapper around {@code byte[]} that defensively copies the array during
+ * construction and retrieval to prevent its contents from changing. It also implements a
+ * content-dependent {@link #hashCode} method to allow usage as keys in a {@link Map}.
+ */
+class ImmutableByteArrayWrapper {
+
+    @VisibleForTesting final byte[] bytes;
+
+    ImmutableByteArrayWrapper(byte[] bytes) {
+        checkNotNull(bytes);
+        this.bytes = bytes.clone();
+    }
+
+    /**
+     * Returns a reference-free copy of the underlying byte[].
+     *
+     * @return the copied byte[]
+     */
+    byte[] array() {
+        return bytes.clone();

Review Comment:
   I think this depends on the usage; right now it prevents changes to the returned byte[] from modifying the contents of the wrapper class.
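   A two-line illustration of what the defensive copy buys (a sketch using `java.util.Arrays`):

   ```java
   ImmutableByteArrayWrapper wrapper = new ImmutableByteArrayWrapper("key".getBytes());
   byte[] copy = wrapper.array();
   copy[0] = 0; // mutating the returned array...
   assert !Arrays.equals(copy, wrapper.array()); // ...does not change the wrapper's contents
   ```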





[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r878106621


##########
flink-test-utils-parent/flink-test-utils/pom.xml:
##########
@@ -35,6 +35,12 @@ under the License.
 
 	<dependencies>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>

Review Comment:
   1. It's actually located in the `flink-test-utils` pom
   2. As discussed offline, we need it for the table part of the connector (DynamicTableSink, -Factory, ...)





[GitHub] [flink] fapaul commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r868894146


##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Collection of utility methods for tests using the {@link UpsertTestSink}. */
+@Internal
+public class BinaryFileUtil {

Review Comment:
   > I previously wanted to use the SimpleVersionedSerializer and SimpleVersionedSerialization. However implementing this interface adds the same kind of code we already have in place. 
   
   That is exactly the idea. Of course, the interface adds the same logic, but it makes the code easier to understand because other Flink developers are used to `SimpleVersionedSerializer`. 
   
   
   > Also the SimpleVersionedSerialization only takes single byte arrays as input so then we would have to write another method around that to allow both key and value as input. And finally it does not provide a simple entrypoint for reading back all elements so I decided against that,
   
   This point I do not fully understand. My idea was to have the following:
   
   ```java
   class UpsertFileSinkSerializer implements SimpleVersionedSerializer<Map<ByteBuffer, ByteBuffer>> {
   ....
   }
   ```
   
   and `SimpleVersionedSerialization#readVersionAndDeSerialize` or `SimpleVersionedSerialization#writeVersionAndSerialize`.  WDYT?
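   Spelled out, that sketch might look roughly like this; the length-prefixed layout is an assumption made for illustration, not the PR's actual on-disk format:

   ```java
   import org.apache.flink.core.io.SimpleVersionedSerializer;

   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.util.HashMap;
   import java.util.Map;

   class UpsertFileSinkSerializer implements SimpleVersionedSerializer<Map<ByteBuffer, ByteBuffer>> {

       @Override
       public int getVersion() {
           return 1;
       }

       @Override
       public byte[] serialize(Map<ByteBuffer, ByteBuffer> records) throws IOException {
           ByteArrayOutputStream baos = new ByteArrayOutputStream();
           DataOutputStream out = new DataOutputStream(baos);
           out.writeInt(records.size());
           for (Map.Entry<ByteBuffer, ByteBuffer> entry : records.entrySet()) {
               writeBuffer(out, entry.getKey());
               writeBuffer(out, entry.getValue());
           }
           return baos.toByteArray();
       }

       @Override
       public Map<ByteBuffer, ByteBuffer> deserialize(int version, byte[] serialized) throws IOException {
           DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
           int size = in.readInt();
           Map<ByteBuffer, ByteBuffer> records = new HashMap<>(size);
           for (int i = 0; i < size; i++) {
               records.put(readBuffer(in), readBuffer(in));
           }
           return records;
       }

       // length-prefixed encoding of a buffer's remaining bytes
       private static void writeBuffer(DataOutputStream out, ByteBuffer buffer) throws IOException {
           byte[] bytes = new byte[buffer.remaining()];
           buffer.duplicate().get(bytes);
           out.writeInt(bytes.length);
           out.write(bytes);
       }

       private static ByteBuffer readBuffer(DataInputStream in) throws IOException {
           byte[] bytes = new byte[in.readInt()];
           in.readFully(bytes);
           return ByteBuffer.wrap(bytes);
       }
   }
   ```

   The writer would then, presumably, hand the whole map to `SimpleVersionedSerialization#writeVersionAndSerialize` on flush and read it back symmetrically.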
   
   





[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r868940076


##########
flink-connectors/flink-connector-upsert-test/src/test/java/org/apache/flink/connector/upserttest/table/UpsertTestDynamicTableSinkITCase.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.table;
+
+import org.apache.flink.connector.upserttest.sink.BinaryFileUtil;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Tests for {@link UpsertTestDynamicTableSink}. */
+@ExtendWith(TestLoggerExtension.class)
+class UpsertTestDynamicTableSinkITCase {

Review Comment:
   Yes, the ArchUnit tests were still missing; I added them now.





[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r868873968


##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Collection of utility methods for tests using the {@link UpsertTestSink}. */
+@Internal
+public class BinaryFileUtil {
+    private static final byte MAGIC_BYTE = 13;
+
+    /**
+     * Reads records that were written using the {@link UpsertTestSinkWriter} from the given
+     * InputStream.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @return Map containing the read ByteBuffer key-value pairs
+     * @throws IOException
+     */
+    public static Map<ByteBuffer, ByteBuffer> readRecords(BufferedInputStream bis)
+            throws IOException {
+        checkNotNull(bis);
+        Map<ByteBuffer, ByteBuffer> records = new HashMap<>();

Review Comment:
   Where do you see this? The one I found iterates through the content's individual bytes:
   ```java
   public int hashCode() {
       int h = 1;
       int p = position();
       for (int i = limit() - 1; i >= p; i--)
           h = 31 * h + (int) get(i);
       return h;
   }
   ```
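   For context, this content-based `hashCode` (together with `equals`) is what makes `ByteBuffer` usable as a `HashMap` key, which a raw `byte[]` is not; a quick editorial illustration:

   ```java
   byte[] a = {1, 2, 3};
   byte[] b = {1, 2, 3};
   // byte[] inherits identity-based equals/hashCode from Object:
   System.out.println(a.equals(b)); // false
   // ByteBuffer derives both from the remaining content:
   System.out.println(ByteBuffer.wrap(a).equals(ByteBuffer.wrap(b)));                   // true
   System.out.println(ByteBuffer.wrap(a).hashCode() == ByteBuffer.wrap(b).hashCode()); // true
   ```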





[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r868878917


##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Collection of utility methods for tests using the {@link UpsertTestSink}. */
+@Internal
+public class BinaryFileUtil {

Review Comment:
   I previously wanted to use `SimpleVersionedSerializer` and `SimpleVersionedSerialization`. However, implementing this interface adds the same kind of code we already have in place. Also, `SimpleVersionedSerialization` only takes single byte arrays as input, so we would have to write another method around it to allow both key and value as input. Finally, it does not provide a simple entry point for reading back all elements, so I decided against it.





[GitHub] [flink] fapaul commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r877968464


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriter.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible for writing records into a file in an upsert-fashion. On every
+ * checkpoint each key-value pair currently in the map is written to the file.
+ *
+ * @param <IN> The type of the input elements.
+ */
+class UpsertTestSinkWriter<IN> implements SinkWriter<IN> {
+    static final byte MAGIC_BYTE = 13;
+
+    private final SerializationSchema<IN> keySerializationSchema;
+    private final SerializationSchema<IN> valueSerializationSchema;
+    private final Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records =
+            new HashMap<>();
+    private final BufferedOutputStream bufferedOutputStream;
+
+    UpsertTestSinkWriter(
+            File outputFile,
+            SerializationSchema<IN> keySerializationSchema,
+            SerializationSchema<IN> valueSerializationSchema) {
+        this.keySerializationSchema = checkNotNull(keySerializationSchema);
+        this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
+        checkNotNull(outputFile);
+        try {
+            this.bufferedOutputStream =
+                    new BufferedOutputStream(new FileOutputStream(outputFile, true));
+        } catch (FileNotFoundException e) {
+            throw new FlinkRuntimeException("Could not find file", e);
+        }
+    }
+
+    @Override
+    public void write(IN element, Context context) {
+        byte[] key = keySerializationSchema.serialize(element);
+        byte[] value = valueSerializationSchema.serialize(element);
+        records.put(new ImmutableByteArrayWrapper(key), new ImmutableByteArrayWrapper(value));
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException {
+        writeRecords(bufferedOutputStream, records);
+        records.clear();
+    }
+
+    @Override
+    public void close() throws Exception {
+        flush(true);
+        bufferedOutputStream.close();
+    }
+
+    private static void writeRecords(

Review Comment:
   I prefer having a separate class, which also makes testing easier.





[GitHub] [flink] flinkbot commented on pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19655:
URL: https://github.com/apache/flink/pull/19655#issuecomment-1119401651

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "26254d076023deec64a2f11dd282fec4987d28e6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "26254d076023deec64a2f11dd282fec4987d28e6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 26254d076023deec64a2f11dd282fec4987d28e6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r868869814


##########
flink-connectors/flink-connector-upsert-test/pom.xml:
##########
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-connectors</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.16-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-connector-upsert-test</artifactId>
+	<name>Flink : Connectors : Upsert Test</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java-bridge</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- ArchUnit test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-architecture-tests-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>

Review Comment:
   Oops, missed that on copying over some pom details





[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r871404156


##########
flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Collection of utility methods for tests using the {@link UpsertTestSink}. */
+@Internal
+public class BinaryFileUtil {

Review Comment:
   As discussed offline, `SimpleVersionedSerializer` seems to be overkill for the task at hand. `TypeInformation`, while nice, would require us to complicate the sink by introducing an extra topology to retrieve the information. We will keep it as is for now and can change it later if the necessity arises.





[GitHub] [flink] fapaul commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r877902446


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/ImmutableByteArrayWrapper.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is a slim wrapper around {@code byte[]} that defensively copies the array during
+ * construction and retrieval to prevent its contents from changing. It also implements a
+ * content-dependent {@link #hashCode} method to allow usage as keys in a {@link Map}.
+ */
+class ImmutableByteArrayWrapper {
+
+    @VisibleForTesting final byte[] bytes;
+
+    ImmutableByteArrayWrapper(byte[] bytes) {
+        checkNotNull(bytes);
+        this.bytes = bytes.clone();
+    }
+
+    /**
+     * Returns a reference-free copy of the underlying byte[].
+     *
+     * @return the copied byte[]
+     */
+    byte[] array() {
+        return bytes.clone();

Review Comment:
   Nit: Is it necessary to `copy` again? 



##########
flink-test-utils-parent/flink-test-utils/pom.xml:
##########
@@ -35,6 +35,12 @@ under the License.
 
 	<dependencies>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>

Review Comment:
   I have two questions about this dependency.
   
   1. Why is it defined in the parent pom of the test-utils?
   2. I guess we need it to build the sink, but can you quickly check if it is really needed?



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSink.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.connector.upserttest.sink.UpsertTestSinkWriter.MAGIC_BYTE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Flink Sink to upsert test data into a file. This Sink is intended for testing internal
+ * functionality and is **not** production-ready.
+ *
+ * <p>Please note that the UpsertTestSink needs to run with a parallelism of 1 to function
+ * correctly. There is currently no support for using multiple writers at once.
+ *
+ * @param <IN> type of records written to the file
+ * @see UpsertTestSinkBuilder on how to construct an UpsertTestSink
+ */
+@PublicEvolving
+public class UpsertTestSink<IN> implements Sink<IN> {
+
+    private final File outputFile;
+    private final SerializationSchema<IN> keySerializationSchema;
+    private final SerializationSchema<IN> valueSerializationSchema;
+
+    UpsertTestSink(
+            File outputFile,
+            SerializationSchema<IN> keySerializationSchema,
+            SerializationSchema<IN> valueSerializationSchema) {
+        this.outputFile = checkNotNull(outputFile);
+        this.keySerializationSchema = checkNotNull(keySerializationSchema);
+        this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
+    }
+
+    /**
+     * Create a {@link UpsertTestSinkBuilder} to construct a new {@link UpsertTestSink}.
+     *
+     * @param <IN> type of incoming records
+     * @return {@link UpsertTestSinkBuilder}
+     */
+    public static <IN> UpsertTestSinkBuilder<IN> builder() {
+        return new UpsertTestSinkBuilder<>();
+    }
+
+    @Internal
+    @Override
+    public SinkWriter<IN> createWriter(InitContext context) {
+        return new UpsertTestSinkWriter<>(
+                outputFile, keySerializationSchema, valueSerializationSchema);
+    }
+
+    /////////////////////////////////////////////////////////////
+    // Utilities
+    /////////////////////////////////////////////////////////////
+
+    /**
+     * Returns the total number of records written using the {@link UpsertTestSinkWriter}, read
+     * from the given InputStream.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @return the number of records
+     * @throws IOException
+     */
+    public static int getNumberOfRecords(BufferedInputStream bis) throws IOException {
+        Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records = readRecords(bis);
+        return records.size();
+    }
+
+    /**
+     * Returns the total number of records written using the {@link UpsertTestSinkWriter} to the
+     * given File.
+     *
+     * @param file The File to read from
+     * @return the number of records
+     * @throws IOException
+     */
+    public static int getNumberOfRecords(File file) throws IOException {
+        checkNotNull(file);
+        FileInputStream fs = new FileInputStream(file);
+        BufferedInputStream bis = new BufferedInputStream(fs);
+        return getNumberOfRecords(bis);
+    }
+
+    /**
+     * Reads records that were written using the {@link UpsertTestSinkWriter} from the given
+     * InputStream and converts them using the provided {@link DeserializationSchema}s.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @param keyDeserializationSchema The key's DeserializationSchema
+     * @param valueDeserializationSchema The value's DeserializationSchema
+     * @return Map containing the deserialized key-value pairs
+     * @throws IOException
+     */
+    public static <K, V> Map<K, V> readRecords(
+            BufferedInputStream bis,
+            DeserializationSchema<K> keyDeserializationSchema,
+            DeserializationSchema<V> valueDeserializationSchema)
+            throws IOException {
+        checkNotNull(bis);
+        Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> bytesMap = readRecords(bis);
+        Map<K, V> typedMap = new HashMap<>(bytesMap.size());
+
+        Iterator<Map.Entry<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper>> it =
+                bytesMap.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> entry = it.next();
+            K key = keyDeserializationSchema.deserialize(entry.getKey().array());
+            V value = valueDeserializationSchema.deserialize(entry.getValue().array());
+            typedMap.put(key, value);
+        }
+        return typedMap;
+    }
+
+    /**
+     * Reads records that were written using the {@link UpsertTestSinkWriter} from the given File
+     * and converts them using the provided {@link DeserializationSchema}s.
+     *
+     * @param file The File to read from
+     * @param keyDeserializationSchema The key's DeserializationSchema
+     * @param valueDeserializationSchema The value's DeserializationSchema
+     * @return Map containing the deserialized key-value pairs
+     * @throws IOException
+     */
+    public static <K, V> Map<K, V> readRecords(
+            File file,
+            DeserializationSchema<K> keyDeserializationSchema,
+            DeserializationSchema<V> valueDeserializationSchema)
+            throws IOException {
+        checkNotNull(file);
+        FileInputStream fs = new FileInputStream(file);
+        BufferedInputStream bis = new BufferedInputStream(fs);
+        return readRecords(bis, keyDeserializationSchema, valueDeserializationSchema);
+    }
+
+    /**
+     * Reads records that were written using the {@link UpsertTestSinkWriter} from the given
+     * InputStream.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @return Map containing the read ImmutableByteArrayWrapper key-value pairs
+     * @throws IOException
+     */
+    private static Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> readRecords(
+            BufferedInputStream bis) throws IOException {
+        checkNotNull(bis);
+        Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records = new HashMap<>();
+        int magicByte;
+        while ((magicByte = bis.read()) != -1) {

Review Comment:
   Can you explain the `while` loop here? I would have thought just reading the magic byte followed by the content would be enough.
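   An editorial note on why the loop is needed: since the writer appends to the file on every flush/checkpoint, the stream can contain many magic-byte-framed entries, so the reader has to loop until EOF. Under an assumed length-prefixed framing (inferred for illustration, not confirmed by the PR), a complete read loop could look like:

   ```java
   // Assumed file layout after two checkpoints (illustrative only):
   //   [MAGIC_BYTE][entry ...]   <- appended by flush #1
   //   [MAGIC_BYTE][entry ...]   <- appended by flush #2
   DataInputStream in = new DataInputStream(bis);
   Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records = new HashMap<>();
   int magicByte;
   while ((magicByte = in.read()) != -1) { // loop: one iteration per appended entry
       if (magicByte != MAGIC_BYTE) {
           throw new IOException("Expected magic byte but found: " + magicByte);
       }
       byte[] key = new byte[in.readInt()];
       in.readFully(key);
       byte[] value = new byte[in.readInt()];
       in.readFully(value);
       // put() implements the upsert: a later entry for the same key wins
       records.put(new ImmutableByteArrayWrapper(key), new ImmutableByteArrayWrapper(value));
   }
   ```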



##########
flink-tests/src/test/java/org/apache/flink/connector/upserttest/sink/ImmutableByteArrayWrapperTest.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link ImmutableByteArrayWrapper}. */
+@ExtendWith(TestLoggerExtension.class)
+class ImmutableByteArrayWrapperTest {
+
+    @Test
+    public void testConstructorCopy() {
+        byte[] array = "immutability of constructor".getBytes();
+        byte[] clonedArray = new ImmutableByteArrayWrapper(array).bytes;
+        assertCopyIsReferenceFree(array, clonedArray);
+    }
+
+    @Test
+    public void testGetterCopy() {
+        byte[] array = "immutability of getter".getBytes();
+        byte[] clonedArray = new ImmutableByteArrayWrapper(array).array();
+        assertCopyIsReferenceFree(array, clonedArray);
+    }
+
+    void assertCopyIsReferenceFree(byte[] original, byte[] clone) {

Review Comment:
   private static?



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriter.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible for writing records into a file in an upsert-fashion. On every
+ * checkpoint each key-value pair currently in the map is written to the file.
+ *
+ * @param <IN> The type of the input elements.
+ */
+class UpsertTestSinkWriter<IN> implements SinkWriter<IN> {
+    static final byte MAGIC_BYTE = 13;
+
+    private final SerializationSchema<IN> keySerializationSchema;
+    private final SerializationSchema<IN> valueSerializationSchema;
+    private final Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records =
+            new HashMap<>();
+    private final BufferedOutputStream bufferedOutputStream;
+
+    UpsertTestSinkWriter(
+            File outputFile,
+            SerializationSchema<IN> keySerializationSchema,
+            SerializationSchema<IN> valueSerializationSchema) {
+        this.keySerializationSchema = checkNotNull(keySerializationSchema);
+        this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
+        checkNotNull(outputFile);
+        try {
+            this.bufferedOutputStream =
+                    new BufferedOutputStream(new FileOutputStream(outputFile, true));
+        } catch (FileNotFoundException e) {
+            throw new FlinkRuntimeException("Could not find file", e);
+        }
+    }
+
+    @Override
+    public void write(IN element, Context context) {
+        byte[] key = keySerializationSchema.serialize(element);
+        byte[] value = valueSerializationSchema.serialize(element);
+        records.put(new ImmutableByteArrayWrapper(key), new ImmutableByteArrayWrapper(value));
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException {
+        writeRecords(bufferedOutputStream, records);
+        records.clear();
+    }
+
+    @Override
+    public void close() throws Exception {
+        flush(true);
+        bufferedOutputStream.close();
+    }
+
+    private static void writeRecords(

Review Comment:
   Nit: I am not sure I like the current approach where the writing and reading code are no longer part of the same class. Sharing the `MAGIC_BYTE` already seems hard to track.



##########
flink-tests/src/test/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriterITCase.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UpsertTestSinkWriter}. */
+@ExtendWith(TestLoggerExtension.class)
+class UpsertTestSinkWriterITCase {
+
+    @RegisterExtension
+    public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    @TempDir private File tempDir;
+    private File outputFile;
+    private UpsertTestSinkWriter<Tuple2<String, String>> writer;
+    private List<Tuple2<String, String>> expectedRecords;
+
+    @BeforeEach
+    public void setup() {
+        outputFile = new File(tempDir, "records.out");
+        writer = createSinkWriter(outputFile);
+        expectedRecords = writeTestData(writer);
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        writer.close();
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        writer.close();
+        testRecordPresence(outputFile, expectedRecords);
+    }
+
+    @Test
+    public void testWriteOnCheckpoint() throws Exception {
+        writer.flush(false);
+        testRecordPresence(outputFile, expectedRecords);
+    }
+
+    UpsertTestSinkWriter<Tuple2<String, String>> createSinkWriter(File outputFile) {
+        SerializationSchema<Tuple2<String, String>> keySerializationSchema =
+                element -> element.f0.getBytes();
+        SerializationSchema<Tuple2<String, String>> valueSerializationSchema =
+                element -> element.f1.getBytes();
+
+        return new UpsertTestSinkWriter<>(
+                outputFile, keySerializationSchema, valueSerializationSchema);
+    }
+
+    List<Tuple2<String, String>> writeTestData(
+            UpsertTestSinkWriter<Tuple2<String, String>> writer) {
+        final List<Tuple2<String, String>> expectedRecords = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            Tuple2<String, String> record = Tuple2.of("Key #" + i, "Value #" + i);
+            expectedRecords.add(record);
+            writer.write(record, null);
+        }
+        return expectedRecords;
+    }
+
+    void testRecordPresence(File outputFile, List<Tuple2<String, String>> expectedRecords)

Review Comment:
   private?





[GitHub] [flink] alpreu commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r877950300


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriter.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible for writing records into a file in an upsert-fashion. On every
+ * checkpoint each key-value pair currently in the map is written to the file.
+ *
+ * @param <IN> The type of the input elements.
+ */
+class UpsertTestSinkWriter<IN> implements SinkWriter<IN> {
+    static final byte MAGIC_BYTE = 13;
+
+    private final SerializationSchema<IN> keySerializationSchema;
+    private final SerializationSchema<IN> valueSerializationSchema;
+    private final Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records =
+            new HashMap<>();
+    private final BufferedOutputStream bufferedOutputStream;
+
+    UpsertTestSinkWriter(
+            File outputFile,
+            SerializationSchema<IN> keySerializationSchema,
+            SerializationSchema<IN> valueSerializationSchema) {
+        this.keySerializationSchema = checkNotNull(keySerializationSchema);
+        this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
+        checkNotNull(outputFile);
+        try {
+            this.bufferedOutputStream =
+                    new BufferedOutputStream(new FileOutputStream(outputFile, true));
+        } catch (FileNotFoundException e) {
+            throw new FlinkRuntimeException("Could not find file", e);
+        }
+    }
+
+    @Override
+    public void write(IN element, Context context) {
+        byte[] key = keySerializationSchema.serialize(element);
+        byte[] value = valueSerializationSchema.serialize(element);
+        records.put(new ImmutableByteArrayWrapper(key), new ImmutableByteArrayWrapper(value));
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException {
+        writeRecords(bufferedOutputStream, records);
+        records.clear();
+    }
+
+    @Override
+    public void close() throws Exception {
+        flush(true);
+        bufferedOutputStream.close();
+    }
+
+    private static void writeRecords(

Review Comment:
   I agree. Arvid preferred to have it as a public part of the Sink, but this introduces the separation. I think the question then becomes: do we make the Writer public and move the reading logic in there, or do we reintroduce a util class for reading and writing?
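   For illustration, a sketch of the write half living in the same util class as the reader, so `MAGIC_BYTE` has a single owner; the length-prefixed framing is assumed here and must mirror whatever the matching `readRecords` expects:

   ```java
   public static void writeRecords(
           BufferedOutputStream bos,
           Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records)
           throws IOException {
       DataOutputStream out = new DataOutputStream(bos);
       for (Map.Entry<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> entry : records.entrySet()) {
           out.writeByte(MAGIC_BYTE); // frame each entry with the shared magic byte
           byte[] key = entry.getKey().array();
           byte[] value = entry.getValue().array();
           out.writeInt(key.length);
           out.write(key);
           out.writeInt(value.length);
           out.write(value);
       }
       out.flush();
   }
   ```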





[GitHub] [flink] fapaul commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r877949281


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/ImmutableByteArrayWrapper.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is a slim wrapper around {@code byte[]} that defensively copies the array during
+ * construction and retrieval to prevent its contents from changing. It also implements a
+ * content-dependent {@link #hashCode} method to allow usage as keys in a {@link Map}.
+ */
+class ImmutableByteArrayWrapper {
+
+    @VisibleForTesting final byte[] bytes;
+
+    ImmutableByteArrayWrapper(byte[] bytes) {
+        checkNotNull(bytes);
+        this.bytes = bytes.clone();
+    }
+
+    /**
+     * Returns a reference-free copy of the underlying byte[].
+     *
+     * @return the copied byte[]
+     */
+    byte[] array() {
+        return bytes.clone();

Review Comment:
   Good point, makes sense 👍 
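
   For reference, a sketch of the content-dependent equals/hashCode pair the class javadoc above refers to; the diff is truncated before these methods, so this is only an assumption about their shape, not a quote of the PR:

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // Compare contents so that equal byte sequences are treated as equal map keys.
        return Arrays.equals(bytes, ((ImmutableByteArrayWrapper) o).bytes);
    }

    @Override
    public int hashCode() {
        // Content-dependent hash, consistent with equals().
        return Arrays.hashCode(bytes);
    }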



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] alpreu commented on pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on PR #19655:
URL: https://github.com/apache/flink/pull/19655#issuecomment-1120713848

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] alpreu commented on pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
alpreu commented on PR #19655:
URL: https://github.com/apache/flink/pull/19655#issuecomment-1119538247

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fapaul commented on a diff in pull request #19655: [FLINK-27527] Create a file-based upsert sink for testing internal components

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r877968464


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriter.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible for writing records into a file in an upsert fashion. On every
+ * checkpoint, each key-value pair currently in the map is written to the file.
+ *
+ * @param <IN> The type of the input elements.
+ */
+class UpsertTestSinkWriter<IN> implements SinkWriter<IN> {
+    static final byte MAGIC_BYTE = 13;
+
+    private final SerializationSchema<IN> keySerializationSchema;
+    private final SerializationSchema<IN> valueSerializationSchema;
+    private final Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records =
+            new HashMap<>();
+    private final BufferedOutputStream bufferedOutputStream;
+
+    UpsertTestSinkWriter(
+            File outputFile,
+            SerializationSchema<IN> keySerializationSchema,
+            SerializationSchema<IN> valueSerializationSchema) {
+        this.keySerializationSchema = checkNotNull(keySerializationSchema);
+        this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
+        checkNotNull(outputFile);
+        try {
+            this.bufferedOutputStream =
+                    new BufferedOutputStream(new FileOutputStream(outputFile, true));
+        } catch (FileNotFoundException e) {
+            throw new FlinkRuntimeException("Could not find file", e);
+        }
+    }
+
+    @Override
+    public void write(IN element, Context context) {
+        byte[] key = keySerializationSchema.serialize(element);
+        byte[] value = valueSerializationSchema.serialize(element);
+        records.put(new ImmutableByteArrayWrapper(key), new ImmutableByteArrayWrapper(value));
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException {
+        writeRecords(bufferedOutputStream, records);
+        records.clear();
+    }
+
+    @Override
+    public void close() throws Exception {
+        flush(true);
+        bufferedOutputStream.close();
+    }
+
+    private static void writeRecords(

Review Comment:
   I prefer having a separate class, which also makes testing easier, but it's up to you; it's only a minor thing that we can change later.
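
   As a rough illustration of that separation, the static writeRecords logic could move into a small util class. Since the method body is cut off in the diff above, the record layout below (magic byte, then length-prefixed key and value) and the class name are assumptions of the sketch:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.Map;

    final class UpsertTestFileWriter {

        private UpsertTestFileWriter() {}

        /** Appends every buffered key-value pair to the stream; the caller flushes/closes it. */
        static void writeRecords(
                OutputStream out,
                Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records)
                throws IOException {
            DataOutputStream dataOut = new DataOutputStream(out);
            for (Map.Entry<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> entry :
                    records.entrySet()) {
                // array() returns defensive copies of the wrapped key and value bytes.
                byte[] key = entry.getKey().array();
                byte[] value = entry.getValue().array();
                dataOut.writeByte(UpsertTestSinkWriter.MAGIC_BYTE);
                dataOut.writeInt(key.length);
                dataOut.write(key);
                dataOut.writeInt(value.length);
                dataOut.write(value);
            }
            dataOut.flush();
        }
    }

   A unit test could then call writeRecords against a ByteArrayOutputStream and assert on the raw bytes without constructing a full SinkWriter, which is the testing benefit suggested here.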



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org