Posted to dev@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/23 18:36:35 UTC

[GitHub] [flink-connector-rabbitmq] pscls opened a new pull request, #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

pscls opened a new pull request, #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1

   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
     Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that by following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled-out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   This pull request ports the RabbitMQ connector implementation to the new connector APIs described in [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) and [FLIP-143](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API). It includes both a source and a sink, each supporting at-most-once, at-least-once, and exactly-once delivery guarantees.
   
   This pull request closes the following issues (separate RabbitMQ connector source and sink tickets): [FLINK-20628](https://issues.apache.org/jira/browse/FLINK-20628) and [FLINK-21373](https://issues.apache.org/jira/browse/FLINK-21373)
   
   
   ## Brief change log
   
   - The source and sink use RabbitMQ's Java Client API to interact with RabbitMQ (a hedged usage sketch follows this list)
   - The RabbitMQ source reads messages from a queue
     - At-least-once: messages are acknowledged on checkpoint completion
     - Exactly-once: messages are acknowledged in a transaction; the user has to set correlation IDs for deduplication
   - The RabbitMQ sink publishes messages to a queue
     - At-least-once: unacknowledged messages are resent on checkpoints
     - Exactly-once: messages between two checkpoints are published in a transaction
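
   As a rough illustration of how the two components could be wired into a job, here is a hedged usage sketch (imports and job setup omitted). `RabbitMQConnectionConfig.Builder#setUri` and `ConsistencyMode` appear in the code of this PR; the builder method names on `RabbitMQSource` below are assumptions made purely for illustration and may differ from the actual API.

   ```java
   // Hedged sketch only: the RabbitMQSource builder methods shown here are
   // assumed for illustration and may not match the API introduced by this PR.
   RabbitMQConnectionConfig connectionConfig =
           new RabbitMQConnectionConfig.Builder()
                   .setUri("amqp://guest:guest@localhost:5672/%2f") // Builder#setUri is referenced in this PR
                   .build();

   RabbitMQSource<String> source =
           RabbitMQSource.<String>builder()
                   .setConnectionConfig(connectionConfig)
                   .setQueueName("input-queue")
                   .setDeserializationSchema(new SimpleStringSchema())
                   .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
                   .build();

   DataStream<String> stream =
           env.fromSource(source, WatermarkStrategy.noWatermarks(), "RabbitMQ Source");
   ```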
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   All changes are within the flink-connectors/flink-connector-rabbitmq2/ module.
   The added integration tests can be found in the org.apache.flink.connector.rabbitmq2.source and org.apache.flink.connector.rabbitmq2.sink packages in the respective test directories.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (don't know)
     - The runtime per-record code paths (performance sensitive): (don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (don't know)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs)
   
   Co-authored-by: Yannik Schroeder <sc...@web.de>
   Co-authored-by: Jan Westphal <ja...@gmail.com>




[GitHub] [flink-connector-rabbitmq] MartijnVisser commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1135867243

   @pscls I think you've done a good job already with the root-pom; it looks like the one we currently have for Elasticsearch. I've just approved the run, so we can also see how the build behaves. When I tried it locally, it complained about https://github.com/pscls/flink-connector-rabbitmq/blob/new-api-connector/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java#L124 having a whitespace issue, but now the tests are running for me.
   
   I'll work on finding someone who can help with the review for this. 




[GitHub] [flink-connector-rabbitmq] MartijnVisser commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1160080270

   @pscls Can you have a look at the failing build? It's a checkstyle error. 




[GitHub] [flink-connector-rabbitmq] pscls commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
pscls commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1135015425

   @MartijnVisser We are not exactly sure what has to be part of the root-pom. 




[GitHub] [flink-connector-rabbitmq] pscls commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
pscls commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1135011871

   This is a copy from the original PR (https://github.com/apache/flink/pull/15140) against the Flink repository. 




[GitHub] [flink-connector-rabbitmq] zentol commented on a diff in pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#discussion_r983361068


##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/README.md:
##########
@@ -0,0 +1,14 @@
+# License of the RabbitMQ Connector
+
+Flink's RabbitMQ connector defines a Maven dependency on the
+"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"),
+the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").
+
+Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
+nor packages binaries from the "RabbitMQ AMQP Java Client".
+
+Users that create and publish derivative work based on Flink's
+RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
+must be aware that this may be subject to conditions declared in the
+Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
+and the Apache License version 2 ("ASL").

Review Comment:
   This belongs in a NOTICE file, contained in both the source release and the jars.
   
   ```suggestion
   Users that create and publish derivative work based on Flink's
   RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
   must be aware that this is subject to conditions declared in either the
   Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
   OR the Apache License version 2 ("ASL").
   ```
   
   Additionally this needs clarification that users can _choose_ which license they use for the derivative work.



##########
pom.xml:
##########
@@ -0,0 +1,1371 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+	<parent>
+		<groupId>org.apache</groupId>
+		<artifactId>apache</artifactId>
+		<version>20</version>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-connectors</artifactId>

Review Comment:
   This file is missing a fair number of cleanups that were later applied to the ES parent pom.
   
   See https://github.com/apache/flink-connector-elasticsearch/pull/31 and replicate the changes.



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink.writer.specialized;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.sink.RabbitMQSink;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions;
+import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq.sink.writer.RabbitMQSinkWriterBase;
+
+import com.rabbitmq.client.ConfirmCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * A {@link SinkWriter} implementation for {@link RabbitMQSink} that has at-least-once semantics,
+ * meaning it guarantees that outgoing messages arrive at RabbitMQ at least once.
+ *
+ * <p>At-least-once consistency is implemented by assigning sequence numbers to arriving messages
+ * and buffering them together in the state of the writer until an ack arrives.
+ *
+ * <p>Checkpointing is required for at-least-once to work because messages are resent only when a
+ * checkpoint is triggered (to avoid complex time tracking mechanisms for each individual message).
+ * Thus, on each checkpoint, all messages that were sent to RabbitMQ at least once before but are
+ * still unacknowledged will be sent again - duplicates are possible due to this behavior.
+ *
+ * <p>After a failure, a new writer gets initialized with one or more states that contain
+ * unacknowledged messages. These messages are resent immediately while being buffered in the new
+ * state of the writer.
+ *
+ * @param <T> Type of the elements in this sink
+ */
+public class RabbitMQSinkWriterAtLeastOnce<T> extends RabbitMQSinkWriterBase<T> {
+    protected final ConcurrentNavigableMap<Long, RabbitMQSinkMessageWrapper<T>> outstandingConfirms;
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkWriterAtLeastOnce.class);
+
+    private Set<Long> lastSeenMessageIds;
+
+    /**
+     * Create a new RabbitMQSinkWriterAtLeastOnce.
+     *
+     * @param connectionConfig configuration parameters used to connect to RabbitMQ
+     * @param queueName name of the queue to publish to
+     * @param serializationSchema serialization schema to turn elements into byte representation
+     * @param publishOptions optionally used to compute routing/exchange for messages
+     * @param returnListener returnListener
+     */
+    public RabbitMQSinkWriterAtLeastOnce(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            SerializationSchema<T> serializationSchema,
+            RabbitMQSinkPublishOptions<T> publishOptions,
+            SerializableReturnListener returnListener) {
+        super(connectionConfig, queueName, serializationSchema, publishOptions, returnListener);
+        this.outstandingConfirms = new ConcurrentSkipListMap<>();
+        this.lastSeenMessageIds = new HashSet<>();
+    }
+
+    /**
+     * On recovery, all messages stored in the states are resent.
+     *
+     * @param states a list of states to recover the writer with
+     * @throws IOException as messages are sent to RabbitMQ
+     */
+    @Override
+    public void recoverFromStates(List<RabbitMQSinkWriterState<T>> states) throws IOException {
+        for (RabbitMQSinkWriterState<T> state : states) {
+            for (RabbitMQSinkMessageWrapper<T> message : state.getOutstandingMessages()) {
+                send(message);
+            }
+        }
+    }
+
+    private void send(RabbitMQSinkMessageWrapper<T> msg) throws IOException {
+        long sequenceNumber = getRmqChannel().getNextPublishSeqNo();
+        getRmqSinkConnection().send(msg);
+        outstandingConfirms.put(sequenceNumber, msg);

Review Comment:
   Is there any back-pressure mechanism to prevent this from growing infinitely if the input rate exceeds the output rate?
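
   For illustration, a minimal sketch of one possible way to bound this map: gate `send()` on a permit that is released from the confirm callback. The semaphore, its size, and the helper method name are assumptions for this example, not part of the PR.

   ```java
   // Illustrative back-pressure sketch: cap the number of unconfirmed messages.
   // The semaphore and its limit are assumptions, not existing fields in this PR.
   private final java.util.concurrent.Semaphore inFlight =
           new java.util.concurrent.Semaphore(10_000);

   private void sendWithBackPressure(RabbitMQSinkMessageWrapper<T> msg) throws IOException {
       try {
           // Blocks the writer once too many confirms are outstanding.
           inFlight.acquire();
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new IOException("Interrupted while waiting for publisher confirms", e);
       }
       long sequenceNumber = getRmqChannel().getNextPublishSeqNo();
       getRmqSinkConnection().send(msg);
       outstandingConfirms.put(sequenceNumber, msg);
   }

   // In the confirm callback, one permit would be released per acknowledged entry:
   // outstandingConfirms.remove(sequenceNumber);
   // inFlight.release();
   ```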



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQSourceReaderBase.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.source.reader;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.rabbitmq.source.common.RabbitMQSourceMessageWrapper;
+import org.apache.flink.connector.rabbitmq.source.enumerator.RabbitMQSourceEnumerator;
+import org.apache.flink.connector.rabbitmq.source.split.RabbitMQSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The source reader for RabbitMQ queues. This is the base class of the different consistency modes.
+ *
+ * @param <T> The output type of the source.
+ */
+public abstract class RabbitMQSourceReaderBase<T> implements SourceReader<T, RabbitMQSourceSplit> {
+    protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceReaderBase.class);
+
+    // The assigned split from the enumerator.
+    private RabbitMQSourceSplit split;
+
+    private Connection rmqConnection;
+    private Channel rmqChannel;
+
+    private final SourceReaderContext sourceReaderContext;
+    // The deserialization schema for the messages of RabbitMQ.
+    private final DeserializationSchema<T> deliveryDeserializer;
+    // The collector keeps the messages received from RabbitMQ.
+    private final RabbitMQCollector<T> collector;
+
+    public RabbitMQSourceReaderBase(
+            SourceReaderContext sourceReaderContext,
+            DeserializationSchema<T> deliveryDeserializer) {
+        this.sourceReaderContext = requireNonNull(sourceReaderContext);
+        this.deliveryDeserializer = requireNonNull(deliveryDeserializer);
+        this.collector = new RabbitMQCollector<>();
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Starting source reader and send split request");
+        sourceReaderContext.sendSplitRequest();
+    }
+
+    // ------------- start RabbitMQ methods  --------------
+
+    private void setupRabbitMQ() throws Exception {
+        setupConnection();
+        setupChannel();
+        LOG.info(
+                "RabbitMQ Connection was successful: Waiting for messages from the queue. To exit press CTRL+C");
+    }
+
+    private ConnectionFactory setupConnectionFactory() throws Exception {
+        return split.getConnectionConfig().getConnectionFactory();
+    }
+
+    private void setupConnection() throws Exception {
+        rmqConnection = setupConnectionFactory().newConnection();
+    }
+
+    /** @return boolean whether messages should be automatically acknowledged to RabbitMQ. */
+    protected abstract boolean isAutoAck();
+
+    /**
+     * This function will be called when a new message from RabbitMQ gets pushed to the source. The
+     * message will be deserialized and forwarded to our message collector where it is buffered
+     * until it can be processed.
+     *
+     * @param consumerTag The consumer tag of the message.
+     * @param delivery The delivery from RabbitMQ.
+     * @throws IOException if something fails during deserialization.
+     */
+    protected void handleMessageReceivedCallback(String consumerTag, Delivery delivery)
+            throws IOException {
+
+        AMQP.BasicProperties properties = delivery.getProperties();
+        byte[] body = delivery.getBody();
+        Envelope envelope = delivery.getEnvelope();
+        collector.setMessageIdentifiers(properties.getCorrelationId(), envelope.getDeliveryTag());
+        deliveryDeserializer.deserialize(body, collector);
+    }
+
+    protected void setupChannel() throws IOException {
+        rmqChannel = rmqConnection.createChannel();
+        rmqChannel.queueDeclare(split.getQueueName(), true, false, false, null);
+
+        // Set maximum of unacknowledged messages
+        if (getSplit().getConnectionConfig().getPrefetchCount().isPresent()) {
+            // global: false - the prefetch count is set per consumer, not per RabbitMQ channel
+            rmqChannel.basicQos(getSplit().getConnectionConfig().getPrefetchCount().get(), false);
+        }
+
+        final DeliverCallback deliverCallback = this::handleMessageReceivedCallback;
+        rmqChannel.basicConsume(
+                split.getQueueName(), isAutoAck(), deliverCallback, consumerTag -> {});
+    }
+
+    // ------------- end RabbitMQ methods  --------------
+
+    /**
+     * This method provides a hook that is called when a message gets polled by the output.
+     *
+     * @param message the message that was polled by the output.
+     */
+    protected void handleMessagePolled(RabbitMQSourceMessageWrapper<T> message) {}
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<T> output) {
+        RabbitMQSourceMessageWrapper<T> message = collector.pollMessage();
+        if (message == null) {
+            return InputStatus.NOTHING_AVAILABLE;
+        }
+
+        output.collect(message.getMessage());
+        handleMessagePolled(message);
+
+        return collector.hasUnpolledMessages()
+                ? InputStatus.MORE_AVAILABLE
+                : InputStatus.NOTHING_AVAILABLE;
+    }
+
+    @Override
+    public List<RabbitMQSourceSplit> snapshotState(long checkpointId) {
+        return split != null ? Collections.singletonList(split.copy()) : new ArrayList<>();
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        return CompletableFuture.runAsync(
+                () -> {
+                    while (!collector.hasUnpolledMessages()) {
+                        // supposed to be empty
+                    }
+                });

Review Comment:
   This is problematic for a few reasons.
   
   a) It runs in a non-specified thread pool, which means it runs in the JVM's common pool, which may also be in use by other components. Use a dedicated executor.
   b) It hot-loops, which both blocks an entire thread from doing anything and blows through CPU cycles. Consider restructuring the collector to return a sort of availability future that is completed once a message was added, or use basic locking to at least prevent hot-looping.
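
   For illustration, a minimal sketch of the availability-future idea from (b), assuming a simplified collector; the class and method names below are illustrative only and not part of the PR:

   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;

   // Illustrative sketch: the collector exposes a future that completes once a
   // message arrives, so the reader's isAvailable() never needs to hot-loop.
   class AvailabilityNotifyingCollector<T> {
       private final Queue<T> messages = new ArrayDeque<>();
       private CompletableFuture<Void> availability = new CompletableFuture<>();

       synchronized void add(T message) {
           messages.add(message);
           // Wake up a reader that is waiting on the availability future.
           availability.complete(null);
       }

       synchronized T poll() {
           T next = messages.poll();
           if (messages.isEmpty()) {
               // Re-arm a fresh future for the next wait cycle.
               availability = new CompletableFuture<>();
           }
           return next;
       }

       synchronized CompletableFuture<Void> getAvailabilityFuture() {
           // Data already buffered yields a completed future; otherwise the pending one.
           return messages.isEmpty() ? availability : CompletableFuture.completedFuture(null);
       }
   }
   ```

   The reader's `isAvailable()` could then simply return `collector.getAvailabilityFuture()` instead of spinning on a background thread.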



##########
flink-connector-rabbitmq/pom.xml:
##########
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.16-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-connector-rabbitmq</artifactId>
+	<name>Flink : Connectors : RabbitMQ</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<rabbitmq.version>5.9.0</rabbitmq.version>
+	</properties>
+
+	<dependencies>
+
+		<!-- Core -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- RabbitMQ -->
+
+		<dependency>
+			<groupId>com.rabbitmq</groupId>
+			<artifactId>amqp-client</artifactId>
+			<version>${rabbitmq.version}</version>
+		</dependency>
+
+		<!-- Tests -->
+
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>rabbitmq</artifactId>
+			<version>1.15.1</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${flink.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${flink.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${flink.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>

Review Comment:
   Why are we publishing a test-jar?



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkMessageWrapper.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink.common;
+
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A wrapper class for messages that need to be persisted in the state of a {@link
+ * RabbitMQSinkWriterAtLeastOnce} or {@link RabbitMQSinkWriterExactlyOnce}.
+ *
+ * <p>It holds the message in its serialized format which gets sent to RabbitMQ. In the case of
+ * publish options being present and checkpointing modes of at-least-once or exactly-once the
+ * original message needs to be stored as well because it is needed for recomputing the
+ * exchange/routing key from the message content.
+ */
+public class RabbitMQSinkMessageWrapper<T> {
+    private T message;
+    private final byte[] bytes;
+
+    public RabbitMQSinkMessageWrapper(byte[] bytes) {
+        this.bytes = requireNonNull(bytes);
+    }
+
+    public RabbitMQSinkMessageWrapper(T message, byte[] bytes) {
+        this(bytes);
+        this.message = requireNonNull(message);
+    }
+
+    public byte[] getBytes() {
+        return bytes;
+    }
+
+    public T getMessage() {

Review Comment:
   `@Nullable`



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java:
##########
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.connector.rabbitmq.source.RabbitMQSource;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class is copied from the previous RabbitMQ connector. Connection Configuration for RMQ. If
+ * {@link Builder#setUri(String)} has been set then {@link
+ * RabbitMQConnectionConfig#RabbitMQConnectionConfig(String, Integer, Boolean, Boolean, Integer,
+ * Integer, Integer, Integer, Integer)} will be used to initialize the RMQ connection or {@link
+ * RabbitMQConnectionConfig#RabbitMQConnectionConfig(String, Integer, String, String, String,
+ * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer, Integer)} will be used to
+ * initialize the RMQ connection.
+ */
+public class RabbitMQConnectionConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQConnectionConfig.class);
+
+    private String host;
+    private Integer port;
+    private String virtualHost;
+    private String username;
+    private String password;
+    private String uri;
+
+    private Integer networkRecoveryInterval;
+    private Boolean automaticRecovery;
+    private Boolean topologyRecovery;
+
+    private Integer connectionTimeout;
+    private Integer requestedChannelMax;
+    private Integer requestedFrameMax;
+    private Integer requestedHeartbeat;
+
+    private Integer prefetchCount;
+
+    /**
+     * @param host host name
+     * @param port port
+     * @param virtualHost virtual host
+     * @param username username
+     * @param password password
+     * @param networkRecoveryInterval connection recovery interval in milliseconds
+     * @param automaticRecovery if automatic connection recovery
+     * @param topologyRecovery if topology recovery
+     * @param connectionTimeout connection timeout
+     * @param requestedChannelMax requested maximum channel number
+     * @param requestedFrameMax requested maximum frame size
+     * @param requestedHeartbeat requested heartbeat interval
+     * @throws NullPointerException if host or virtual host or username or password is null
+     */
+    private RabbitMQConnectionConfig(
+            String host,
+            Integer port,
+            String virtualHost,
+            String username,
+            String password,
+            Integer networkRecoveryInterval,
+            Boolean automaticRecovery,
+            Boolean topologyRecovery,
+            Integer connectionTimeout,
+            Integer requestedChannelMax,
+            Integer requestedFrameMax,
+            Integer requestedHeartbeat,
+            Integer prefetchCount) {
+        this.host = requireNonNull(host);
+        this.port = requireNonNull(port);
+        this.virtualHost = requireNonNull(virtualHost);
+        this.username = requireNonNull(username);
+        this.password = requireNonNull(password);
+
+        this.networkRecoveryInterval = networkRecoveryInterval;
+        this.automaticRecovery = automaticRecovery;
+        this.topologyRecovery = topologyRecovery;
+        this.connectionTimeout = connectionTimeout;
+        this.requestedChannelMax = requestedChannelMax;
+        this.requestedFrameMax = requestedFrameMax;
+        this.requestedHeartbeat = requestedHeartbeat;
+        this.prefetchCount = prefetchCount;
+    }
+
+    /**
+     * @param uri the connection URI
+     * @param networkRecoveryInterval connection recovery interval in milliseconds
+     * @param automaticRecovery if automatic connection recovery
+     * @param topologyRecovery if topology recovery
+     * @param connectionTimeout connection timeout
+     * @param requestedChannelMax requested maximum channel number
+     * @param requestedFrameMax requested maximum frame size
+     * @param requestedHeartbeat requested heartbeat interval
+     * @throws NullPointerException if URI is null
+     */
+    private RabbitMQConnectionConfig(
+            String uri,
+            Integer networkRecoveryInterval,
+            Boolean automaticRecovery,
+            Boolean topologyRecovery,
+            Integer connectionTimeout,
+            Integer requestedChannelMax,
+            Integer requestedFrameMax,
+            Integer requestedHeartbeat,
+            Integer prefetchCount) {
+        Preconditions.checkNotNull(uri, "Uri can not be null");
+        this.uri = uri;
+
+        this.networkRecoveryInterval = networkRecoveryInterval;
+        this.automaticRecovery = automaticRecovery;
+        this.topologyRecovery = topologyRecovery;
+        this.connectionTimeout = connectionTimeout;
+        this.requestedChannelMax = requestedChannelMax;
+        this.requestedFrameMax = requestedFrameMax;
+        this.requestedHeartbeat = requestedHeartbeat;
+        this.prefetchCount = prefetchCount;
+    }
+
+    /** @return the host to use for connections */
+    public String getHost() {
+        return host;
+    }
+
+    /** @return the port to use for connections */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Retrieve the virtual host.
+     *
+     * @return the virtual host to use when connecting to the broker
+     */
+    public String getVirtualHost() {
+        return virtualHost;
+    }
+
+    /**
+     * Retrieve the user name.
+     *
+     * @return the AMQP user name to use when connecting to the broker
+     */
+    public String getUsername() {
+        return username;
+    }
+
+    /**
+     * Retrieve the password.
+     *
+     * @return the password to use when connecting to the broker
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Retrieve the URI.
+     *
+     * @return the connection URI when connecting to the broker
+     */
+    public String getUri() {
+        return uri;
+    }
+
+    /**
+     * Returns automatic connection recovery interval in milliseconds.
+     *
+     * @return how long will automatic recovery wait before attempting to reconnect, in ms; default
+     *     is 5000
+     */
+    public Integer getNetworkRecoveryInterval() {
+        return networkRecoveryInterval;
+    }
+
+    /**
+     * Returns true if automatic connection recovery is enabled, false otherwise.
+     *
+     * @return true if automatic connection recovery is enabled, false otherwise
+     */
+    public Boolean isAutomaticRecovery() {
+        return automaticRecovery;
+    }
+
+    /**
+     * Returns true if topology recovery is enabled, false otherwise.
+     *
+     * @return true if topology recovery is enabled, false otherwise
+     */
+    public Boolean isTopologyRecovery() {
+        return topologyRecovery;
+    }
+
+    /**
+     * Retrieve the connection timeout.
+     *
+     * @return the connection timeout, in milliseconds; zero for infinite
+     */
+    public Integer getConnectionTimeout() {

Review Comment:
   Annotate with `@Nullable` where required



##########
flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplitSerializerTest.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.source.split;
+
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+
+import org.junit.Test;

Review Comment:
   It would be good if we could start out with JUnit 5 and not have to go through a migration again.
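
   For reference, a hedged sketch of the JUnit 5 (Jupiter) skeleton for such a test; the test method name is made up and the body is omitted:

   ```java
   package org.apache.flink.connector.rabbitmq.source.split;

   import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;

   // JUnit 5 replaces org.junit.Test / org.junit.Assert with the Jupiter API.
   import org.junit.jupiter.api.Test;

   import static org.junit.jupiter.api.Assertions.assertEquals;

   class RabbitMQSourceSplitSerializerTest {

       @Test
       void serializeAndDeserializeRoundTrip() throws Exception {
           // ... test body unchanged, just using Jupiter assertions ...
       }
   }
   ```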



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkMessageWrapper.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink.common;
+
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A wrapper class for messages that need to be persisted in the state of a {@link
+ * RabbitMQSinkWriterAtLeastOnce} or {@link RabbitMQSinkWriterExactlyOnce}.
+ *
+ * <p>It holds the message in its serialized format which gets sent to RabbitMQ. In the case of
+ * publish options being present and checkpointing modes of at-least-once or exactly-once the
+ * original message needs to be stored as well because it is needed for recomputing the
+ * exchange/routing key from the message content.
+ */
+public class RabbitMQSinkMessageWrapper<T> {
+    private T message;

Review Comment:
   final



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java:
##########
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.connector.rabbitmq.source.RabbitMQSource;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class is copied from the previous RabbitMQ connector. Connection Configuration for RMQ. If
+ * {@link Builder#setUri(String)} has been set then {@link
+ * RabbitMQConnectionConfig#RabbitMQConnectionConfig(String, Integer, Boolean, Boolean, Integer,
+ * Integer, Integer, Integer, Integer)} will be used to initialize the RMQ connection or {@link
+ * RabbitMQConnectionConfig#RabbitMQConnectionConfig(String, Integer, String, String, String,
+ * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer, Integer)} will be used to
+ * initialize the RMQ connection.
+ */
+public class RabbitMQConnectionConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQConnectionConfig.class);
+
+    private String host;
+    private Integer port;
+    private String virtualHost;
+    private String username;
+    private String password;
+    private String uri;
+
+    private Integer networkRecoveryInterval;
+    private Boolean automaticRecovery;
+    private Boolean topologyRecovery;
+
+    private Integer connectionTimeout;
+    private Integer requestedChannelMax;
+    private Integer requestedFrameMax;
+    private Integer requestedHeartbeat;
+
+    private Integer prefetchCount;

Review Comment:
   these should all be `final`.
   



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/state/RabbitMQSinkWriterStateSerializer.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink.state;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializer for a {@link RabbitMQSinkWriterState} used for at-least and exactly-once consistency
+ * of the sink.
+ */
+public class RabbitMQSinkWriterStateSerializer<T>
+        implements SimpleVersionedSerializer<RabbitMQSinkWriterState<T>> {
+    private final DeserializationSchema<T> deserializationSchema;
+
+    public RabbitMQSinkWriterStateSerializer(
+            @Nullable DeserializationSchema<T> deserializationSchema) {
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    public RabbitMQSinkWriterStateSerializer() {
+        this(null);
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    /**
+     * Serializes all {@code outstandingMessages} of a state of a single sink writer.
+     *
+     * @param rabbitMQSinkWriterState A state containing a list of {@code outstandingMessages}
+     * @throws IOException If the output stream cannot write the required data
+     */
+    @Override
+    public byte[] serialize(RabbitMQSinkWriterState<T> rabbitMQSinkWriterState) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(baos);
+        serializeV1(out, rabbitMQSinkWriterState.getOutstandingMessages());
+        return baos.toByteArray();
+    }
+
+    private void serializeV1(DataOutputStream out, List<RabbitMQSinkMessageWrapper<T>> messages)
+            throws IOException {
+        out.writeInt(messages.size());
+        for (RabbitMQSinkMessageWrapper<T> message : messages) {
+            out.writeInt(message.getBytes().length);
+            out.write(message.getBytes());
+        }
+        out.flush();
+    }
+
+    /**
+     * Deserializes {@link RabbitMQSinkMessageWrapper} objects that wrap the byte representation of
+     * a message that needs to be delivered to RabbitMQ as well as the original object
+     * representation if a deserialization schema is provided.
+     *
+     * @param version which deserialization version should be used
+     * @param bytes Serialized outstanding sink messages
+     * @return A list of messages that need to be redelivered to RabbitMQ
+     * @throws IOException If the input stream cannot read the required data
+     */
+    @Override
+    public RabbitMQSinkWriterState<T> deserialize(int version, byte[] bytes) throws IOException {
+        switch (version) {
+            case 1:
+                return deserializeV1(bytes);
+            default:
+                throw new IOException("Unrecognized version or corrupt state: " + version);
+        }
+    }
+
+    private RabbitMQSinkWriterState<T> deserializeV1(byte[] bytes) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        DataInputStream in = new DataInputStream(bais);
+        return new RabbitMQSinkWriterState<>(readSinkMessages(in));
+    }
+
+    private List<RabbitMQSinkMessageWrapper<T>> readSinkMessages(DataInputStream in)
+            throws IOException {
+        final int numberOfMessages = in.readInt();
+        List<RabbitMQSinkMessageWrapper<T>> messages = new ArrayList<>();
+        for (int i = 0; i < numberOfMessages; i++) {
+            byte[] bytes = new byte[in.readInt()];
+            in.read(bytes);

Review Comment:
   This is incorrect.
   
   > Reads **some** number of bytes from the contained input stream and stores them into the buffer array b.
   
   use IOUtils#readFully
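
   For illustration, a minimal sketch of the suggested fix based on the visible part of the snippet, using `DataInputStream#readFully` (which behaves like `IOUtils#readFully` here); the rest of the loop body is assumed:

   ```java
   private List<RabbitMQSinkMessageWrapper<T>> readSinkMessages(DataInputStream in)
           throws IOException {
       final int numberOfMessages = in.readInt();
       List<RabbitMQSinkMessageWrapper<T>> messages = new ArrayList<>();
       for (int i = 0; i < numberOfMessages; i++) {
           byte[] bytes = new byte[in.readInt()];
           // readFully blocks until the whole buffer is filled (or throws EOFException),
           // unlike read(), which may return fewer bytes than requested.
           in.readFully(bytes);
           messages.add(new RabbitMQSinkMessageWrapper<>(bytes));
       }
       return messages;
   }
   ```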



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java:
##########
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.connector.rabbitmq.source.RabbitMQSource;
+import org.apache.flink.util.Preconditions;
+
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class is copied from the previous RabbitMQ connector. Connection Configuration for RMQ. If
+ * {@link Builder#setUri(String)} has been set then {@link
+ * RabbitMQConnectionConfig#RabbitMQConnectionConfig(String, Integer, Boolean, Boolean, Integer,
+ * Integer, Integer, Integer, Integer)} will be used to initialize the RMQ connection or {@link
+ * RabbitMQConnectionConfig#RabbitMQConnectionConfig(String, Integer, String, String, String,
+ * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer, Integer)} will be used to
+ * initialize the RMQ connection.
+ */
+public class RabbitMQConnectionConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQConnectionConfig.class);
+
+    private String host;
+    private Integer port;
+    private String virtualHost;
+    private String username;
+    private String password;
+    private String uri;
+
+    private Integer networkRecoveryInterval;
+    private Boolean automaticRecovery;
+    private Boolean topologyRecovery;
+
+    private Integer connectionTimeout;
+    private Integer requestedChannelMax;
+    private Integer requestedFrameMax;
+    private Integer requestedHeartbeat;
+
+    private Integer prefetchCount;

Review Comment:
   annotate with `@Nullable` where appropriate



##########
flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkITCase.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink;
+
+import org.apache.flink.connector.rabbitmq.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQBaseTest;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQContainerClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The tests for the RabbitMQ sink with different consistency modes. Since the tests rely heavily
+ * on timeouts to keep the stream running, it is possible that they occasionally fail.
+ */
+public class RabbitMQSinkITCase extends RabbitMQBaseTest {
+
+    private static AtomicBoolean shouldFail;
+
+    @Before
+    public void setup() {
+        shouldFail = new AtomicBoolean(true);
+    }
+
+    private static class GeneratorFailureSource implements SourceFunction<String> {
+
+        private final BlockingQueue<String> messagesToSend;
+        private int failAtNthMessage;
+
+        public GeneratorFailureSource(BlockingQueue<String> messagesToSend, int failAtNthMessage) {
+            this.messagesToSend = messagesToSend;
+            this.failAtNthMessage = failAtNthMessage;
+            shouldFail.set(true);
+        }
+
+        @Override
+        public void run(SourceContext<String> sourceContext) throws Exception {
+            while (true) {
+                if (failAtNthMessage == 0 && shouldFail.get()) {
+                    shouldFail.set(false);
+                    throw new Exception("Supposed to Fail");
+                }
+                failAtNthMessage -= 1;
+                String message = messagesToSend.take();
+                sourceContext.collect(message);
+            }
+        }
+
+        @Override
+        public void cancel() {}
+    }
+
+    @Test
+    public void atMostOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(100);
+
+        DataStream<String> stream = env.fromCollection(messages);
+        RabbitMQContainerClient<String> client =
+                addSinkOn(stream, ConsistencyMode.AT_MOST_ONCE, messages.size());
+        executeFlinkJob();
+        client.await();
+
+        List<String> receivedMessages = client.getConsumedMessages();
+        assertEquals(messages, receivedMessages);
+    }
+
+    @Test
+    public void atLeastOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(100);
+        DataStream<String> stream = env.fromCollection(messages);
+        RabbitMQContainerClient<String> client =
+                addSinkOn(stream, ConsistencyMode.AT_LEAST_ONCE, messages.size());
+
+        executeFlinkJob();
+        client.await();
+
+        List<String> receivedMessages = client.getConsumedMessages();
+        assertEquals(messages, receivedMessages);
+    }
+
+    @Test
+    public void atLeastOnceWithFlinkFailureTest() throws Exception {

Review Comment:
   This one fails for me locally.
   
   ```
   org.junit.runners.model.TestTimedOutException: test timed out after 20 seconds
   
   	at java.base@11.0.16.1/jdk.internal.misc.Unsafe.park(Native Method)
   	at java.base@11.0.16.1/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
   	at java.base@11.0.16.1/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
   	at java.base@11.0.16.1/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1039)
   	at java.base@11.0.16.1/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
   	at java.base@11.0.16.1/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232)
   	at app//org.apache.flink.connector.rabbitmq.common.RabbitMQContainerClient.await(RabbitMQContainerClient.java:116)
   	at app//org.apache.flink.connector.rabbitmq.sink.RabbitMQSinkITCase.atLeastOnceWithFlinkFailureTest(RabbitMQSinkITCase.java:119)
   ```
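   
   One way to make such hangs easier to diagnose (a sketch only, assuming
   `RabbitMQContainerClient#await` wraps a `CountDownLatch` as the stack trace suggests) would be
   to bound the wait and fail with a descriptive error instead of relying on the JUnit-level timeout:
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   
   // Illustrative helper, not the actual RabbitMQContainerClient implementation.
   final class BoundedAwait {
       private final CountDownLatch latch;
   
       BoundedAwait(int expectedMessages) {
           this.latch = new CountDownLatch(expectedMessages);
       }
   
       void messageReceived() {
           latch.countDown();
       }
   
       void await(long timeout, TimeUnit unit) throws InterruptedException {
           if (!latch.await(timeout, unit)) {
               throw new AssertionError(
                       "Timed out after " + timeout + " " + unit
                               + " while waiting for the expected number of messages.");
           }
       }
   }
   ```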



##########
flink-connector-rabbitmq/README.md:
##########
@@ -0,0 +1,161 @@
+# License of the RabbitMQ Connector
+
+Flink's RabbitMQ connector defines a Maven dependency on the
+"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"),
+the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL").
+
+Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client"
+nor packages binaries from the "RabbitMQ AMQP Java Client".
+
+Users that create and publish derivative work based on Flink's
+RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client")
+must be aware that this may be subject to conditions declared in the
+Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL")
+and the Apache License version 2 ("ASL").
+
+This connector allows consuming messages from and publishing to RabbitMQ. It implements the
+Source API specified in [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+and the Sink API specified in [FLIP-143](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API).
+
+For more information about RabbitMQ visit https://www.rabbitmq.com/.
+
+# RabbitMQ Source
+
+Flink's RabbitMQ connector provides a streaming-only source which enables you to receive messages
+from a RabbitMQ queue in three different consistency modes: at-most-once, at-least-once,
+and exactly-once.
+
+## Consistency Modes

Review Comment:
   This all belongs in the documentation, not in a README.



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink.writer.specialized;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.sink.RabbitMQSink;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions;
+import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq.sink.writer.RabbitMQSinkWriterBase;
+
+import com.rabbitmq.client.ConfirmCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * A {@link SinkWriter} implementation for {@link RabbitMQSink} that has at-least-once semantics,
+ * meaning it guarantees that outgoing messages arrive at RabbitMQ at least once.
+ *
+ * <p>At-least-once consistency is implemented by assigning sequence numbers to arriving messages
+ * and buffering them together in the state of the writer until an ack arrives.
+ *
+ * <p>Checkpointing is required for at-least-once to work because messages are resend only when a

Review Comment:
   ```suggestion
    * <p>Checkpointing is required for at-least-once to work because messages are re-sent only when a
   ```



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/common/RabbitMQSinkPublishOptions.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+import java.util.Optional;
+
+/**
+ * This class was copied from the old RabbitMQ connector and extended with the serialization
+ * schema that is required for at-least-once and exactly-once.
+ *
+ * <p>The message computation provides methods to compute the message routing key and/or the
+ * properties.
+ *
+ * @param <IN> The type of the data used by the sink.
+ */
+@PublicEvolving
+public interface RabbitMQSinkPublishOptions<IN> extends java.io.Serializable {

Review Comment:
   ```suggestion
   public interface RabbitMQSinkPublishOptions<IN> extends Serializable {
   ```



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/RabbitMQSinkWriterBase.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink.writer;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkConnection;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions;
+import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+
+import com.rabbitmq.client.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQSinkWriterBase is the common abstract class of {@link RabbitMQSinkWriterAtMostOnce},
+ * {@link RabbitMQSinkWriterAtLeastOnce} and {@link RabbitMQSinkWriterExactlyOnce}.
+ *
+ * @param <T> Type of the elements in this sink
+ */
+public abstract class RabbitMQSinkWriterBase<T>
+        implements SinkWriter<T, Void, RabbitMQSinkWriterState<T>> {

Review Comment:
   This interface is deprecated.
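   
   For reference, a minimal sketch of how the writer could be shaped against the Sink V2
   interfaces in `org.apache.flink.api.connector.sink2` (introduced in Flink 1.15); the class name,
   method bodies, and generics below are illustrative assumptions, not the connector's actual code:
   
   ```java
   import org.apache.flink.api.connector.sink2.SinkWriter;
   import org.apache.flink.api.connector.sink2.StatefulSink;
   
   import java.io.IOException;
   import java.util.Collections;
   import java.util.List;
   
   // Sketch only: shows the shape of a stateful Sink V2 writer for this connector.
   public class SketchRabbitMQSinkWriter<T>
           implements StatefulSink.StatefulSinkWriter<T, RabbitMQSinkWriterState<T>> {
   
       @Override
       public void write(T element, SinkWriter.Context context)
               throws IOException, InterruptedException {
           // serialize the element and publish it to RabbitMQ
       }
   
       @Override
       public void flush(boolean endOfInput) throws IOException, InterruptedException {
           // publish any buffered messages before the checkpoint completes
       }
   
       @Override
       public List<RabbitMQSinkWriterState<T>> snapshotState(long checkpointId) {
           // return the unacknowledged messages that must survive a failure
           return Collections.emptyList();
       }
   
       @Override
       public void close() throws Exception {
           // release the RabbitMQ channel and connection
       }
   }
   ```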



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink.writer.specialized;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.sink.RabbitMQSink;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions;
+import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq.sink.writer.RabbitMQSinkWriterBase;
+
+import com.rabbitmq.client.ConfirmCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * A {@link SinkWriter} implementation for {@link RabbitMQSink} that has at-least-once semantics,
+ * meaning it guarantees that outgoing messages arrive at RabbitMQ at least once.
+ *
+ * <p>At-least-once consistency is implemented by assigning sequence numbers to arriving messages
+ * and buffering them together in the state of the writer until an ack arrives.
+ *
+ * <p>Checkpointing is required for at-least-once to work because messages are resend only when a
+ * checkpoint is triggered (to avoid complex time tracking mechanisms for each individual message).
+ * Thus on each checkpoint, all messages which were sent at least once before to RabbitMQ but are
+ * still unacknowledged will be send once again - duplications are possible by this behavior.

Review Comment:
   ```suggestion
    * still unacknowledged will be sent once again - duplications are possible by this behavior.
   ```



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions;
+import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterStateSerializer;
+import org.apache.flink.connector.rabbitmq.sink.writer.RabbitMQSinkWriterBase;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides at-most-once,
+ * at-least-once or exactly-once processing semantics. For at-least-once and exactly-once,
+ * checkpointing needs to be enabled.
+ *
+ * <pre>{@code
+ * RabbitMQSink
+ *     .builder()
+ *     .setConnectionConfig(connectionConfig)
+ *     .setQueueName("queue")
+ *     .setSerializationSchema(new SimpleStringSchema())
+ *     .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
+ *     .build();
+ * }</pre>
+ *
+ * <p>When creating the sink a {@code connectionConfig} must be specified via {@link
+ * RabbitMQConnectionConfig}. It contains required information for the RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a (virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to publish to and a {@link
+ * SerializationSchema} for the sink input type is required. {@code publishOptions} can be added
+ * optionally to route messages in RabbitMQ.
+ *
+ * <p>If at-least-once is required messages are buffered until an acknowledgement arrives because
+ * delivery needs to be guaranteed. On each checkpoint, all unacknowledged messages will be resent
+ * to RabbitMQ. In case of a failure, all unacknowledged messages can be restored and resent.
+ *
+ * <p>In the case of exactly-once a transactional RabbitMQ channel is used to achieve that all
+ * messages within a checkpoint are delivered once and only once. All messages that arrive in a
+ * checkpoint interval are buffered and sent to RabbitMQ in a single transaction when the checkpoint
+ * is triggered. If the transaction fails, all messages that were a part of the transaction are put
+ * back into the buffer and a resend is issued in the next checkpoint.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and the performance will drop.
+ * Under heavy load, checkpoints can be delayed if a transaction takes longer than the specified
+ * checkpointing interval.
+ *
+ * <p>If publish options are used and the checkpointing mode is at-least-once or exactly-once, they
+ * require a {@link DeserializationSchema} to be provided because messages that were persisted as
+ * part of an earlier checkpoint are needed to recompute routing/exchange.
+ */
+public class RabbitMQSink<T> implements Sink<T, Void, RabbitMQSinkWriterState<T>, Void> {
+
+    private final RabbitMQConnectionConfig connectionConfig;
+    private final String queueName;
+    private final SerializationSchema<T> serializationSchema;
+    private final RabbitMQSinkPublishOptions<T> publishOptions;
+    private final ConsistencyMode consistencyMode;
+    private final SerializableReturnListener returnListener;
+
+    private static final ConsistencyMode DEFAULT_CONSISTENCY_MODE = ConsistencyMode.AT_MOST_ONCE;

Review Comment:
   Where is it documented that this is the default?
   
   Consider forcing the user to explicitly set this to avoid surprises (particularly when the default is dropping data).
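   
   One possible way to enforce an explicit choice (a sketch against assumed builder field names,
   not the PR's actual builder code):
   
   ```java
   // Hypothetical excerpt of the sink builder's build(); field names are assumptions.
   public RabbitMQSink<T> build() {
       Preconditions.checkNotNull(
               consistencyMode,
               "No ConsistencyMode was set. Please call setConsistencyMode(...) explicitly; "
                       + "there is no default because AT_MOST_ONCE may drop data.");
       return new RabbitMQSink<>(
               connectionConfig,
               queueName,
               serializationSchema,
               publishOptions,
               consistencyMode,
               returnListener);
   }
   ```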



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/specialized/RabbitMQSinkWriterAtLeastOnce.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink.writer.specialized;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.sink.RabbitMQSink;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions;
+import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq.sink.writer.RabbitMQSinkWriterBase;
+
+import com.rabbitmq.client.ConfirmCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * A {@link SinkWriter} implementation for {@link RabbitMQSink} that has at-least-once semantics,
+ * meaning it guarantees that outgoing messages arrive at RabbitMQ at least once.
+ *
+ * <p>At-least-once consistency is implemented by assigning sequence numbers to arriving messages
+ * and buffering them together in the state of the writer until an ack arrives.
+ *
+ * <p>Checkpointing is required for at-least-once to work because messages are resend only when a
+ * checkpoint is triggered (to avoid complex time tracking mechanisms for each individual message).
+ * Thus on each checkpoint, all messages which were sent at least once before to RabbitMQ but are
+ * still unacknowledged will be send once again - duplications are possible by this behavior.
+ *
+ * <p>After a failure, a new writer gets initialized with one or more states that contain
+ * unacknowledged messages. These messages get resend immediately while buffering them in the new

Review Comment:
   ```suggestion
    * unacknowledged messages. These messages get re-sent immediately while buffering them in the new
   ```



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions;
+import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterStateSerializer;
+import org.apache.flink.connector.rabbitmq.sink.writer.RabbitMQSinkWriterBase;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides at-most-once,
+ * at-least-once or exactly-once processing semantics. For at-least-once and exactly-once,
+ * checkpointing needs to be enabled.
+ *
+ * <pre>{@code
+ * RabbitMQSink
+ *     .builder()
+ *     .setConnectionConfig(connectionConfig)
+ *     .setQueueName("queue")
+ *     .setSerializationSchema(new SimpleStringSchema())
+ *     .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
+ *     .build();
+ * }</pre>
+ *
+ * <p>When creating the sink a {@code connectionConfig} must be specified via {@link
+ * RabbitMQConnectionConfig}. It contains required information for the RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a (virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to publish to and a {@link
+ * SerializationSchema} for the sink input type is required. {@code publishOptions} can be added
+ * optionally to route messages in RabbitMQ.
+ *
+ * <p>If at-least-once is required messages are buffered until an acknowledgement arrives because
+ * delivery needs to be guaranteed. On each checkpoint, all unacknowledged messages will be resent
+ * to RabbitMQ. In case of a failure, all unacknowledged messages can be restored and resent.
+ *
+ * <p>In the case of exactly-once a transactional RabbitMQ channel is used to achieve that all
+ * messages within a checkpoint are delivered once and only once. All messages that arrive in a
+ * checkpoint interval are buffered and sent to RabbitMQ in a single transaction when the checkpoint
+ * is triggered. If the transaction fails, all messages that were a part of the transaction are put
+ * back into the buffer and a resend is issued in the next checkpoint.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and the performance will drop.
+ * Under heavy load, checkpoints can be delayed if a transaction takes longer than the specified
+ * checkpointing interval.
+ *
+ * <p>If publish options are used and the checkpointing mode is at-least-once or exactly-once, they
+ * require a {@link DeserializationSchema} to be provided because messages that were persisted as
+ * part of an earlier checkpoint are needed to recompute routing/exchange.
+ */
+public class RabbitMQSink<T> implements Sink<T, Void, RabbitMQSinkWriterState<T>, Void> {

Review Comment:
   usage of deprecated interface



##########
flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQBaseTest.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.common;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.rabbitmq.sink.RabbitMQSink;
+import org.apache.flink.connector.rabbitmq.source.RabbitMQSource;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The base class for RabbitMQ tests. It sets up a Flink mini cluster and a RabbitMQ Docker
+ * container. It provides helpers to easily add a source or sink to the stream, send messages to
+ * RabbitMQ, and read the messages received by RabbitMQ.
+ */
+public abstract class RabbitMQBaseTest {

Review Comment:
   Please don't use test bases; they've always ended up creating problems.
   
   Model this as an extension instead.
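   
   A rough sketch of what such an extension could look like if the tests moved to JUnit 5
   (class and method names here are illustrative, not part of this PR):
   
   ```java
   import org.junit.jupiter.api.extension.AfterAllCallback;
   import org.junit.jupiter.api.extension.BeforeAllCallback;
   import org.junit.jupiter.api.extension.ExtensionContext;
   import org.testcontainers.containers.RabbitMQContainer;
   import org.testcontainers.utility.DockerImageName;
   
   // Illustrative extension that owns the RabbitMQ container lifecycle instead of a test base class.
   public class RabbitMQContainerExtension implements BeforeAllCallback, AfterAllCallback {
   
       private final RabbitMQContainer container =
               new RabbitMQContainer(DockerImageName.parse("rabbitmq:3-management"));
   
       @Override
       public void beforeAll(ExtensionContext context) {
           container.start();
       }
   
       @Override
       public void afterAll(ExtensionContext context) {
           container.stop();
       }
   
       public RabbitMQContainer getContainer() {
           return container;
       }
   }
   ```
   
   Tests could then register it via `@RegisterExtension` and compose it with the existing
   `MiniClusterWithClientResource` instead of inheriting from a shared base class.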



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/writer/RabbitMQSinkWriterBase.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink.writer;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkConnection;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkMessageWrapper;
+import org.apache.flink.connector.rabbitmq.sink.common.RabbitMQSinkPublishOptions;
+import org.apache.flink.connector.rabbitmq.sink.common.SerializableReturnListener;
+import org.apache.flink.connector.rabbitmq.sink.state.RabbitMQSinkWriterState;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce;
+import org.apache.flink.connector.rabbitmq.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce;
+
+import com.rabbitmq.client.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQSinkWriterBase is the common abstract class of {@link RabbitMQSinkWriterAtMostOnce},
+ * {@link RabbitMQSinkWriterAtLeastOnce} and {@link RabbitMQSinkWriterExactlyOnce}.

Review Comment:
   This doesn't really tell me anything about what this class does, and such a listing easily becomes outdated.



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/common/RabbitMQSourceMessageWrapper.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.source.common;
+
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper class for the message received from RabbitMQ that holds the deserialized message, the
+ * delivery tag and the correlation id.
+ *
+ * @param <T> The type of the message to hold.
+ */
+public class RabbitMQSourceMessageWrapper<T> {
+    private final long deliveryTag;
+    private final String correlationId;
+    private final T message;
+
+    public RabbitMQSourceMessageWrapper(
+            long deliveryTag, @Nullable String correlationId, @Nullable T message) {
+        this.deliveryTag = deliveryTag;
+        this.correlationId = correlationId;
+        this.message = message;
+    }
+
+    public RabbitMQSourceMessageWrapper(long deliveryTag, String correlationId) {
+        this(deliveryTag, correlationId, null);
+    }

Review Comment:
   Missing null checks (`requireNonNull`).
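   
   For example (sketch only; whether `correlationId` is actually mandatory in this constructor,
   in contrast to the `@Nullable` three-argument variant, is an assumption):
   
   ```java
   public RabbitMQSourceMessageWrapper(long deliveryTag, String correlationId) {
       // fail early instead of propagating a null correlation id into the wrapper
       this(deliveryTag, java.util.Objects.requireNonNull(correlationId), null);
   }
   ```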



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQSourceReaderBase.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.source.reader;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.rabbitmq.source.common.RabbitMQSourceMessageWrapper;
+import org.apache.flink.connector.rabbitmq.source.enumerator.RabbitMQSourceEnumerator;
+import org.apache.flink.connector.rabbitmq.source.split.RabbitMQSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The source reader for RabbitMQ queues. This is the base class of the different consistency modes.
+ *
+ * @param <T> The output type of the source.
+ */
+public abstract class RabbitMQSourceReaderBase<T> implements SourceReader<T, RabbitMQSourceSplit> {
+    protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceReaderBase.class);
+
+    // The assigned split from the enumerator.
+    private RabbitMQSourceSplit split;
+
+    private Connection rmqConnection;
+    private Channel rmqChannel;
+
+    private final SourceReaderContext sourceReaderContext;
+    // The deserialization schema for the messages of RabbitMQ.
+    private final DeserializationSchema<T> deliveryDeserializer;
+    // The collector keeps the messages received from RabbitMQ.
+    private final RabbitMQCollector<T> collector;
+
+    public RabbitMQSourceReaderBase(
+            SourceReaderContext sourceReaderContext,
+            DeserializationSchema<T> deliveryDeserializer) {
+        this.sourceReaderContext = requireNonNull(sourceReaderContext);
+        this.deliveryDeserializer = requireNonNull(deliveryDeserializer);
+        this.collector = new RabbitMQCollector<>();
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Starting source reader and send split request");
+        sourceReaderContext.sendSplitRequest();
+    }
+
+    // ------------- start RabbitMQ methods  --------------
+
+    private void setupRabbitMQ() throws Exception {
+        setupConnection();
+        setupChannel();
+        LOG.info(
+                "RabbitMQ Connection was successful: Waiting for messages from the queue. To exit press CTRL+C");
+    }
+
+    private ConnectionFactory setupConnectionFactory() throws Exception {
+        return split.getConnectionConfig().getConnectionFactory();
+    }
+
+    private void setupConnection() throws Exception {
+        rmqConnection = setupConnectionFactory().newConnection();
+    }
+
+    /** @return boolean whether messages should be automatically acknowledged to RabbitMQ. */
+    protected abstract boolean isAutoAck();
+
+    /**
+     * This function will be called when a new message from RabbitMQ gets pushed to the source. The
+     * message will be deserialized and forwarded to our message collector where it is buffered
+     * until it can be processed.
+     *
+     * @param consumerTag The consumer tag of the message.
+     * @param delivery The delivery from RabbitMQ.
+     * @throws IOException if something fails during deserialization.
+     */
+    protected void handleMessageReceivedCallback(String consumerTag, Delivery delivery)
+            throws IOException {
+
+        AMQP.BasicProperties properties = delivery.getProperties();
+        byte[] body = delivery.getBody();
+        Envelope envelope = delivery.getEnvelope();
+        collector.setMessageIdentifiers(properties.getCorrelationId(), envelope.getDeliveryTag());
+        deliveryDeserializer.deserialize(body, collector);
+    }
+
+    protected void setupChannel() throws IOException {
+        rmqChannel = rmqConnection.createChannel();
+        rmqChannel.queueDeclare(split.getQueueName(), true, false, false, null);
+
+        // Set maximum of unacknowledged messages
+        if (getSplit().getConnectionConfig().getPrefetchCount().isPresent()) {
+            // global: false - the prefetch count is set per consumer, not per RabbitMQ channel
+            rmqChannel.basicQos(getSplit().getConnectionConfig().getPrefetchCount().get(), false);
+        }
+
+        final DeliverCallback deliverCallback = this::handleMessageReceivedCallback;
+        rmqChannel.basicConsume(
+                split.getQueueName(), isAutoAck(), deliverCallback, consumerTag -> {});
+    }
+
+    // ------------- end RabbitMQ methods  --------------
+
+    /**
+     * This method provides a hook that is called when a message gets polled by the output.
+     *
+     * @param message the message that was polled by the output.
+     */
+    protected void handleMessagePolled(RabbitMQSourceMessageWrapper<T> message) {}
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<T> output) {
+        RabbitMQSourceMessageWrapper<T> message = collector.pollMessage();
+        if (message == null) {
+            return InputStatus.NOTHING_AVAILABLE;
+        }
+
+        output.collect(message.getMessage());
+        handleMessagePolled(message);
+
+        return collector.hasUnpolledMessages()
+                ? InputStatus.MORE_AVAILABLE
+                : InputStatus.NOTHING_AVAILABLE;
+    }
+
+    @Override
+    public List<RabbitMQSourceSplit> snapshotState(long checkpointId) {
+        return split != null ? Collections.singletonList(split.copy()) : new ArrayList<>();
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        return CompletableFuture.runAsync(
+                () -> {
+                    while (!collector.hasUnpolledMessages()) {
+                        // supposed to be empty
+                    }
+                });
+    }
+
+    /**
+     * Assign the split from the enumerator. If the source reader already has a split nothing
+     * happens. After the split is assigned, the connection to RabbitMQ can be set up.
+     *
+     * @param list RabbitMQSourceSplits with only one element.
+     * @see RabbitMQSourceEnumerator
+     * @see RabbitMQSourceSplit
+     */
+    @Override
+    public void addSplits(List<RabbitMQSourceSplit> list) {
+        if (split != null) {
+            return;
+        }

Review Comment:
   This seems wrong.
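   
   If the intent is that a reader only ever holds a single split, one option (sketch only, not
   necessarily the desired fix) is to fail fast instead of silently ignoring additional splits:
   
   ```java
   // Hypothetical variant of addSplits; assumes a reader must receive exactly one split.
   @Override
   public void addSplits(List<RabbitMQSourceSplit> list) {
       Preconditions.checkState(split == null, "The reader already has a split assigned.");
       Preconditions.checkArgument(
               list.size() == 1, "Expected exactly one split, but received %s.", list.size());
       split = list.get(0);
       // ... continue with the RabbitMQ setup as before
   }
   ```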



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumState.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.source.enumerator;
+
+/**
+ * The EnumState is empty because every reader gets assigned the same split and, therefore, no
+ * split assignment needs to be remembered.
+ *
+ * @see RabbitMQSourceEnumerator
+ */
+public class RabbitMQSourceEnumState {}

Review Comment:
   Does this imply that messages are routed to different source subtasks on restart?



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQSourceReaderBase.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.source.reader;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.rabbitmq.source.common.RabbitMQSourceMessageWrapper;
+import org.apache.flink.connector.rabbitmq.source.enumerator.RabbitMQSourceEnumerator;
+import org.apache.flink.connector.rabbitmq.source.split.RabbitMQSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.Envelope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The source reader for RabbitMQ queues. This is the base class of the different consistency modes.
+ *
+ * @param <T> The output type of the source.
+ */
+public abstract class RabbitMQSourceReaderBase<T> implements SourceReader<T, RabbitMQSourceSplit> {
+    protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceReaderBase.class);
+
+    // The assigned split from the enumerator.
+    private RabbitMQSourceSplit split;
+
+    private Connection rmqConnection;
+    private Channel rmqChannel;
+
+    private final SourceReaderContext sourceReaderContext;
+    // The deserialization schema for the messages of RabbitMQ.
+    private final DeserializationSchema<T> deliveryDeserializer;
+    // The collector keeps the messages received from RabbitMQ.
+    private final RabbitMQCollector<T> collector;
+
+    public RabbitMQSourceReaderBase(
+            SourceReaderContext sourceReaderContext,
+            DeserializationSchema<T> deliveryDeserializer) {
+        this.sourceReaderContext = requireNonNull(sourceReaderContext);
+        this.deliveryDeserializer = requireNonNull(deliveryDeserializer);
+        this.collector = new RabbitMQCollector<>();
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Starting source reader and send split request");
+        sourceReaderContext.sendSplitRequest();
+    }
+
+    // ------------- start RabbitMQ methods  --------------
+
+    private void setupRabbitMQ() throws Exception {
+        setupConnection();
+        setupChannel();
+        LOG.info(
+                "RabbitMQ Connection was successful: Waiting for messages from the queue. To exit press CTRL+C");
+    }
+
+    private ConnectionFactory setupConnectionFactory() throws Exception {
+        return split.getConnectionConfig().getConnectionFactory();
+    }
+
+    private void setupConnection() throws Exception {
+        rmqConnection = setupConnectionFactory().newConnection();
+    }
+
+    /** @return boolean whether messages should be automatically acknowledged to RabbitMQ. */
+    protected abstract boolean isAutoAck();
+
+    /**
+     * This function will be called when a new message from RabbitMQ gets pushed to the source. The
+     * message will be deserialized and forwarded to our message collector where it is buffered
+     * until it can be processed.
+     *
+     * @param consumerTag The consumer tag of the message.
+     * @param delivery The delivery from RabbitMQ.
+     * @throws IOException if something fails during deserialization.
+     */
+    protected void handleMessageReceivedCallback(String consumerTag, Delivery delivery)
+            throws IOException {
+
+        AMQP.BasicProperties properties = delivery.getProperties();
+        byte[] body = delivery.getBody();
+        Envelope envelope = delivery.getEnvelope();
+        collector.setMessageIdentifiers(properties.getCorrelationId(), envelope.getDeliveryTag());
+        deliveryDeserializer.deserialize(body, collector);
+    }
+
+    protected void setupChannel() throws IOException {
+        rmqChannel = rmqConnection.createChannel();
+        rmqChannel.queueDeclare(split.getQueueName(), true, false, false, null);
+
+        // Set maximum of unacknowledged messages
+        if (getSplit().getConnectionConfig().getPrefetchCount().isPresent()) {
+            // global: false - the prefetch count is set per consumer, not per RabbitMQ channel
+            rmqChannel.basicQos(getSplit().getConnectionConfig().getPrefetchCount().get(), false);
+        }
+
+        final DeliverCallback deliverCallback = this::handleMessageReceivedCallback;
+        rmqChannel.basicConsume(
+                split.getQueueName(), isAutoAck(), deliverCallback, consumerTag -> {});
+    }
+
+    // ------------- end RabbitMQ methods  --------------
+
+    /**
+     * This method provides a hook that is called when a message gets polled by the output.
+     *
+     * @param message the message that was polled by the output.
+     */
+    protected void handleMessagePolled(RabbitMQSourceMessageWrapper<T> message) {}
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<T> output) {
+        RabbitMQSourceMessageWrapper<T> message = collector.pollMessage();
+        if (message == null) {
+            return InputStatus.NOTHING_AVAILABLE;
+        }
+
+        output.collect(message.getMessage());
+        handleMessagePolled(message);
+
+        return collector.hasUnpolledMessages()
+                ? InputStatus.MORE_AVAILABLE
+                : InputStatus.NOTHING_AVAILABLE;
+    }
+
+    @Override
+    public List<RabbitMQSourceSplit> snapshotState(long checkpointId) {
+        return split != null ? Collections.singletonList(split.copy()) : new ArrayList<>();

Review Comment:
   ```suggestion
           return split != null ? Collections.singletonList(split.copy()) : Collections.emptyList();
   ```



##########
flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/RabbitMQSource.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.rabbitmq.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.source.enumerator.RabbitMQSourceEnumState;
+import org.apache.flink.connector.rabbitmq.source.enumerator.RabbitMQSourceEnumStateSerializer;
+import org.apache.flink.connector.rabbitmq.source.enumerator.RabbitMQSourceEnumerator;
+import org.apache.flink.connector.rabbitmq.source.reader.RabbitMQSourceReaderBase;
+import org.apache.flink.connector.rabbitmq.source.reader.specialized.RabbitMQSourceReaderAtLeastOnce;
+import org.apache.flink.connector.rabbitmq.source.reader.specialized.RabbitMQSourceReaderAtMostOnce;
+import org.apache.flink.connector.rabbitmq.source.reader.specialized.RabbitMQSourceReaderExactlyOnce;
+import org.apache.flink.connector.rabbitmq.source.split.RabbitMQSourceSplit;
+import org.apache.flink.connector.rabbitmq.source.split.RabbitMQSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * RabbitMQ source (consumer) that consumes messages from a RabbitMQ queue. It provides
+ * at-most-once, at-least-once and exactly-once processing semantics. For at-least-once and
+ * exactly-once, checkpointing needs to be enabled. The source operates as a StreamingSource and
+ * thus works in a streaming fashion. Please use a {@link RabbitMQSourceBuilder} to construct the
+ * source. The following example shows how to create a RabbitMQSource emitting records of <code>
+ * String</code> type.
+ *
+ * <pre>{@code
+ * RabbitMQSource<String> source = RabbitMQSource
+ *     .<String>builder()
+ *     .setConnectionConfig(MY_RMQ_CONNECTION_CONFIG)
+ *     .setQueueName("myQueue")
+ *     .setDeliveryDeserializer(new SimpleStringSchema())
+ *     .setConsistencyMode(MY_CONSISTENCY_MODE)
+ *     .build();
+ * }</pre>
+ *
+ * <p>When creating the source a {@code connectionConfig} must be specified via {@link
+ * RabbitMQConnectionConfig}. It contains required information for the RabbitMQ java client to
+ * connect to the RabbitMQ server. A minimum configuration contains a (virtual) host, a username, a
+ * password and a port. Besides that, the {@code queueName} to consume from and a {@link
+ * DeserializationSchema} for the output type of the source are required.
+ *
+ * <p>When using at-most-once consistency, messages are automatically acknowledged when received
+ * from RabbitMQ and later consumed by the output. In case of a failure, messages might be lost.
+ * More details in {@link RabbitMQSourceReaderAtMostOnce}.
+ *
+ * <p>In case of at-least-once consistency, messages are buffered and later consumed by the output.
+ * Once a checkpoint is finished, the messages that were consumed by the output are acknowledged to
+ * RabbitMQ. This way, we ensure that the messages are successfully received by the output. In case
+ * of a system failure, the messages that were not acknowledged to RabbitMQ will be resent by RabbitMQ.
+ * More details in {@link RabbitMQSourceReaderAtLeastOnce}.
+ *
+ * <p>To ensure exactly-once consistency, messages are deduplicated through {@code correlationIds}.
+ * Similar to at-least-once consistency, we store the {@code deliveryTags} of the messages that are
+ * consumed by the output to acknowledge them later. A transactional RabbitMQ channel is used to
+ * ensure that all messages are successfully acknowledged to RabbitMQ. More details in {@link
+ * RabbitMQSourceReaderExactlyOnce}.
+ *
+ * <p>Keep in mind that the transactional channels are heavyweight and performance will drop. Under
+ * heavy load, checkpoints can be delayed if a transaction takes longer than the specified
+ * checkpointing interval.
+ *
+ * @param <T> the output type of the source.
+ */
+public class RabbitMQSource<T>
+        implements Source<T, RabbitMQSourceSplit, RabbitMQSourceEnumState>, ResultTypeQueryable<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSource.class);
+
+    private final RabbitMQConnectionConfig connectionConfig;
+    private final String queueName;
+    private final DeserializationSchema<T> deserializationSchema;
+    private final ConsistencyMode consistencyMode;
+
+    private RabbitMQSource(
+            RabbitMQConnectionConfig connectionConfig,
+            String queueName,
+            DeserializationSchema<T> deserializationSchema,
+            ConsistencyMode consistencyMode) {
+        this.connectionConfig = requireNonNull(connectionConfig);
+        this.queueName = requireNonNull(queueName);
+        this.deserializationSchema = requireNonNull(deserializationSchema);
+        this.consistencyMode = requireNonNull(consistencyMode);
+
+        LOG.info("Create RabbitMQ source");
+    }
+
+    /**
+     * Get a {@link RabbitMQSourceBuilder} for the source.
+     *
+     * @param <T> type of the source.
+     * @return a source builder
+     * @see RabbitMQSourceBuilder
+     */
+    public static <T> RabbitMQSourceBuilder<T> builder() {
+        return new RabbitMQSourceBuilder<>();
+    }
+
+    /**
+     * The boundedness is always continuous unbounded as this is a streaming-only source.
+     *
+     * @return Boundedness continuous unbounded.
+     * @see Boundedness
+     */
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    /**
+     * Returns a new initialized source reader matching the source's consistency mode.
+     *
+     * @param sourceReaderContext context which the reader will be created in.
+     * @return RabbitMQSourceReader a source reader of the specified consistency type.
+     * @see RabbitMQSourceReaderBase
+     */
+    @Override
+    public SourceReader<T, RabbitMQSourceSplit> createReader(
+            SourceReaderContext sourceReaderContext) {
+        LOG.info("New Source Reader of type {} requested.", consistencyMode);
+        switch (consistencyMode) {
+            case AT_MOST_ONCE:
+                return new RabbitMQSourceReaderAtMostOnce<>(
+                        sourceReaderContext, deserializationSchema);
+            case AT_LEAST_ONCE:
+                return new RabbitMQSourceReaderAtLeastOnce<>(
+                        sourceReaderContext, deserializationSchema);
+            case EXACTLY_ONCE:
+                return new RabbitMQSourceReaderExactlyOnce<>(
+                        sourceReaderContext, deserializationSchema);
+            default:
+                throw new IllegalStateException(
+                        "Error in creating a SourceReader: No valid consistency mode ("
+                                + consistencyMode
+                                + ") was specified.");
+        }
+    }
+
+    /**
+     * @param splitEnumeratorContext context which the enumerator will be created in
+     * @return a new split enumerator
+     * @see SplitEnumerator
+     */
+    @Override
+    public SplitEnumerator<RabbitMQSourceSplit, RabbitMQSourceEnumState> createEnumerator(
+            SplitEnumeratorContext<RabbitMQSourceSplit> splitEnumeratorContext) {
+        return new RabbitMQSourceEnumerator(
+                splitEnumeratorContext, consistencyMode, connectionConfig, queueName);
+    }
+
+    /**
+     * @param splitEnumeratorContext context which the enumerator will be created in
+     * @param enumState the state of the enumerator to restore from
+     * @return a new split enumerator
+     * @see SplitEnumerator
+     */
+    @Override
+    public SplitEnumerator<RabbitMQSourceSplit, RabbitMQSourceEnumState> restoreEnumerator(
+            SplitEnumeratorContext<RabbitMQSourceSplit> splitEnumeratorContext,
+            RabbitMQSourceEnumState enumState) {
+        return new RabbitMQSourceEnumerator(
+                splitEnumeratorContext, consistencyMode, connectionConfig, queueName, enumState);
+    }
+
+    /**
+     * @return a simple serializer for a RabbitMQSourceSplit
+     * @see SimpleVersionedSerializer
+     */
+    @Override
+    public SimpleVersionedSerializer<RabbitMQSourceSplit> getSplitSerializer() {
+        return new RabbitMQSourceSplitSerializer();
+    }
+
+    /**
+     * @return a simple serializer for a RabbitMQSourceEnumState
+     * @see SimpleVersionedSerializer
+     */
+    @Override
+    public SimpleVersionedSerializer<RabbitMQSourceEnumState> getEnumeratorCheckpointSerializer() {
+        return new RabbitMQSourceEnumStateSerializer();
+    }
+
+    /**
+     * @return type information
+     * @see TypeInformation
+     */
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    /**
+     * A builder class to simplify the creation of a {@link RabbitMQSource}.
+     *
+     * <p>The following example shows the minimum setup to create a RabbitMQSource that reads String
+     * messages from a Queue.
+     *
+     * <pre>{@code
+     * RabbitMQSource<String> source = RabbitMQSource
+     *     .<String>builder()
+     *     .setConnectionConfig(MY_RMQ_CONNECTION_CONFIG)
+     *     .setQueueName("myQueue")
+     *     .setDeserializationSchema(new SimpleStringSchema())
+     *     .setConsistencyMode(MY_CONSISTENCY_MODE)
+     *     .build();
+     * }</pre>
+     *
+     * <p>For details about the connection config refer to {@link RabbitMQConnectionConfig}. For
+     * details about the available consistency modes refer to {@link ConsistencyMode}.
+     *
+     * @param <T> the output type of the source.
+     */
+    public static class RabbitMQSourceBuilder<T> {
+        // The configuration for the RabbitMQ connection.
+        private RabbitMQConnectionConfig connectionConfig;
+        // Name of the queue to consume from.
+        private String queueName;
+        // The deserializer for the messages of RabbitMQ.
+        private DeserializationSchema<T> deserializationSchema;
+        // The consistency mode for the source.
+        private ConsistencyMode consistencyMode;
+
+        /**
+         * Build the {@link RabbitMQSource}.
+         *
+         * @return a RabbitMQSource with the configuration set for this builder.
+         */
+        public RabbitMQSource<T> build() {
+            return new RabbitMQSource<>(
+                    connectionConfig, queueName, deserializationSchema, consistencyMode);
+        }
+
+        /**
+         * Set the connection config for RabbitMQ.
+         *
+         * @param connectionConfig the connection configuration for RabbitMQ.
+         * @return this RabbitMQSourceBuilder
+         * @see RabbitMQConnectionConfig
+         */
+        public RabbitMQSourceBuilder<T> setConnectionConfig(
+                RabbitMQConnectionConfig connectionConfig) {
+            this.connectionConfig = connectionConfig;
+            return this;
+        }
+
+        /**
+         * Set the name of the queue to consume from.
+         *
+         * @param queueName the name of the queue to consume from.
+         * @return this RabbitMQSourceBuilder
+         */
+        public RabbitMQSourceBuilder<T> setQueueName(String queueName) {
+            this.queueName = queueName;
+            return this;
+        }
+
+        /**
+         * Set the deserializer for the message deliveries from RabbitMQ.
+         *
+         * @param deserializationSchema a deserializer for the message deliveries from RabbitMQ.
+         * @return this RabbitMQSourceBuilder
+         * @see DeserializationSchema
+         */
+        public RabbitMQSourceBuilder<T> setDeserializationSchema(
+                DeserializationSchema<T> deserializationSchema) {
+            this.deserializationSchema = deserializationSchema;

Review Comment:
   missing null checks; preferable to do them here to fail as early as possible.
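   For illustration, a minimal sketch of what such an eager check could look like in one of the setters (a hypothetical variant of `setDeserializationSchema`, reusing the statically imported `requireNonNull` that the constructor already relies on):
   
   ```java
   import static java.util.Objects.requireNonNull;
   
   // Hypothetical variant of the setter: validate eagerly so a missing value
   // fails at the call site instead of later in the RabbitMQSource constructor.
   public RabbitMQSourceBuilder<T> setDeserializationSchema(
           DeserializationSchema<T> deserializationSchema) {
       this.deserializationSchema =
               requireNonNull(deserializationSchema, "deserializationSchema must not be null");
       return this;
   }
   ```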



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-connector-rabbitmq] RocMarshal commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by "RocMarshal (via GitHub)" <gi...@apache.org>.
RocMarshal commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1656547551

   > > I am working for it now.
   > 
   > Any update to report from your end on this @RocMarshal ?
   
   Hi, @MartijnVisser Still in doing. I'll update in the end of this week.




[GitHub] [flink-connector-rabbitmq] RocMarshal commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by "RocMarshal (via GitHub)" <gi...@apache.org>.
RocMarshal commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1613379448

   > > Would you like to continue advancing it ?
   > 
   > @RocMarshal Do you want to take this over?
   
   Hi, @MartijnVisser Glad to get your attention.
   In fact, you have already assigned this ticket to me on https://issues.apache.org/jira/browse/FLINK-20628 .  I am working for it now.
   Thank you  ~ 




[GitHub] [flink-connector-rabbitmq] MartijnVisser commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1203925901

   @pscls The CI fails due to spotless; can you fix that? (By running `mvn spotless:apply`)




[GitHub] [flink-connector-rabbitmq] pscls commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
pscls commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1209062942

   > @pscls The CI fails due to spotless; can you fix that? (By running `mvn spotless:apply`)
   
   @MartijnVisser I've nothing to commit when running `mvn spotless:apply`.
   <img width="580" alt="image" src="https://user-images.githubusercontent.com/9250254/183599768-253a8b36-3851-4c91-98a2-f04080f2446f.png">
   




[GitHub] [flink-connector-rabbitmq] zentol commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
zentol commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1263341587

   You can avoid a lot of boilerplate by using the preliminary flink-connector-parent pom as shown here: https://github.com/apache/flink-connector-elasticsearch/commit/6e30d5d63d395b2f731418c34f5838231dcab6b8
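   
   For reference, a rough sketch of what adopting it usually amounts to in the connector's root `pom.xml` (the `flink-connector-parent` coordinates are the ones used by the other externalized connector repos; the version below is a placeholder and would need to match the current parent release):
   
   ```xml
   <parent>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-parent</artifactId>
       <!-- placeholder; use the latest released parent version -->
       <version>1.0.0</version>
   </parent>
   ```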




[GitHub] [flink-connector-rabbitmq] RocMarshal commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by "RocMarshal (via GitHub)" <gi...@apache.org>.
RocMarshal commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1587019961

   Hi, @pscls Thank you very much for the contribution.
   I notice that this PR has not been updated for a long time.
   Would you like to continue advancing it ?
   After the PR completed,  FLINK-25380 will be introduced.
   Looking forward to your opinion.
   Thanks.




[GitHub] [flink-connector-rabbitmq] MartijnVisser commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1655509560

   > I am working for it now.
   
   Any update to report from your end on this @RocMarshal ?




[GitHub] [flink-connector-rabbitmq] MartijnVisser commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1232836643

   @pscls Weird. Could you push once more, since the logs are no longer available?




[GitHub] [flink-connector-rabbitmq] wanglijie95 commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1160089843

   @MartijnVisser @pscls I noticed that the GitBox of flink-connector-rabbitmq sent emails to the dev@flink.apache.org. Is it expected?




[GitHub] [flink-connector-rabbitmq] MartijnVisser commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1160102027

   @wanglijie95 No. Most likely this is caused because the PR was created/is not yet using the ASF config as defined in https://github.com/apache/flink-connector-rabbitmq/blob/main/.asf.yaml
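   
   For context, the relevant part of `.asf.yaml` is the notification routing. A sketch of what it typically looks like for the externalized connector repos (illustrative only; the canonical values are in the file linked above):
   
   ```yaml
   notifications:
     commits: commits@flink.apache.org
     issues: issues@flink.apache.org
     pullrequests: issues@flink.apache.org
     jobs: builds@flink.apache.org
   ```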




[GitHub] [flink-connector-rabbitmq] MartijnVisser commented on a diff in pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on code in PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#discussion_r983398565


##########
flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSinkITCase.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.sink;
+
+import org.apache.flink.connector.rabbitmq.common.ConsistencyMode;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQBaseTest;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQContainerClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The tests for the RabbitMQ sink with different consistency modes. As the tests rely heavily on
+ * timeouts to keep the stream running, it is possible that tests might fail.
+ */
+public class RabbitMQSinkITCase extends RabbitMQBaseTest {
+
+    private static AtomicBoolean shouldFail;
+
+    @Before
+    public void setup() {
+        shouldFail = new AtomicBoolean(true);
+    }
+
+    private static class GeneratorFailureSource implements SourceFunction<String> {
+
+        private final BlockingQueue<String> messagesToSend;
+        private int failAtNthMessage;
+
+        public GeneratorFailureSource(BlockingQueue<String> messagesToSend, int failAtNthMessage) {
+            this.messagesToSend = messagesToSend;
+            this.failAtNthMessage = failAtNthMessage;
+            shouldFail.set(true);
+        }
+
+        @Override
+        public void run(SourceContext<String> sourceContext) throws Exception {
+            while (true) {
+                if (failAtNthMessage == 0 && shouldFail.get()) {
+                    shouldFail.set(false);
+                    throw new Exception("Supposed to Fail");
+                }
+                failAtNthMessage -= 1;
+                String message = messagesToSend.take();
+                sourceContext.collect(message);
+            }
+        }
+
+        @Override
+        public void cancel() {}
+    }
+
+    @Test
+    public void atMostOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(100);
+
+        DataStream<String> stream = env.fromCollection(messages);
+        RabbitMQContainerClient<String> client =
+                addSinkOn(stream, ConsistencyMode.AT_MOST_ONCE, messages.size());
+        executeFlinkJob();
+        client.await();
+
+        List<String> receivedMessages = client.getConsumedMessages();
+        assertEquals(messages, receivedMessages);
+    }
+
+    @Test
+    public void atLeastOnceTest() throws Exception {
+        List<String> messages = getRandomMessages(100);
+        DataStream<String> stream = env.fromCollection(messages);
+        RabbitMQContainerClient<String> client =
+                addSinkOn(stream, ConsistencyMode.AT_LEAST_ONCE, messages.size());
+
+        executeFlinkJob();
+        client.await();
+
+        List<String> receivedMessages = client.getConsumedMessages();
+        assertEquals(messages, receivedMessages);
+    }
+
+    @Test
+    public void atLeastOnceWithFlinkFailureTest() throws Exception {

Review Comment:
   Also fails on the CI





[GitHub] [flink-connector-rabbitmq] MartijnVisser commented on pull request #1: [FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser commented on PR #1:
URL: https://github.com/apache/flink-connector-rabbitmq/pull/1#issuecomment-1613280426

   > Would you like to continue advancing it ?
   
   @RocMarshal Do you want to take this over?

