Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/01 13:11:51 UTC

[GitHub] [flink] dawidwys commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

dawidwys commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r498214070



##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Initialization method for the schema. It is called before the actual working methods
+	 * {@link #deserialize} and thus suitable for one time setup work.
+	 *
+	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
+	 * registering user metrics.
+	 *
+	 * @param context Contextual information that can be used during initialization.
+	 */
+	public void open(DeserializationSchema.InitializationContext context) throws Exception;
+
+
+	/**
+	 * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the
+	 * collector.
+	 * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+	 * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+	 * the {@link RMQSource}.
+	 * @param envelope
+	 * @param properties
+	 * @param body
+	 * @throws IOException
+	 */

Review comment:
       ```suggestion
   	 * This method takes all the RabbitMQ delivery information supplied by the client, extracts the data, and passes it to the
   	 * collector.
   	 *
   	 * <p><b>NOTICE:</b> The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
   	 * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled for
   	 * the {@link RMQSource}.
   	 */
   ```

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Initialization method for the schema. It is called before the actual working methods
+	 * {@link #deserialize} and thus suitable for one time setup work.
+	 *
+	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
+	 * registering user metrics.
+	 *
+	 * @param context Contextual information that can be used during initialization.
+	 */
+	public void open(DeserializationSchema.InitializationContext context) throws Exception;
+
+
+	/**
+	 * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the
+	 * collector.
+	 * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+	 * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+	 * the {@link RMQSource}.
+	 * @param envelope
+	 * @param properties
+	 * @param body
+	 * @throws IOException
+	 */
+	public  void deserialize(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException;
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted.
+	 *
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return True, if the element signals end of stream, false otherwise.
+	 */
+	boolean isEndOfStream(T nextElement);
+
+	/**
+	 * The {@link TypeInformation} for the deserialized T.
+	 * As an example the proper implementation of this method if T is a String is:
+	 * {@code return TypeExtractor.getForClass(String.class)}
+	 * @return TypeInformation
+	 */
+	public TypeInformation<T> getProducedType();
+
+	/**
+	 * Special collector for RMQ messages.
+	 * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been

Review comment:
       If you want to start a new paragraph, it should be preceded by an empty line and a `<p>` tag.

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Initialization method for the schema. It is called before the actual working methods
+	 * {@link #deserialize} and thus suitable for one time setup work.
+	 *
+	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
+	 * registering user metrics.
+	 *
+	 * @param context Contextual information that can be used during initialization.
+	 */
+	public void open(DeserializationSchema.InitializationContext context) throws Exception;

Review comment:
       Most schemas do not need the `open` method. Therefore, it would be nice to add an empty default implementation.
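A minimal sketch of what an empty default `open` method could look like. To keep it compilable without Flink on the classpath, `InitContext` is a hypothetical stand-in for `DeserializationSchema.InitializationContext`, and `RmqSchemaSketch`/`Utf8BodySchema` are illustrative names, not the connector's API.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for DeserializationSchema.InitializationContext.
interface InitContext {}

// Simplified, hypothetical version of the schema interface.
interface RmqSchemaSketch<T> extends Serializable {

	// Empty default: schemas that need no one-time setup simply inherit it.
	default void open(InitContext context) throws Exception {}

	T parse(byte[] body);
}

// Implements only the required method and inherits the no-op open().
class Utf8BodySchema implements RmqSchemaSketch<String> {
	private static final long serialVersionUID = 1L;

	@Override
	public String parse(byte[] body) {
		return new String(body, StandardCharsets.UTF_8);
	}
}
```

Schemas that do need setup (for example, registering metrics) would still override `open`; everyone else gets the no-op for free.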

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Initialization method for the schema. It is called before the actual working methods
+	 * {@link #deserialize} and thus suitable for one time setup work.
+	 *
+	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
+	 * registering user metrics.
+	 *
+	 * @param context Contextual information that can be used during initialization.
+	 */
+	public void open(DeserializationSchema.InitializationContext context) throws Exception;
+
+
+	/**
+	 * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the
+	 * collector.
+	 * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+	 * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+	 * the {@link RMQSource}.
+	 * @param envelope
+	 * @param properties
+	 * @param body
+	 * @throws IOException
+	 */
+	public  void deserialize(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException;
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted.
+	 *
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return True, if the element signals end of stream, false otherwise.
+	 */
+	boolean isEndOfStream(T nextElement);
+
+	/**
+	 * The {@link TypeInformation} for the deserialized T.
+	 * As an example the proper implementation of this method if T is a String is:
+	 * {@code return TypeExtractor.getForClass(String.class)}
+	 * @return TypeInformation
+	 */
+	public TypeInformation<T> getProducedType();

Review comment:
       It is already defined in the `ResultTypeQueryable` interface. Let's not duplicate it.
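To illustrate why the redeclaration is redundant: a sub-interface inherits `getProducedType()` from its parent, and a single implementation satisfies it. `TypeQueryable` is a hypothetical stand-in for `ResultTypeQueryable` that returns a `Class` rather than Flink's `TypeInformation`; `BodyParser` and `LengthParser` are illustrative names.

```java
// Hypothetical stand-in for Flink's ResultTypeQueryable (Class instead of TypeInformation).
interface TypeQueryable<T> {
	Class<T> getProducedType();
}

// Inherits getProducedType() from TypeQueryable; no redeclaration needed.
interface BodyParser<T> extends TypeQueryable<T> {
	T parse(byte[] body);
}

// A single implementation satisfies the one inherited declaration.
class LengthParser implements BodyParser<Integer> {
	@Override
	public Integer parse(byte[] body) {
		return body.length;
	}

	@Override
	public Class<Integer> getProducedType() {
		return Integer.class;
	}
}
```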

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Initialization method for the schema. It is called before the actual working methods
+	 * {@link #deserialize} and thus suitable for one time setup work.
+	 *
+	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
+	 * registering user metrics.
+	 *
+	 * @param context Contextual information that can be used during initialization.
+	 */
+	public void open(DeserializationSchema.InitializationContext context) throws Exception;
+
+
+	/**
+	 * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the
+	 * collector.
+	 * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+	 * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+	 * the {@link RMQSource}.
+	 * @param envelope
+	 * @param properties
+	 * @param body
+	 * @throws IOException
+	 */
+	public  void deserialize(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException;
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted.
+	 *
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return True, if the element signals end of stream, false otherwise.
+	 */
+	boolean isEndOfStream(T nextElement);
+
+	/**
+	 * The {@link TypeInformation} for the deserialized T.
+	 * As an example the proper implementation of this method if T is a String is:
+	 * {@code return TypeExtractor.getForClass(String.class)}
+	 * @return TypeInformation
+	 */
+	public TypeInformation<T> getProducedType();
+
+	/**
+	 * Special collector for RMQ messages.
+	 * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been
+	 * processed or not.
+	 * @param <T>

Review comment:
       Please do not list parameters if you do not add documentation for them.

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Initialization method for the schema. It is called before the actual working methods
+	 * {@link #deserialize} and thus suitable for one time setup work.
+	 *
+	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
+	 * registering user metrics.
+	 *
+	 * @param context Contextual information that can be used during initialization.
+	 */
+	public void open(DeserializationSchema.InitializationContext context) throws Exception;
+
+
+	/**
+	 * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the
+	 * collector.
+	 * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+	 * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+	 * the {@link RMQSource}.
+	 * @param envelope
+	 * @param properties
+	 * @param body
+	 * @throws IOException
+	 */
+	public  void deserialize(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException;
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted.
+	 *
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return True, if the element signals end of stream, false otherwise.
+	 */
+	boolean isEndOfStream(T nextElement);
+
+	/**
+	 * The {@link TypeInformation} for the deserialized T.
+	 * As an example the proper implementation of this method if T is a String is:
+	 * {@code return TypeExtractor.getForClass(String.class)}
+	 * @return TypeInformation
+	 */
+	public TypeInformation<T> getProducedType();
+
+	/**
+	 * Special collector for RMQ messages.
+	 * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been

Review comment:
       The filtering logic is an internal detail. What is important about the collector is that:
   1. It lets you collect 0 to n elements.
   2. It has custom logic for setting message identifiers.
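A sketch of a collector matching those two points, under stated assumptions: `MessageCollector`, `ListMessageCollector`, and `CsvSchema` are hypothetical names, `setMessageIdentifiers` is modeled as returning `void` (the real `RMQCollector` signature may differ), and the delivery is reduced to a correlation ID, a delivery tag, and a body.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical collector: emits 0..n records and records message identifiers.
interface MessageCollector<T> {
	void collect(T record);

	void setMessageIdentifiers(String correlationId, long deliveryTag);
}

// List-backed implementation, e.g. for tests.
class ListMessageCollector<T> implements MessageCollector<T> {
	final List<T> records = new ArrayList<>();
	String correlationId;
	long deliveryTag = -1L;

	@Override
	public void collect(T record) {
		records.add(record);
	}

	@Override
	public void setMessageIdentifiers(String correlationId, long deliveryTag) {
		this.correlationId = correlationId;
		this.deliveryTag = deliveryTag;
	}
}

// Splits a comma-separated body; an empty body yields no records at all.
class CsvSchema {
	void deserialize(String correlationId, long deliveryTag, byte[] body, MessageCollector<String> out) {
		out.setMessageIdentifiers(correlationId, deliveryTag);
		String text = new String(body, StandardCharsets.UTF_8);
		if (text.isEmpty()) {
			return;
		}
		for (String part : text.split(",")) {
			out.collect(part);
		}
	}
}
```

The schema decides how many records a delivery produces, while the identifiers are set once per delivery, independently of that count.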

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

Review comment:
       Please remove the unnecessary `public` modifiers from the methods. Interface methods are implicitly `public`.
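A small, self-contained check of the point above (the interface and helper names are hypothetical): abstract methods declared in an interface without any modifier are still `public`, which reflection confirms.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// No explicit "public": abstract interface methods are implicitly public (and abstract).
interface NoModifierSchema<T> {
	T parse(byte[] body);

	boolean isEndOfStream(T nextElement);
}

final class ModifierCheck {
	private ModifierCheck() {}

	// Returns true if every method declared directly on the given type is public.
	static boolean allDeclaredMethodsPublic(Class<?> type) {
		for (Method m : type.getDeclaredMethods()) {
			if (!Modifier.isPublic(m.getModifiers())) {
				return false;
			}
		}
		return true;
	}
}
```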

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -123,7 +124,74 @@ public RMQSource(RMQConnectionConfig rmqConnectionConfig,
 		this.rmqConnectionConfig = rmqConnectionConfig;
 		this.queueName = queueName;
 		this.usesCorrelationId = usesCorrelationId;
-		this.schema = deserializationSchema;
+		this.deliveryDeserializer = wrapDeserializationSchema(deserializationSchema);
+	}
+
+	/**
+	 * Creates a new RabbitMQ source with at-least-once message processing guarantee when
+	 * checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
+	 *
+	 * <p>For exactly-once, please use the constructor
+	 * {@link RMQSource#RMQSource(RMQConnectionConfig, String, boolean, RMQDeserializationSchema)}.
+	 *
+	 * <p>It also uses the provided {@link RMQDeserializationSchema} to parse both the correlationID and the message.
+	 * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
+	 * @param queueName  The queue to receive messages from.
+	 * @param deliveryDeserializer A {@link RMQDeserializationSchema} for parsing the RMQDelivery.
+	 */
+	public RMQSource(RMQConnectionConfig rmqConnectionConfig, String queueName,
+					RMQDeserializationSchema<OUT> deliveryDeserializer) {
+		this(rmqConnectionConfig, queueName, false, deliveryDeserializer);
+	}
+
+	/**
+	 * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
+	 * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
+	 * undefined. If in doubt, set usesCorrelationId to false. When correlation ids are not
+	 * used, this source has at-least-once processing semantics when checkpointing is enabled.
+	 *
+	 * <p>It also uses the provided {@link RMQDeserializationSchema} to parse both the correlationID and the message.
+	 * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
+	 * @param queueName The queue to receive messages from.
+	 * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
+	 *                          id to deduplicate messages (in case of failed acknowledgments).
+	 *                          Only used when checkpointing is enabled.
+	 * @param deliveryDeserializer A {@link RMQDeserializationSchema} for parsing the RMQDelivery.
+	 */
+	public RMQSource(RMQConnectionConfig rmqConnectionConfig,
+					String queueName, boolean usesCorrelationId, RMQDeserializationSchema<OUT> deliveryDeserializer) {
+		super(String.class);
+		this.rmqConnectionConfig = rmqConnectionConfig;
+		this.queueName = queueName;
+		this.usesCorrelationId = usesCorrelationId;
+		this.deliveryDeserializer = deliveryDeserializer;
+	}
+
+	static <T> RMQDeserializationSchema<T> wrapDeserializationSchema(DeserializationSchema<T> schema) {

Review comment:
       Please move that to a top level class e.g. `RMQDeserializationSchemaWrapper`.
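One possible shape for such a top-level class, sketched with hypothetical stand-ins so it compiles on its own: `BodySchema` plays the role of `DeserializationSchema`, `DeliveryCollector` that of `RMQCollector`, and `FullDeliverySchema` that of `RMQDeserializationSchema`, with the delivery reduced to a correlation ID, delivery tag, and body instead of `Envelope` and `AMQP.BasicProperties`. Having the wrapper (rather than the source) forward the identifiers is an assumption of this sketch.

```java
// Hypothetical stand-in for DeserializationSchema: sees the body bytes only.
interface BodySchema<T> {
	T deserialize(byte[] body);

	boolean isEndOfStream(T nextElement);
}

// Hypothetical stand-in for RMQCollector.
interface DeliveryCollector<T> {
	void collect(T record);

	void setMessageIdentifiers(String correlationId, long deliveryTag);
}

// Hypothetical stand-in for RMQDeserializationSchema (delivery reduced to three fields).
interface FullDeliverySchema<T> {
	void deserialize(String correlationId, long deliveryTag, byte[] body, DeliveryCollector<T> collector);

	boolean isEndOfStream(T nextElement);
}

// Top-level adapter: lets a body-only schema be used where a delivery schema is expected.
final class DeliverySchemaWrapper<T> implements FullDeliverySchema<T> {
	private final BodySchema<T> schema;

	DeliverySchemaWrapper(BodySchema<T> schema) {
		this.schema = schema;
	}

	@Override
	public void deserialize(String correlationId, long deliveryTag, byte[] body, DeliveryCollector<T> collector) {
		// The plain schema cannot see message properties, so the wrapper forwards the identifiers itself.
		collector.setMessageIdentifiers(correlationId, deliveryTag);
		collector.collect(schema.deserialize(body));
	}

	@Override
	public boolean isEndOfStream(T nextElement) {
		return schema.isEndOfStream(nextElement);
	}
}
```

As a top-level class, the adapter can be unit-tested on its own, and the source's constructor only needs to instantiate it.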

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

Review comment:
       Please do the same for `RMQCollector`.

##########
File path: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##########
@@ -318,6 +320,36 @@ public void testConstructorParams() throws Exception {
 		assertEquals("passTest", testObj.getFactory().getPassword());
 	}
 
+	/**
+	 * Tests getting the correct body and correlationID given which constructor was called.
+	 * if the constructor with the {@link DeserializationSchema} was called it should extract the body of the message
+	 * from the {@link Delivery} and the correlation ID from the {@link AMQP.BasicProperties} which are
+	 * mocked to "I Love Turtles" and "0".
+	 * if the constructor with the {@link RMQDeserializationSchema} was called it uses the
+	 * {@link RMQDeserializationSchema#deserialize} method to parse the message and extract the correlation ID which
+	 * both are implemented in {@link RMQTestSource#initAMQPMocks()} to return the
+	 * {@link AMQP.BasicProperties#getMessageId()} that is mocked to return "1-MESSAGE_ID"
+	 */
+	@Test
+	public void testProcessMessage() throws Exception {

Review comment:
       Please fix all warnings in this test. There are a few warnings about using raw types.
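As a generic illustration of this kind of fix (the names here are hypothetical, not taken from the test): declaring the type argument instead of using a raw type lets the compiler check element types and removes the `rawtypes`/`unchecked` warnings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal collector used only in tests.
interface TestCollector<T> {
	void collect(T record);
}

// Parameterized implementation; the element type is checked at compile time.
final class CollectingTestCollector<T> implements TestCollector<T> {
	private final List<T> records = new ArrayList<>();

	@Override
	public void collect(T record) {
		records.add(record);
	}

	List<T> records() {
		return records;
	}
}

final class RawTypeExample {
	private RawTypeExample() {}

	static List<String> collectAll(String... values) {
		// Raw form that triggers warnings: CollectingTestCollector c = new CollectingTestCollector();
		CollectingTestCollector<String> c = new CollectingTestCollector<>();
		for (String v : values) {
			c.collect(v);
		}
		return c.records();
	}
}
```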

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Initialization method for the schema. It is called before the actual working methods
+	 * {@link #deserialize} and thus suitable for one time setup work.
+	 *
+	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
+	 * registering user metrics.
+	 *
+	 * @param context Contextual information that can be used during initialization.
+	 */
+	public void open(DeserializationSchema.InitializationContext context) throws Exception;
+
+
+	/**
+	 * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the
+	 * collector.
+	 * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+	 * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+	 * the {@link RMQSource}.
+	 * @param envelope
+	 * @param properties
+	 * @param body
+	 * @throws IOException
+	 */

Review comment:
       Please add Javadoc for the parameters.
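A sketch of a documented signature, using hypothetical stand-ins (`DeliveryEnvelope`, `DeliveryProperties`, `RecordCollector`) in place of `Envelope`, `AMQP.BasicProperties`, and `RMQCollector` so it compiles on its own. Every `@param` and `@throws` tag carries a description, and paragraphs are separated by an empty line and `<p>`.

```java
import java.io.IOException;

// Hypothetical stand-ins for the RabbitMQ client and connector types.
interface DeliveryEnvelope {
	long getDeliveryTag();
}

interface DeliveryProperties {
	String getCorrelationId();
}

interface RecordCollector<T> {
	void collect(T record);
}

interface DocumentedSchema<T> {

	/**
	 * Takes the RabbitMQ delivery information supplied by the client, extracts the data, and passes it
	 * to the collector.
	 *
	 * <p>Implementations may emit zero or more records per delivery.
	 *
	 * @param envelope the delivery envelope, e.g. carrying the delivery tag
	 * @param properties the AMQP message properties, e.g. the correlation ID
	 * @param body the raw message body
	 * @param collector the collector that receives the deserialized records
	 * @throws IOException if the body cannot be deserialized
	 */
	void deserialize(DeliveryEnvelope envelope, DeliveryProperties properties, byte[] body, RecordCollector<T> collector)
		throws IOException;
}
```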

##########
File path: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##########
@@ -219,7 +221,7 @@ public void testCheckpointing() throws Exception {
 
 			assertEquals(numIds, messageIds.size());
 			if (messageIds.size() > 0) {
-				assertTrue(messageIds.contains(Long.toString(lastSnapshotId)));
+				assertTrue(messageIds.contains(Long.toString(lastSnapshotId - 1)));

Review comment:
       Why did you have to change that?

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -237,29 +305,36 @@ public void close() throws Exception {
 		}
 	}
 
+	/**
+	 * Parse and collects the body of the an AMQP message.
+	 *
+	 * <p>If any of the constructors with the {@link DeserializationSchema} class was used to construct the source
+	 * it uses the {@link DeserializationSchema#deserialize(byte[])} to parse the body of the AMQP message.
+	 *
+	 * <p>If any of the constructors with the {@link RMQDeserializationSchema } class was used to construct the source it uses the
+	 * {@link RMQDeserializationSchema#deserialize(Envelope, AMQP.BasicProperties, byte[], RMQDeserializationSchema.RMQCollector collector)}
+	 * method of that provided instance.
+	 *
+	 * @param delivery the AMQP {@link Delivery}
+	 * @param collector a {@link RMQCollectorImpl} to collect the data
+	 * @throws IOException

Review comment:
       Remove the `@throws` tag if you do not add an explanation.

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -237,29 +305,36 @@ public void close() throws Exception {
 		}
 	}
 
+	/**
+	 * Parse and collects the body of the an AMQP message.
+	 *
+	 * <p>If any of the constructors with the {@link DeserializationSchema} class was used to construct the source
+	 * it uses the {@link DeserializationSchema#deserialize(byte[])} to parse the body of the AMQP message.
+	 *
+	 * <p>If any of the constructors with the {@link RMQDeserializationSchema } class was used to construct the source it uses the
+	 * {@link RMQDeserializationSchema#deserialize(Envelope, AMQP.BasicProperties, byte[], RMQDeserializationSchema.RMQCollector collector)}
+	 * method of that provided instance.
+	 *
+	 * @param delivery the AMQP {@link Delivery}
+	 * @param collector a {@link RMQCollectorImpl} to collect the data
+	 * @throws IOException
+	 */
+	protected void processMessage(Delivery delivery, RMQDeserializationSchema.RMQCollector collector) throws IOException {

Review comment:
       Please do not use raw types.
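To illustrate why the raw `RMQDeserializationSchema.RMQCollector` parameter is a problem, here is a small self-contained analogy using `java.util.List` (the class `RawTypeDemo` is purely illustrative). With a raw parameter the compiler only warns, so a wrongly typed element gets in and the failure surfaces later, at the point where the caller reads it back.

```java
import java.util.ArrayList;
import java.util.List;

final class RawTypeDemo {
    // Raw List parameter: the compiler only warns, so a wrong element type slips in.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static void addUnchecked(List target) {
        target.add(42); // an Integer ends up in what the caller believes is a List<String>
    }

    // Parameterized List<String>: the same mistake becomes a compile-time error.
    static void addTyped(List<String> target) {
        target.add("42");
        // target.add(42); // would not compile: incompatible types
    }

    /** Returns true if reading back the element added through the raw type fails at runtime. */
    static boolean rawTypeFailsLate() {
        List<String> strings = new ArrayList<>();
        addUnchecked(strings);
        try {
            String first = strings.get(0); // javac inserts a cast to String here
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

Declaring the parameter as `RMQDeserializationSchema.RMQCollector<T>` moves this class of error from runtime to compile time in the same way.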

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -237,29 +305,36 @@ public void close() throws Exception {
 		}
 	}
 
+	/**
+	 * Parse and collects the body of the an AMQP message.
+	 *
+	 * <p>If any of the constructors with the {@link DeserializationSchema} class was used to construct the source

Review comment:
       I don't think this comment is very helpful in this method. This method always works with `RMQDeserializationSchema`. This paragraph describes a characteristic of the wrapper `RMQDeserializationSchemaWrapper`.
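A rough sketch of the split the comment suggests, under simplifying assumptions: the source only ever calls a delivery-aware schema, and a wrapper adapts a body-only schema to it. `BodyOnlySchema` stands in for Flink's `DeserializationSchema<T>` (only its `deserialize(byte[])` method is modelled), and `SimpleCollector`, `DeliveryAwareSchema` and `BodyOnlySchemaWrapper` are hypothetical simplifications; envelope and properties are omitted.

```java
import java.io.IOException;

// Hypothetical stand-in for DeserializationSchema<T>: only the body-based method is modelled.
interface BodyOnlySchema<T> {
    T deserialize(byte[] message) throws IOException;
}

// Hypothetical, simplified collector.
interface SimpleCollector<T> {
    void collect(T record);
}

// Hypothetical, simplified delivery-aware schema (envelope and properties omitted).
interface DeliveryAwareSchema<T> {
    void deserialize(byte[] body, SimpleCollector<T> collector) throws IOException;
}

/**
 * Adapts a body-only schema to the delivery-aware interface. The "which constructor was used"
 * explanation belongs in this class's Javadoc, since processMessage never sees the difference.
 */
final class BodyOnlySchemaWrapper<T> implements DeliveryAwareSchema<T> {
    private final BodyOnlySchema<T> delegate;

    BodyOnlySchemaWrapper(BodyOnlySchema<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void deserialize(byte[] body, SimpleCollector<T> collector) throws IOException {
        // Only the payload is passed to the wrapped schema; delivery metadata is ignored.
        collector.collect(delegate.deserialize(body));
    }
}
```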

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+

Review comment:
       Please keep to the 120-character line limit.

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Initialization method for the schema. It is called before the actual working methods
+	 * {@link #deserialize} and thus suitable for one time setup work.
+	 *
+	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
+	 * registering user metrics.
+	 *
+	 * @param context Contextual information that can be used during initialization.
+	 */
+	public void open(DeserializationSchema.InitializationContext context) throws Exception;
+
+
+	/**
+	 * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the
+	 * collector.
+	 * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+	 * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+	 * the {@link RMQSource}.
+	 * @param envelope
+	 * @param properties
+	 * @param body
+	 * @throws IOException
+	 */
+	public  void deserialize(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException;

Review comment:
       Please do not use the raw type `RMQCollector`. It should be `RMQCollector<T>`.

##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,91 @@
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Initialization method for the schema. It is called before the actual working methods
+	 * {@link #deserialize} and thus suitable for one time setup work.
+	 *
+	 * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
+	 * registering user metrics.
+	 *
+	 * @param context Contextual information that can be used during initialization.
+	 */
+	public void open(DeserializationSchema.InitializationContext context) throws Exception;
+
+
+	/**
+	 * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the
+	 * collector.
+	 * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+	 * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+	 * the {@link RMQSource}.
+	 * @param envelope
+	 * @param properties
+	 * @param body
+	 * @throws IOException
+	 */

Review comment:
       Is it really the case that it **MUST** call `setMessageIdentifiers`? I thought we said we would fall back to the original logic if they are not set.
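One possible shape for that fallback, as a hedged sketch: it assumes "the original logic" means taking the correlation ID from the AMQP properties and the delivery tag from the envelope, and that a fresh collector state is used per delivery. `FallbackCollector` and `resolveIdentifiers` are hypothetical names, not the PR's `RMQCollectorImpl`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical collector that remembers whether the user schema supplied message identifiers. */
final class FallbackCollector<T> {
    private final List<T> records = new ArrayList<>();
    private String correlationId;
    private long deliveryTag;
    private boolean identifiersSet;

    void collect(T record) {
        records.add(record);
    }

    void setMessageIdentifiers(String correlationId, long deliveryTag) {
        this.correlationId = correlationId;
        this.deliveryTag = deliveryTag;
        this.identifiersSet = true;
    }

    /**
     * Called by the source after the schema has run. If the schema did not set identifiers,
     * fall back to the values read from the delivery (properties correlation ID, envelope tag).
     */
    void resolveIdentifiers(String propertiesCorrelationId, long envelopeDeliveryTag) {
        if (!identifiersSet) {
            setMessageIdentifiers(propertiesCorrelationId, envelopeDeliveryTag);
        }
    }

    String getCorrelationId() { return correlationId; }
    long getDeliveryTag() { return deliveryTag; }
    List<T> getRecords() { return records; }
}
```

With this shape the Javadoc could say the method *may* call `setMessageIdentifiers`, and describe the fallback, rather than saying it **MUST**.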




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org