You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by haohui <gi...@git.apache.org> on 2017/02/14 23:29:47 UTC

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

GitHub user haohui opened a pull request:

    https://github.com/apache/flink/pull/3314

    [FLINK-3679] DeserializationSchema should handle zero or more outputs

    This PR adds a new interface, `RichKeyedDeserializationSchema`, to enable the deserializer to produce zero or more outputs. The main use case is that skipping corrupted messages in the Kafka stream.
    
    Feedbacks (especially on backward compatibility) are highly appreciated.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haohui/flink FLINK-3679

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3314.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3314
    
----
commit 7728acb3bc00a12a7552706be569710fbfdbd200
Author: Haohui Mai <wh...@apache.org>
Date:   2017-02-14T22:19:29Z

    [FLINK-3679] DeserializationSchema should handle zero or more outputs for every input.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102881264
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     								keyPayload.get(keyBytes);
     							}
     
    -							final T value = deserializer.deserialize(keyBytes, valueBytes, 
    -									currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -							
    -							if (deserializer.isEndOfStream(value)) {
    -								// remove partition from subscribed partitions.
    -								partitionsIterator.remove();
    -								continue partitionsLoop;
    -							}
    -							
    -							owner.emitRecord(value, currentPartition, offset);
    +							final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    I see what you are saying. The trade off here is handing offs the objects another time, but I think it's okay. I'll update the PR accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102048656
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -142,25 +141,38 @@ public void runFetchLoop() throws Exception {
     				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
     
     				// get the records for each topic partition
    -				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
    +				for (final KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
     
     					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
     							records.records(partition.getKafkaPartitionHandle());
     
    -					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    -						final T value = deserializer.deserialize(
    -								record.key(), record.value(),
    -								record.topic(), record.partition(), record.offset());
    -
    -						if (deserializer.isEndOfStream(value)) {
    -							// end of stream signaled
    -							running = false;
    -							break;
    -						}
    -
    -						// emit the actual record. this also updates offset state atomically
    -						// and deals with timestamps and watermark generation
    -						emitRecord(value, partition, record.offset(), record);
    +					for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    +						final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    Same question as in the Kafka 0.8 impl


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102901123
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java ---
    @@ -171,8 +171,9 @@ private static String getResourceFilename(String filename) {
     		private final List<KafkaTopicPartition> partitions;
     
     		@SuppressWarnings("unchecked")
    -		DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions) {
    -			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class));
    +		DummyFlinkKafkaConsumer(
    +				List<KafkaTopicPartition> partitions) {
    --- End diff --
    
    If its just one parameter, I don't think we need a new line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102881632
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     								keyPayload.get(keyBytes);
     							}
     
    -							final T value = deserializer.deserialize(keyBytes, valueBytes, 
    -									currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -							
    -							if (deserializer.isEndOfStream(value)) {
    -								// remove partition from subscribed partitions.
    -								partitionsIterator.remove();
    -								continue partitionsLoop;
    -							}
    -							
    -							owner.emitRecord(value, currentPartition, offset);
    +							final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    @haohui, if you don't mind, I would also wait for @rmetzger to take another look at the new proposals here, before you jump back again into the code.
    This part is quite critical for Flink Kafka's exacty-once guarantee, so another pair of eyes on this will be safer.
    
    I would also like to do a thorough pass on your code and see if there are other problems, so you work on those all-together.
    
    Is that ok for you? Sorry for some more waiting.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102899179
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +
    +/**
    + * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
    + * {@see KeyedSerializationSchema}
    + *
    + * @param <T> The type created by the keyed deserialization schema.
    + */
    +public interface RichKeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    +	/**
    +	 * Deserializes the byte message.
    +	 *
    +	 * @param messageKey the key as a byte array (null if no key has been set)
    +	 * @param message The message, as a byte array. (null if the message was empty or deleted)
    +	 * @param partition The partition the message has originated from
    +	 * @param offset the offset of the message in the original source (for example the Kafka offset)
    +	 *
    +	 * @return The deserialized message as an object.
    +	 */
    +	void deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
    +						Collector<T> collector) throws IOException;
    --- End diff --
    
    The indentation of the parameters here seems a bit off.
    Now with the number of parameters to be quite lengthy, it might be a good style to have one parameter per line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102665687
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     								keyPayload.get(keyBytes);
     							}
     
    -							final T value = deserializer.deserialize(keyBytes, valueBytes, 
    -									currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -							
    -							if (deserializer.isEndOfStream(value)) {
    -								// remove partition from subscribed partitions.
    -								partitionsIterator.remove();
    -								continue partitionsLoop;
    -							}
    -							
    -							owner.emitRecord(value, currentPartition, offset);
    +							final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    Moving the discussion back a bit:
    
    I don't think this implementation works correctly with exactly-once and how we checkpoint the consumer's partition offset state.
    
    The problem is that, in `emitRecord`, we will be updating the offset state. In the changes here, what this means is that we will be considering a record to have been fully processed as soon as the collector collects something.
    
    For example, lets say the serializer will call `collect` 3 times for elements deserialized from record R before `deserialize` returns. R has offset 100L. As soon as the first element is collected, the state will be updated to `finished processing offset 100L`. If now checkpointing is triggered, and we use that checkpoint to restore, we will be skipping the remaining 2 elements that were yet to be collected.
    Once 
    
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102542018
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     								keyPayload.get(keyBytes);
     							}
     
    -							final T value = deserializer.deserialize(keyBytes, valueBytes, 
    -									currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -							
    -							if (deserializer.isEndOfStream(value)) {
    -								// remove partition from subscribed partitions.
    -								partitionsIterator.remove();
    -								continue partitionsLoop;
    -							}
    -							
    -							owner.emitRecord(value, currentPartition, offset);
    +							final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    @StephanEwen What is your opinion on solving this problem?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102896783
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -176,7 +177,7 @@ public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deseri
     	 * @param props
     	 *           The properties that are used to configure both the fetcher and the offset handler.
     	 */
    -	public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
    +	public FlinkKafkaConsumer08(List<String> topics, RichKeyedDeserializationSchema<T> deserializer, Properties props) {
    --- End diff --
    
    This will break user-code. We'll need proper usage migration here.
    
    We have a separate JIRA that aims at deprecating the current Kafka Consumer constructors: https://issues.apache.org/jira/browse/FLINK-5704. The migration to use the new flat-map deserialzer can be included there.
    
    Perhaps for this PR, we should just use your `RichKeyedDeserializationSchemaWrapper` as "behaviour bridges" for the original deserialization schema to the new one, and don't change the original constructor / include new constructors yet, so that we don't overlap and complicate things for FLINK-5704. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r103339489
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    --- End diff --
    
    Can you please suggest where it should be put?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102898307
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
    @@ -422,6 +429,99 @@ public void run() {
     		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
     	}
     
    +	@Test
    +	public void testRichDeserializationSchema() throws Exception {
    +		final String topic = "test-topic";
    +		final int partition = 3;
    +		final byte[] payload = new byte[] {1, 2, 3, 4};
    +		final byte[] endPayload = "end".getBytes(StandardCharsets.UTF_8);
    +
    +		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
    +			new ConsumerRecord<>(topic, partition, 15, payload, payload),
    +			new ConsumerRecord<>(topic, partition, 16, payload, payload),
    +			new ConsumerRecord<>(topic, partition, 17, payload, endPayload));
    +
    +		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
    +		data.put(new TopicPartition(topic, partition), records);
    +
    +		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
    +
    +		// ----- the test consumer -----
    +
    +		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
    +		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
    +			@Override
    +			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
    +				return consumerRecords;
    +			}
    +		});
    +
    +		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
    +
    +		// ----- build a fetcher -----
    +
    +		ArrayList<String> results = new ArrayList<>();
    +		SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
    +		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
    +		RichKeyedDeserializationSchema<String> schema = new RichKeyedDeserializationSchema<String>() {
    +			@Override
    +			public void deserialize(
    +				byte[] messageKey, byte[] message, String topic, int partition,
    +				long offset, Collector<String> collector) throws IOException {
    +				if (offset != 16) {
    +					collector.collect(new String(message));
    +				}
    +			}
    +
    +			@Override
    +			public boolean isEndOfStream(String nextElement) {
    +				return nextElement.equals("end");
    +			}
    +
    +			@Override
    +			public TypeInformation<String> getProducedType() {
    +				return BasicTypeInfo.STRING_TYPE_INFO;
    +			}
    +		};
    +
    +		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
    +			sourceContext,
    +			topics,
    +			null, /* no restored state */
    +			null, /* periodic watermark extractor */
    +			null, /* punctuated watermark extractor */
    +			new TestProcessingTimeService(),
    +			10, /* watermark interval */
    +			this.getClass().getClassLoader(),
    +			false, /* checkpointing */
    +			"task_name",
    +			new UnregisteredMetricsGroup(),
    +			schema,
    +			new Properties(),
    +			0L,
    +			StartupMode.GROUP_OFFSETS,
    +			false);
    +
    +
    +		// ----- run the fetcher -----
    +
    +		final AtomicReference<Throwable> error = new AtomicReference<>();
    +		final Thread fetcherRunner = new Thread("fetcher runner") {
    --- End diff --
    
    We have a nice utility `CheckedThread` that serves for the tested purpose here (catching errors and storing its reference).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    Yes totally agree. Thanks very much for taking the time to review the PRs. Will do it next time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r104312079
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
    @@ -419,6 +424,164 @@ public void run() {
     		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
     	}
     
    +	@Test
    +	public void testSkipCorruptedMessage() throws Exception {
    +
    +		// ----- some test data -----
    +
    +		final String topic = "test-topic";
    +		final int partition = 3;
    +		final byte[] payload = new byte[] {1, 2, 3, 4};
    +
    +		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
    +			new ConsumerRecord<>(topic, partition, 15, payload, payload),
    +			new ConsumerRecord<>(topic, partition, 16, payload, payload),
    +			new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
    +
    +		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
    +		data.put(new TopicPartition(topic, partition), records);
    +
    +		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
    +
    +		// ----- the test consumer -----
    +
    +		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
    +		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
    +			@Override
    +			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
    +				return consumerRecords;
    +			}
    +		});
    +
    +		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
    +
    +		// ----- build a fetcher -----
    +
    +		ArrayList<String> results = new ArrayList<>();
    +		SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
    +		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
    +			Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
    +
    +			@Override
    +			public String deserialize(byte[] messageKey, byte[] message,
    +									  String topic, int partition, long offset) throws IOException {
    +				return offset == 15 ? null : new String(message);
    +			}
    +
    +			@Override
    +			public boolean isEndOfStream(String nextElement) {
    +				return "end".equals(nextElement);
    +			}
    +
    +			@Override
    +			public TypeInformation<String> getProducedType() {
    +				return BasicTypeInfo.STRING_TYPE_INFO;
    +			}
    +		};
    +
    +		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
    +			sourceContext,
    +			partitionsWithInitialOffsets,
    +			null, /* periodic watermark extractor */
    +			null, /* punctuated watermark extractor */
    +			new TestProcessingTimeService(),
    +			10, /* watermark interval */
    +			this.getClass().getClassLoader(),
    +			true, /* checkpointing */
    +			"task_name",
    +			new UnregisteredMetricsGroup(),
    +			schema,
    +			new Properties(),
    +			0L,
    +			false);
    +
    +
    +		// ----- run the fetcher -----
    +
    +		fetcher.runFetchLoop();
    +		assertEquals(1, results.size());
    +	}
    +
    +	@Test
    +	public void testNullAsEOF() throws Exception {
    --- End diff --
    
    I'm not sure if this test is necessary. It's essentially just testing that `isEndOfStream` works when `isEndOfStream` is `true`. Whether or not the condition is `element == null` seems irrelevant to what's been tested.
    
    We also already have a `runEndOfStreamTest` in `KafkaConsumerTestBase`.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    Thanks for the comments. Allowing `DeserializationSchema` to return `null` sounds good to me. I'll update the PR accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    LGTM, I'll proceed to merge this later today.
    One minor problem: the offset state still isn't updated if `record == null`. We need to do the checking in the synchronize block in the `emitRecord*` methods.
    
    It's a simple fix, so I can do it while merging :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102881092
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     								keyPayload.get(keyBytes);
     							}
     
    -							final T value = deserializer.deserialize(keyBytes, valueBytes, 
    -									currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -							
    -							if (deserializer.isEndOfStream(value)) {
    -								// remove partition from subscribed partitions.
    -								partitionsIterator.remove();
    -								continue partitionsLoop;
    -							}
    -							
    -							owner.emitRecord(value, currentPartition, offset);
    +							final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    @haohui hmm this seems a bit odd to me. I think it should be achievable.
    
    ```
    // the buffer; this can be shared
    final List<T> bufferedElements = new LinkedList<>();
    // BufferCollector is an implementation of Collector that adds collected elements to bufferedElements; this can be shared
    final BufferCollector collector = new BufferCollector<T>(bufferedElements);
    
    ...
    
    for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
        deserializer.deserialize(
            record.key(), record.value(), record.topic(),
            record.partition(), record.offset(), collector);
    
        emitRecords(bufferedElements, partitionState, record.offset(), record);
    
        bufferedElements.clear(); // after the elements for the record have been emitted, empty out the buffer
    }
    ```
    
    Doesn't this work? I haven't really tried this hands-on, so I might be overlooking something. Let me know what you think :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102898901
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +
    +/**
    + * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
    + * {@see KeyedSerializationSchema}
    --- End diff --
    
    I'm not sure why we need to link to `KeyedSerializationSchema` in the Javadocs for the new serialization schema.
    From what I know, we're going to completely replace it, correct?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102901299
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -1236,10 +1237,11 @@ public Tuple2WithTopicSchema(ExecutionConfig ec) {
     		}
     
     		@Override
    -		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
    +		public void deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
    +								Collector<Tuple3<Integer, Integer, String>> collector) throws IOException {
    --- End diff --
    
    Same here: the indentation formatting seems off.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102898788
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +
    +/**
    + * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
    --- End diff --
    
    The name of the class is `RichKeyedDeserializationSchema `, but in the Javadocs it mentions `RichDeserializationSchema `.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    The change looks good to merge in my opinion.
    @tzulitai can you also have a quick look?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    Thanks a lot for your understanding @haohui. 
    Let us know once you've updated the PR so that we can review and merge it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102830609
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     								keyPayload.get(keyBytes);
     							}
     
    -							final T value = deserializer.deserialize(keyBytes, valueBytes, 
    -									currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -							
    -							if (deserializer.isEndOfStream(value)) {
    -								// remove partition from subscribed partitions.
    -								partitionsIterator.remove();
    -								continue partitionsLoop;
    -							}
    -							
    -							owner.emitRecord(value, currentPartition, offset);
    +							final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    Good catch, @tzulitai !
    
    I tried the buffer approach and had no luck. The problem is that calling `emitRecord`needs to pass in both the offset and the record itself -- The record is used to extract the timestamp in the Kafka 0.10 consumers. The buffer itself needs to buffer the deserialized value and the record itself -- it cannot solve the problem of having a collector per record.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    @haohui - one suggestion for future contributions for easier reviews:
    We usually use follow-up commits that addresses review comments, instead of force pushing the whole branch. For reviewers, this allows easier tracking of history of what has been addressed and fixed from the original PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r104227652
  
    --- Diff: docs/dev/connectors/kafka.md ---
    @@ -146,6 +146,10 @@ The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into
     `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
     method gets called for each Kafka message, passing the value from Kafka.
     
    +There are two possible design choice when the `DeserializationSchema` encounters a corrupted message. It can
    --- End diff --
    
    choices


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    Thank you for opening a pull request.
    I think the change is missing an update to the documentation. I did a very very superficial review of the change :) This needs a more thorough check.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102900986
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -142,25 +141,38 @@ public void runFetchLoop() throws Exception {
     				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
     
     				// get the records for each topic partition
    -				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
    +				for (final KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
     
     					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
     							records.records(partition.getKafkaPartitionHandle());
     
    -					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    -						final T value = deserializer.deserialize(
    -								record.key(), record.value(),
    -								record.topic(), record.partition(), record.offset());
    -
    -						if (deserializer.isEndOfStream(value)) {
    -							// end of stream signaled
    -							running = false;
    -							break;
    -						}
    -
    -						// emit the actual record. this also updates offset state atomically
    -						// and deals with timestamps and watermark generation
    -						emitRecord(value, partition, record.offset(), record);
    +					for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    +						final Collector<T> collector = new Collector<T>() {
    +							@Override
    +							public void collect(T value) {
    +								if (deserializer.isEndOfStream(value)) {
    +									// end of stream signaled
    +									running = false;
    +								} else {
    +									// emit the actual record. this also updates offset state atomically
    +									// and deals with timestamps and watermark generation
    +									try {
    +										emitRecord(value, partition, record.offset(), record);
    +									} catch (Exception e) {
    +										throw new RuntimeException(e);
    +									}
    +								}
    +							}
    +
    +							@Override
    +							public void close() {
    +
    +							}
    +						};
    +
    +						deserializer.deserialize(
    +							record.key(), record.value(),
    +							record.topic(), record.partition(), record.offset(), collector);
    --- End diff --
    
    The formatting for the list of arguments here could be nicer. Perhaps one argument per line?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102048475
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     								keyPayload.get(keyBytes);
     							}
     
    -							final T value = deserializer.deserialize(keyBytes, valueBytes, 
    -									currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -							
    -							if (deserializer.isEndOfStream(value)) {
    -								// remove partition from subscribed partitions.
    -								partitionsIterator.remove();
    -								continue partitionsLoop;
    -							}
    -							
    -							owner.emitRecord(value, currentPartition, offset);
    +							final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    I'm not sure of the performance implications for this. The JVM will create a Collector instance for each record read from Kafka.
    I wonder if we can re-use one collector instance here.
    
    
    Also, I wonder if we need to use this `Collector` implementation, with a `close()` method we are not using and an exception we are turning into a `RuntimeException`. Maybe we should let the collect throw an exception?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102896891
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -121,7 +122,7 @@ public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deser
     	 * @param props
     	 *           The properties that are used to configure both the fetcher and the offset handler.
     	 */
    -	public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
    +	public FlinkKafkaConsumer010(List<String> topics, RichKeyedDeserializationSchema<T> deserializer, Properties props) {
    --- End diff --
    
    Same as in the comment in `FlinkKafkaConsumer08`: this breaks user code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102900064
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    --- End diff --
    
    Since this is now a very Kafka-specific class, I think this is good timing to change to the package path `org.apache.flink.streaming.kafka.serialization` now.
    
    The original `KeyedDeserializationSchema` was placed under `o.a.f.s.util.serialization` because it was wrongly packaged in another module before, and moved to `flink-connector-kafka-base` under the same package path to avoid breaking user code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r103399810
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    --- End diff --
    
    I would put it perhaps in `org.apache.flink.streaming.kafka.serialization` under `flink-connector-kafka-base`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    CI failed in one of the group as the group was timed out. The specific group was not timed out in the last run.
    
    @tzulitai can you please take another look? Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102299038
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     								keyPayload.get(keyBytes);
     							}
     
    -							final T value = deserializer.deserialize(keyBytes, valueBytes, 
    -									currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -							
    -							if (deserializer.isEndOfStream(value)) {
    -								// remove partition from subscribed partitions.
    -								partitionsIterator.remove();
    -								continue partitionsLoop;
    -							}
    -							
    -							owner.emitRecord(value, currentPartition, offset);
    +							final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    Totally agree. Playing around a little bit and it might require some trade-offs here.
    
    The problem is that `emitRecord()` needs the state for each records (e.g., topic partition, offset, etc.). The state can be either passed inside a closure (like the new instance for the `Collector`) or passed through arguments. I see there are three possibilities here:
    
    1. Create a new instance of `Collector` for every record. The JVM may or may not be able to optimize it. Trace-based JVM should be able to but I'm not sure about classed-based JVM.
    
    2. Expose the internal state in the `collect()` call. The `collect()` call takes additional parameters such as offset and partition state. It reduces the GC overheads but also hinders changing the implementation.
    
    3. Create a new interface like `Optional<T> deserialize(byte[] messageKey, ...)` (or
    `void deserialize(byte[] messageKey, ..., AtomicReference<T> result)` to optimize away the cost of the `Optional` class). It results in a slightly more complex APIs but it probably has the best trade-offs between performances and API compatibility.
    
    What do you think?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    @StephanEwen and I just had an offline discussion about the change, and we came up with the following thoughts:
    
    Using an `ArrayList` for buffering elements is an "anti-pattern" in Flink, because it is not a robust solution. Users could theoretically run into the size limit of an array list, and unnesting large messages (in multiple threads in the Kafka 0.8 case) can put pressure on the GC. We think that we should try to avoid that approach if possible.
    
    Alternative approaches we considered (ordered by preference):
    - Define the DeserializationSchema so that users can return `null` if the user doesn't want to emit a record.
    This approach would not change the current approach, and is pretty minimal. Of course, it would not allow for the "unnesting" use case, where you want to emit multiple records from one Kafka message. Users would need to deserialize into a nested structure and use a flatMap afterwards to do the un-nesting.
    - Move the deserialization into the checkpoint lock. This would allow us to collect elements into our internal collector from the user collector while still preserving exactly once semantics.
    This change would probably be a bit more involved code-wise, as we need to rearrange some parts (maybe moving the deserialization schema instance into the emitRecord() method, change of some method signatures).
    A downside of this approach would be that the Kafka 0.8 consumer threads would deserialize records in a sequential order (since only one consumer thread can hold the lock at a time). For Kafka 0.9 this is already the case. I think we can live with that, because the majority of users moved away from kafka 0.8 by now.
    - Use the `ArrayList` approach. Users would potentially run into issues and we would loose some of Flink's robustness.
    
    @jgrier since you've opened the original JIRA back then, what's your take on the discussion? How bad would it be for users to just allow the `null` or record approach? (Other opinions are of course also appreciated)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102900820
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -176,7 +177,7 @@ public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deseri
     	 * @param props
     	 *           The properties that are used to configure both the fetcher and the offset handler.
     	 */
    -	public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
    +	public FlinkKafkaConsumer08(List<String> topics, RichKeyedDeserializationSchema<T> deserializer, Properties props) {
     		super(topics, deserializer);
    --- End diff --
    
    So, instead of changing the constructor, we should still do
    `super(topics, new RickKeyedDeserializationSchemaWrapper(deserializer))`
    here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by jgrier <gi...@git.apache.org>.
Github user jgrier commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    I think it would be just fine if we allowed a null return given the tradeoffs discussed here.  The main thing was to allow users a way to deal with bad data with minimal effort and without throwing an exception and causing their job to restart.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    @rmetzger ping...
    just wondering what do you think about all the approaches we have discussed here? Your comments are appreciated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102897911
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
    @@ -422,6 +429,99 @@ public void run() {
     		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
     	}
     
    +	@Test
    +	public void testRichDeserializationSchema() throws Exception {
    --- End diff --
    
    I think we should enhance this test to test the behaviour with multiple `collect`s per record also.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by InfinitiesLoop <gi...@git.apache.org>.
Github user InfinitiesLoop commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    Sorry for necro'ing this thread, but where does the community land on the multiple record per kafka payload idea this PR originally intended to solve?
    
    I have this scenario, where a single payload in kafka can represent hundreds of logical records. It's fine to just flatMap() them out after the deserialization schema, but that does not let me deal with timestamps and watermarks correctly. It's possible the source is reading from 2 partitions that are out of sync with each other, but I can't assign a timestamp and watermark for a single message that contains many records that might span multiple timestamps. So I'm just using a timestamp and watermark extractor on the stream separate from the source, and just hoping that I never have out of sync partitions. If a solution is still desired I'd love to contribute, otherwise it looks like I will end up having to write my own custom kafka source..


---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102900238
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchemaWrapper.java ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.IOException;
    +
    +public class RichKeyedDeserializationSchemaWrapper<T> implements RichKeyedDeserializationSchema<T> {
    --- End diff --
    
    Can you also include Javadocs for this class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102668004
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     								keyPayload.get(keyBytes);
     							}
     
    -							final T value = deserializer.deserialize(keyBytes, valueBytes, 
    -									currentPartition.getTopic(), currentPartition.getPartition(), offset);
    -							
    -							if (deserializer.isEndOfStream(value)) {
    -								// remove partition from subscribed partitions.
    -								partitionsIterator.remove();
    -								continue partitionsLoop;
    -							}
    -							
    -							owner.emitRecord(value, currentPartition, offset);
    +							final Collector<T> collector = new Collector<T>() {
    --- End diff --
    
    What I think we should do to solve this correctly:
    
    Buffer the elements collected from the `deserialize` call. The `Collector.collect` implementation should simply add the collected element to the buffer, and not emit it immediately.
    
    After `deserialize` returns, call `emitRecord` once with all the elements in the buffer and the original record's offset. This, of course, would mean we need to slightly change the `emitRecord` implementation a bit to something like:
    ```
    void emitRecord(List<T> records, KafkaTopicPartitionState<KPH> partitionState, long offset) {
        synchronized (checkpointLock) {
            for (T record : records) {
                sourceContext.collect(record);
            }
            partitionState.setOffset(offset);
        }
    }
    ```
    
    After this, we proceed with the next record and repeat. Note that the emitting of all produced elements from record at offset 100L and the update to the offset state to 100L happens atomically synchronized on the checkpoint lock,  so we can make sure that a checkpoint barrier will only come either after or before all the produced records of offset 100, and not in-between.
    
    I think we should also be able to avoid a per-record `Collector` with this solution. We can reuse a `Collector` and provide it to the `deserializer` for every record, because it's simply only a means to collect elements to the internal buffer and we're not calling `emitRecords` in it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r104311996
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
    @@ -381,6 +381,10 @@ else if (partitionsRemoved) {
     								partitionsIterator.remove();
     								continue partitionsLoop;
     							}
    +
    +							if (value == null) {
    +								continue;
    +							}
    --- End diff --
    
    Would it make sense to do the `null` checking inside `emitRecord(...)`?
    Otherwise, we wouldn't be updating the state for skipped records, and therefore not accounting it as "already processed".
    
    I don't think it really matters, since we aren't outputting anything anyway, but I see at least one minor advantage that might deserve changing it: If we fail during a series of continuous skipped records, we won't be wasting any overhead re-processing them on restore.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3314
  
    I've made some final general improvements in https://github.com/tzulitai/flink/tree/PR-FLINK-3679.
    
    Doing a Travis run before merging:
    https://travis-ci.org/tzulitai/flink/builds/209054624


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r102902685
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---
    @@ -119,7 +119,7 @@ public Void answer(InvocationOnMock invocation) {
             @SuppressWarnings("unchecked")
             SourceContext<String> sourceContext = mock(SourceContext.class);
             List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
    -        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
    +        RichKeyedDeserializationSchemaWrapper<String> schema = new RichKeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
    --- End diff --
    
    This file will have conflict with the current `master`, because I recently pushed a hotfix to `master` to fix the indentation of this file (previously, it's incorrectly using spaces to indent instead of tabs). Sorry about this!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3314


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r103405663
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.util.serialization;
    +
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +
    +/**
    + * RichKeyedDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
    + * {@see KeyedSerializationSchema}
    + *
    + * @param <T> The type created by the keyed deserialization schema.
    + */
    +public interface RichKeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    +	/**
    +	 * Deserializes the byte message.
    +	 *
    +	 * @param messageKey the key as a byte array (null if no key has been set)
    +	 * @param message The message, as a byte array. (null if the message was empty or deleted)
    +	 * @param partition The partition the message has originated from
    +	 * @param offset the offset of the message in the original source (for example the Kafka offset)
    +	 * @param collector the user-provided collector that deserializes the bytes into zero or more
    +	 *                  records.
    +	 *
    +	 * @return The deserialized message as an object.
    --- End diff --
    
    The method doesn't return anything.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---