Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/03 14:59:50 UTC

[GitHub] [flink] becketqin opened a new pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

becketqin opened a new pull request #14303:
URL: https://github.com/apache/flink/pull/14303


   ## What is the purpose of the change
   The current `KafkaRecordDeserializer` has the following problems:
   1. Existing `DeserializationSchema` and legacy `KafkaDeserializationSchema` implementations cannot be reused.
   2. It is missing an `open()` method that provides a context for serialization and deserialization.
   
   This patch fixes the above issues by doing the following:
   * Add a `getUserCodeClassLoader()` method to `SourceReaderContext` so the `SourceReader` implementation can construct the `DeserializationSchema.InitializationContext` passed to `open()`.
   * Add bridge methods to facilitate reuse of existing `DeserializationSchema` and legacy `KafkaDeserializationSchema` implementations (see the usage sketch after this list).
   * Rename `KafkaRecordDeserializer` to `KafkaRecordDeserializationSchema` to follow the naming convention.
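   
   A rough usage sketch of the bridge methods is shown below; the factory method names (`valueOnly`, `of`) are assumptions for illustration rather than the final API:
   
```java
// Hypothetical sketch; the factory method names `valueOnly`/`of` are assumptions for illustration.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

public class DeserializerBridgeSketch {

    // Reuse an existing DeserializationSchema to deserialize only the record value.
    static KafkaRecordDeserializationSchema<String> fromValueSchema() {
        return KafkaRecordDeserializationSchema.valueOnly(new SimpleStringSchema());
    }

    // Wrap a legacy KafkaDeserializationSchema so existing implementations keep working.
    static <T> KafkaRecordDeserializationSchema<T> fromLegacySchema(
            KafkaDeserializationSchema<T> legacySchema) {
        return KafkaRecordDeserializationSchema.of(legacySchema);
    }
}
```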
   
   ## Brief change log
   * Add a `getUserCodeClassLoader()` method to `SourceReaderContext` so the `SourceReader` implementation can construct the `DeserializationSchema.InitializationContext` passed to `open()`.
   * Add bridge methods to facilitate reuse of existing `DeserializationSchema` and legacy `KafkaDeserializationSchema` implementations.
   
   ## Verifying this change
   Unit tests have been added in `KafkaRecordDeserializationSchemaTest` to verify the changes to `KafkaRecordDeserializationSchema`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (**no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes**)
     - The serializers: (**yes**)
     - The runtime per-record code paths (performance sensitive): (**no**)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (**no**)
     - The S3 file system connector: (**no**)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**no**)
     - If yes, how is the feature documented? (Java doc)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500",
       "triggerID" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e538857730e35f02d17631d79df453453874c6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10516",
       "triggerID" : "07e538857730e35f02d17631d79df453453874c6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 07e538857730e35f02d17631d79df453453874c6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10516) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500",
       "triggerID" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0972e9422c2f8c767d1e855c3f234d835e41430b Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494) 
   * d78c1394221de2af440a7a4d3e0b500d1d6f188a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] becketqin closed pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
becketqin closed pull request #14303:
URL: https://github.com/apache/flink/pull/14303


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500",
       "triggerID" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e538857730e35f02d17631d79df453453874c6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10516",
       "triggerID" : "07e538857730e35f02d17631d79df453453874c6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d78c1394221de2af440a7a4d3e0b500d1d6f188a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500) 
   * 07e538857730e35f02d17631d79df453453874c6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10516) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] StephanEwen commented on a change in pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on a change in pull request #14303:
URL: https://github.com/apache/flink/pull/14303#discussion_r535517311



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaDeserializationSchemaWrapper.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.reader.deserializer;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.IOException;
+
+/**
+ * A wrapper class that wraps a {@link org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema}
+ * to deserialize {@link ConsumerRecord ConsumerRecords}.
+ *
+ * @param <T> the type of the deserialized records.
+ */
+class KafkaDeserializationSchemaWrapper<T> implements KafkaRecordDeserializer<T> {
+	private static final long serialVersionUID = 3239435655135705790L;

Review comment:
       Per the style guide, please start the `serialVersionUID` at 1L.
   https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializationSchemaWrapper.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.reader.deserializer;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.IOException;
+
+/**
+ * A class that wraps a {@link DeserializationSchema} as the value deserializer for a
+ * {@link ConsumerRecord}.
+ *
+ * @param <T> the return type of the deserialization.
+ */
+class KafkaValueOnlyDeserializationSchemaWrapper<T> implements KafkaRecordDeserializer<T> {
+	private static final long serialVersionUID = -3962448817248263667L;

Review comment:
       `serialVersionUID` should start at 1L, see above.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java
##########
@@ -18,45 +18,59 @@
 
 package org.apache.flink.connector.kafka.source.reader.deserializer;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Map;
 
 /**
  * A package private class to wrap {@link Deserializer}.
  */
-class ValueDeserializerWrapper<T> implements KafkaRecordDeserializer<T> {
+class KafkaValueOnlyDeserializerWrapper<T> implements KafkaRecordDeserializer<T> {
 	private static final long serialVersionUID = 5409547407386004054L;
-	private static final Logger LOG = LoggerFactory.getLogger(ValueDeserializerWrapper.class);
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaValueOnlyDeserializerWrapper.class);
 	private final String deserializerClass;
 	private final Map<String, String> config;
+	private ClassLoader userCodeClassLoader;
 
 	private transient Deserializer<T> deserializer;
 
-	ValueDeserializerWrapper(
+	KafkaValueOnlyDeserializerWrapper(
 			Class<? extends Deserializer<T>> deserializerClass,
 			Map<String, String> config) {
 		this.deserializerClass = deserializerClass.getName();
 		this.config = config;
 	}
 
+	@Override
+	public void open(DeserializationSchema.InitializationContext context) throws Exception {
+		this.userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
+	}
+
 	@Override
 	@SuppressWarnings("unchecked")
-	public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> collector) throws Exception {
+	public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> collector) throws IOException {
 		if (deserializer == null) {
-			deserializer = (Deserializer<T>) InstantiationUtil.instantiate(
+			try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader)) {

Review comment:
       Could this initialization be moved into the open method?
   If not, could be good to move the init logic to separate method, to keep the main method code paths small, for JIT friendlyness. 
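   
   A rough sketch of what that could look like, reusing the field names from the diff above; the instantiation call shown here is illustrative, not the actual patch:
   
```java
@Override
@SuppressWarnings("unchecked")
public void open(DeserializationSchema.InitializationContext context) throws Exception {
    this.userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
    // Instantiate and configure the Kafka Deserializer once, up front,
    // instead of lazily on the first call to deserialize().
    try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader)) {
        Class<?> clazz = Class.forName(deserializerClass, true, userCodeClassLoader);
        deserializer = (Deserializer<T>) InstantiationUtil.instantiate(clazz);
        if (deserializer instanceof Configurable) {
            ((Configurable) deserializer).configure(config);
        } else {
            deserializer.configure(config, false);
        }
    }
}

@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> collector) throws IOException {
    // The hot per-record path stays small and JIT-friendly: just deserialize and collect.
    collector.collect(deserializer.deserialize(record.topic(), record.value()));
}
```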

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java
##########
@@ -18,45 +18,59 @@
 
 package org.apache.flink.connector.kafka.source.reader.deserializer;
 
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Map;
 
 /**
  * A package private class to wrap {@link Deserializer}.
  */
-class ValueDeserializerWrapper<T> implements KafkaRecordDeserializer<T> {
+class KafkaValueOnlyDeserializerWrapper<T> implements KafkaRecordDeserializer<T> {
 	private static final long serialVersionUID = 5409547407386004054L;

Review comment:
       `serialVersionUID` should start at 1L, see above.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -191,6 +194,11 @@ public void sendSplitRequest() {
 			public void sendSourceEventToCoordinator(SourceEvent event) {
 				operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
 			}
+
+			@Override
+			public UserCodeClassLoader getUserCodeClassLoader() {
+				return (UserCodeClassLoader) getRuntimeContext().getUserCodeClassLoader();

Review comment:
       You can probably reuse some parts from `RuntimeContextInitializationContextAdapters` here.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -191,6 +194,11 @@ public void sendSplitRequest() {
 			public void sendSourceEventToCoordinator(SourceEvent event) {
 				operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
 			}
+
+			@Override
+			public UserCodeClassLoader getUserCodeClassLoader() {
+				return (UserCodeClassLoader) getRuntimeContext().getUserCodeClassLoader();

Review comment:
       This looks like a cast between incompatible types.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -197,7 +195,20 @@ public void sendSourceEventToCoordinator(SourceEvent event) {
 
 			@Override
 			public UserCodeClassLoader getUserCodeClassLoader() {
-				return (UserCodeClassLoader) getRuntimeContext().getUserCodeClassLoader();
+				return new UserCodeClassLoader() {

Review comment:
       Is it possible to reuse code from `RuntimeContextInitializationContextAdapters` here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] becketqin commented on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
becketqin commented on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-739324003


   I think you are right. At a high level, performance is best when all the CPU cores are busy and none of them do unnecessary work.
   
   In the ideal case, there are N dedicated main threads, where N == the number of CPU cores, so no computing resource is idle. These main threads would only be "interrupted" by IO, i.e. by more records being handed over to them for processing. Async IO would be beneficial because that handover can happen in the main thread without any interruption or context switch. We can achieve this in the KafkaSource because `KafkaConsumer` is designed to be non-blocking.
   
   The only potential problem I can think of is the overhead of increasing the parallelism, e.g. a larger memory footprint, more IO buffers, etc.
   
   And I think you are also right about the assumption under which more deserialization threads help. Most streaming systems in production actually do have spare CPU resources, and increasing the parallelism usually means adding a new JVM instance, which can be expensive. In that situation, adding more deserialization threads helps.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738051546


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 2d0641fba0c2f3f36765fe80af872c2815875976 (Fri May 28 08:59:08 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-20379).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
    * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
     The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500",
       "triggerID" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e538857730e35f02d17631d79df453453874c6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10516",
       "triggerID" : "07e538857730e35f02d17631d79df453453874c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d0641fba0c2f3f36765fe80af872c2815875976",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10551",
       "triggerID" : "2d0641fba0c2f3f36765fe80af872c2815875976",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 07e538857730e35f02d17631d79df453453874c6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10516) 
   * 2d0641fba0c2f3f36765fe80af872c2815875976 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10551) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500",
       "triggerID" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e538857730e35f02d17631d79df453453874c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10516",
       "triggerID" : "07e538857730e35f02d17631d79df453453874c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d0641fba0c2f3f36765fe80af872c2815875976",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10551",
       "triggerID" : "2d0641fba0c2f3f36765fe80af872c2815875976",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2d0641fba0c2f3f36765fe80af872c2815875976 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10551) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0972e9422c2f8c767d1e855c3f234d835e41430b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494) 
   * d78c1394221de2af440a7a4d3e0b500d1d6f188a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500",
       "triggerID" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0972e9422c2f8c767d1e855c3f234d835e41430b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494) 
   * d78c1394221de2af440a7a4d3e0b500d1d6f188a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500",
       "triggerID" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d78c1394221de2af440a7a4d3e0b500d1d6f188a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0972e9422c2f8c767d1e855c3f234d835e41430b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] StephanEwen commented on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738820798


   On the "Move Deserialization From SplitReader to RecordEmitter" discussion:
   
   I would be interested in seeing how this impacts performance. The current Kafka connector runs deserialization in the processing thread, so we could use this as a baseline for performance comparison.
   
   Right now, the new Kafka connector does strictly more work (in CPU cycles) with its wrapping and re-packaging of records, compared to the old connector, or an approach that deserializes in the RecordEmitter.
   The assumption that this is faster can only hold if the actual processing is not sufficiently parallelized (there are unused cores, so it is better to do something less efficient where more threads can be used).
   
   To my knowledge, any setup with sufficient parallelization across partitions wins over setups which try to parallelize across pipeline stages: Data parallelism always beats pipeline parallelism on modern hardware. So if you have spare CPU cores, increasing the parallelism of the operator (and slots per TM) is more efficient than trying to spread work across more threads and stages. Otherwise, it would not be a good idea to chain operators into tasks, but we should have a thread per operator.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] becketqin commented on a change in pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
becketqin commented on a change in pull request #14303:
URL: https://github.com/apache/flink/pull/14303#discussion_r535752205



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -191,6 +194,11 @@ public void sendSplitRequest() {
 			public void sendSourceEventToCoordinator(SourceEvent event) {
 				operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
 			}
+
+			@Override
+			public UserCodeClassLoader getUserCodeClassLoader() {
+				return (UserCodeClassLoader) getRuntimeContext().getUserCodeClassLoader();

Review comment:
       This was fixed in a later commit. The change should be put here. I'll rebase the commits.
   
   I did look into `RuntimeContextInitializationContextAdapters`. Please see the other reply for the reason why it was not used here.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -197,7 +195,20 @@ public void sendSourceEventToCoordinator(SourceEvent event) {
 
 			@Override
 			public UserCodeClassLoader getUserCodeClassLoader() {
-				return (UserCodeClassLoader) getRuntimeContext().getUserCodeClassLoader();
+				return new UserCodeClassLoader() {

Review comment:
       That was my first attempt as well. But it seems a little ugly for two reasons:
   1. `RuntimeContextInitializationContextAdapters` is itself a class converting a `RuntimeContext` into an `InitializationContext`, and its scope and usage are quite clear. `RuntimeContextUserCodeClassLoaderAdapter` is a nested private class, and making only that class public seems a little weird.
   2. `RuntimeContextInitializationContextAdapters` is affiliated with `DeserializationSchema`, which is something the `SourceOperator` should not be aware of.
   
   Two other options are:
   1. Move `RuntimeContextUserCodeClassLoaderAdapter` to a separate class.
   2. Add a new method to `RuntimeContext` that returns an actual `UserCodeClassLoader` instead of letting callers construct one themselves.
   
   Both approaches seem a little unnecessary here because the inline class is quite simple.
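   
   For reference, a sketch of what the inline class could look like, assuming `UserCodeClassLoader` exposes `asClassLoader()` and `registerReleaseHookIfAbsent(...)` and that `RuntimeContext` offers a matching release-hook registration method:
   
```java
@Override
public UserCodeClassLoader getUserCodeClassLoader() {
    return new UserCodeClassLoader() {
        @Override
        public ClassLoader asClassLoader() {
            // Delegate to the class loader that loaded this task's user code.
            return getRuntimeContext().getUserCodeClassLoader();
        }

        @Override
        public void registerReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook) {
            // Forward release hooks so resources are cleaned up when the class loader is released.
            getRuntimeContext()
                    .registerUserCodeClassLoaderReleaseHookIfAbsent(releaseHookName, releaseHook);
        }
    };
}
```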




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738051546


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 0972e9422c2f8c767d1e855c3f234d835e41430b (Thu Dec 03 15:01:45 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-20379).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
    * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
     The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500",
       "triggerID" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e538857730e35f02d17631d79df453453874c6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10516",
       "triggerID" : "07e538857730e35f02d17631d79df453453874c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d0641fba0c2f3f36765fe80af872c2815875976",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2d0641fba0c2f3f36765fe80af872c2815875976",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 07e538857730e35f02d17631d79df453453874c6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10516) 
   * 2d0641fba0c2f3f36765fe80af872c2815875976 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0972e9422c2f8c767d1e855c3f234d835e41430b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14303: [FLINK-20379][Connector/Kafka] Improve the KafkaRecordDesrializer interface

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14303:
URL: https://github.com/apache/flink/pull/14303#issuecomment-738079106


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10494",
       "triggerID" : "0972e9422c2f8c767d1e855c3f234d835e41430b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500",
       "triggerID" : "d78c1394221de2af440a7a4d3e0b500d1d6f188a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e538857730e35f02d17631d79df453453874c6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "07e538857730e35f02d17631d79df453453874c6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d78c1394221de2af440a7a4d3e0b500d1d6f188a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=10500) 
   * 07e538857730e35f02d17631d79df453453874c6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org