Posted to issues@flink.apache.org by FredTing <gi...@git.apache.org> on 2018/05/07 05:35:22 UTC

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

GitHub user FredTing opened a pull request:

    https://github.com/apache/flink/pull/5958

    [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

    ## What is the purpose of the change
    
    This pull request makes the Kafka timestamp and timestampType available during message deserialization, so that they can be used in downstream business logic.
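
    For illustration, here is a minimal sketch of the intended usage, assuming the `ConsumerRecordMetaInfo` interface introduced by this PR (the schema class and tuple layout are illustrative only):

    ```
    import org.apache.flink.api.common.serialization.ConsumerRecordMetaInfo;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class TimestampedStringSchema implements DeserializationSchema<Tuple2<String, Long>> {

    	@Override
    	public Tuple2<String, Long> deserialize(ConsumerRecordMetaInfo record) throws IOException {
    		// The Kafka timestamp is now available during deserialization.
    		return Tuple2.of(new String(record.getMessage(), StandardCharsets.UTF_8), record.getTimestamp());
    	}

    	@Override
    	@Deprecated
    	public Tuple2<String, Long> deserialize(byte[] message) throws IOException {
    		// Still required by the (deprecated) byte[] variant; no timestamp is available here.
    		return Tuple2.of(new String(message, StandardCharsets.UTF_8), Long.MIN_VALUE);
    	}

    	@Override
    	public boolean isEndOfStream(Tuple2<String, Long> nextElement) {
    		return false;
    	}

    	@Override
    	public TypeInformation<Tuple2<String, Long>> getProducedType() {
    		return TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});
    	}
    }
    ```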
    
    ## Brief change log
    
      - Introduced a new interface `ConsumerRecordMetaInfo` that carries the meta information of a Kafka message.
      - Extended `DeserializationSchema` with a `T deserialize(ConsumerRecordMetaInfo consumerRecord)` method.
      - Adjusted the Kafka connectors to support the new interface.
      - Added some documentation.
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as most of the Kafka Consumer tests.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
      - The serializers: (**yes** / no / don't know)
      - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / **docs** / **JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/FredTing/flink FLINK-8500

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5958.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5958
    
----
commit dca3e7bf5504fdeb929cb38bdd05c3fcec184d6c
Author: Fred Teunissen <fr...@...>
Date:   2018-05-06T15:31:15Z

    [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

----


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186338721
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
    @@ -42,14 +42,22 @@
     @Public
     public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
     
    +	/**
    +	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
    +	 */
    +	@Deprecated
    +	T deserialize(byte[] message) throws IOException;
    +
     	/**
     	 * Deserializes the byte message.
     	 *
    -	 * @param message The message, as a byte array.
    +	 * @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
     	 *
     	 * @return The deserialized message as an object (null if the message cannot be deserialized).
     	 */
    -	T deserialize(byte[] message) throws IOException;
    +	default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
    --- End diff --
    
    I'm actually not sure that we should continue using this class, for the following reasons:
    
    1. The class is placed under a non-ideal package,
    `o.a.f.api.common.serialization`, whereas it should be placed under some `o.a.f.connectors.kafka....` package.
    It currently lives in this package because the `DeserializationSchema` was initially intended to be commonly used by all connectors. However, over time it has become clear that each connector benefits from its own version of a schema class.
    
    So, it might make sense to deprecate the whole `DeserializationSchema` class now, and introduce a new class (maybe called `KafkaDeserializationSchema` / `KafkaSerializationSchema`) under a proper Kafka package.
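
    A rough sketch of what that could look like (package and names are suggestions from this discussion, not committed API):

    ```
    package org.apache.flink.streaming.connectors.kafka;

    import org.apache.flink.api.common.serialization.ConsumerRecordMetaInfo;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

    import java.io.IOException;
    import java.io.Serializable;

    public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    	// Kafka-specific deserialization, with access to the full record meta information.
    	T deserialize(ConsumerRecordMetaInfo record) throws IOException;

    	boolean isEndOfStream(T nextElement);
    }
    ```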
    
    What do you think?


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186606104
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
    @@ -42,14 +42,22 @@
     @Public
     public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
     
    +	/**
    +	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
    +	 */
    +	@Deprecated
    +	T deserialize(byte[] message) throws IOException;
    +
     	/**
     	 * Deserializes the byte message.
     	 *
    -	 * @param message The message, as a byte array.
    +	 * @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
     	 *
     	 * @return The deserialized message as an object (null if the message cannot be deserialized).
     	 */
    -	T deserialize(byte[] message) throws IOException;
    +	default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
    --- End diff --
    
    Makes sense. Alright, let's leave this as is then.


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186338049
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
    @@ -42,14 +42,22 @@
     @Public
     public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
     
    +	/**
    +	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
    --- End diff --
    
    For the deprecation, I would recommend explaining why the new deserialize method is superior.
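
    One possible wording (a sketch, not the final javadoc):

    ```
    /**
     * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} instead, which also
     *             gives access to meta information such as the key, topic, partition,
     *             offset and timestamp of the record.
     */
    @Deprecated
    T deserialize(byte[] message) throws IOException;
    ```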


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186338002
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
    @@ -42,14 +42,22 @@
     @Public
     public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
     
    +	/**
    +	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
    --- End diff --
    
    Unnecessary space before period at the end.


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186337301
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---
    @@ -78,6 +79,69 @@ public Kafka010Fetcher(
     				useMetrics);
     	}
     
    +	private class KafkaConsumerRecordWrapper10 implements ConsumerRecordMetaInfo {
    +		private static final long serialVersionUID = 2651665280744549935L;
    +
    +		private final ConsumerRecord<byte[], byte[]> consumerRecord;
    +
    +		public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], byte[]> consumerRecord) {
    +			this.consumerRecord = consumerRecord;
    +		}
    +
    +		@Override
    +		public byte[] getKey() {
    +			return consumerRecord.key();
    +		}
    +
    +		@Override
    +		public byte[] getMessage() {
    +			return consumerRecord.value();
    +		}
    +
    +		@Override
    +		public String getTopic() {
    +			return consumerRecord.topic();
    +		}
    +
    +		@Override
    +		public int getPartition() {
    +			return consumerRecord.partition();
    +		}
    +
    +		@Override
    +		public long getOffset() {
    +			return consumerRecord.offset();
    +		}
    +
    +		@Override
    +		public long getTimestamp() {
    +			return Long.MIN_VALUE;
    --- End diff --
    
    Doesn't Kafka 0.10 support record timestamps?


---

[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5958
  
    If it turns out that we need to do a bit more design work on the deserialization schema, we could incrementally fix the issue that triggered this PR by extending the `KeyedDeserializationSchema` with a new default method.
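
    A sketch of that incremental fix, assuming the existing `KeyedDeserializationSchema` signature (the timestamp-aware variant below is hypothetical, not committed API):

    ```
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

    import java.io.IOException;
    import java.io.Serializable;

    public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
    			throws IOException;

    	// New default method: existing implementations keep compiling unchanged, while
    	// fetchers that know the record timestamp can call this richer variant.
    	default T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
    			long offset, long timestamp) throws IOException {
    		return deserialize(messageKey, message, topic, partition, offset);
    	}

    	boolean isEndOfStream(T nextElement);
    }
    ```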


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186337890
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.serialization;
    +
    +import org.apache.flink.annotation.Public;
    +
    +/**
    + * The consumer record meta info contains, besides the actual message, some meta information, such as
    + * key, topic, partition, offset and timestamp for Apache kafka
    + *
    + * <p><b>Note:</b>The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and
    + * the timestampType has the value `NO_TIMESTAMP_TYPE`.
    + */
    +@Public
    +public interface ConsumerRecordMetaInfo {
    +	/**
    +	 * The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9
    +	 * so a local enumeration is needed.
    +	 */
    +	enum TimestampType {
    +		NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
    --- End diff --
    
    `NO_TIMESTAMP_TYPE` --> maybe `NO_TIMESTAMP` will do, since from the enum name we already know it is a type.


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by FredTing <gi...@git.apache.org>.
Github user FredTing closed the pull request at:

    https://github.com/apache/flink/pull/5958


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186967673
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
    @@ -42,14 +42,22 @@
     @Public
     public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
     
    +	/**
    +	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
    +	 */
    +	@Deprecated
    +	T deserialize(byte[] message) throws IOException;
    +
     	/**
     	 * Deserializes the byte message.
     	 *
    -	 * @param message The message, as a byte array.
    +	 * @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
     	 *
     	 * @return The deserialized message as an object (null if the message cannot be deserialized).
     	 */
    -	T deserialize(byte[] message) throws IOException;
    +	default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
    --- End diff --
    
    I would also vote for deprecating those classes and creating specific versions `KafkaDeserializationSchema` / `KafkaSerializationSchema`. I would also like to add a corresponding option to `SerializationSchema` to pass the targetTopic down, e.g. to be able to look up the appropriate schema in a schema registry.
    
    I think changes like those do not fit well into a common space.
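
    A hypothetical sketch of that serialization-side counterpart (interface name and signature are illustrative only):

    ```
    import java.io.Serializable;

    public interface KafkaSerializationSchema<T> extends Serializable {

    	// The target topic is passed in, e.g. to look up the matching entry
    	// in a schema registry before serializing.
    	byte[] serialize(T element, String targetTopic);
    }
    ```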


---

[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5958
  
    There are a few bigger design aspects that we need to agree upon:
    
      - The `DeserializationSchema` is a shared common denominator of serialization schemata. That's why it is in `flink-core` and not in a Kafka connector project. It is used by various per-record streaming sources, like Kafka, RabbitMQ, in the future PubSub, or AMQ, and it may be valuable to migrate Kinesis to it as well. This PR changes that common denominator to have very Kafka-specific fields.
    
      - The common schema makes sense because we can offer a library of implementations, like for Avro, Json, Thrift, Protobuf. All connectors can use hardened implementations for these types, or integrations with schema registries.
    
      - This surfaces for example in the SQL / Table API, which is currently making an effort to change their source API to have "connector" and "format" aspects orthogonal. You define a table as "from kafka with Avro", or "from row-wise file with JSON", etc.
    
      - We should think of having something similar in the future in the unified batch/streaming DataStream API as well, when we rework our source interface. At least a "row-wise source" that can then use all these format implementations.
    
    That means we are in a bit of a conflict between the "common deserialization schema" interface and surfacing connector-specific information.
    One way to approach that might be to make the connector-specific deserializer classes subclasses of the common one, and let them use specialized subclasses of `ConsumerRecordMetaInfo` that carry the additional fields.
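
    A possible shape of that approach (all names are illustrative):

    ```
    // Common denominator, shared by all per-record sources.
    public interface SourceRecordMetaInfo {
    	byte[] getKey();
    	byte[] getMessage();
    }

    // Connector-specific specialization adding the Kafka-only fields.
    public interface KafkaRecordMetaInfo extends SourceRecordMetaInfo {
    	String getTopic();
    	int getPartition();
    	long getOffset();
    	long getTimestamp(); // record timestamp, Kafka clients 0.10+
    }
    ```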
    
    On a separate note, I think that `ConsumerRecordMetaInfo` is not the best name, because the type has not only the meta info, but the actual record. So we could call it `Record` or `Datum` or `SourceRecord`, etc.


---

[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5958
  
    @FredTing we had some offline discussion on how to proceed with this.
    @aljoscha, @twalthr, or @StephanEwen can probably comment more here if I missed anything.
    
    The conflict that Stephan mentioned between a "common deserialization schema" interface and surfacing connector-specific information is rooted in the fact that both concerns (deserialization and providing connector-specific record meta information) are currently coupled in a single interface.
    
    Take for example the Kafka connector's `KeyedDeserializationSchema` - there we try to deserialize the Kafka bytes, as well as provide information such as topic / partition / timestamp etc. to allow the user to enrich their user records for downstream business logic. The first part (deserialization of bytes) should be something common for all connector sources, while the second part is Kafka-specific.
    
    Therefore, we should perhaps break this up into two separate interfaces, as follows:
    ```
    // common interface for all sources (we already have this)
    interface DeserializationSchema<T> {
        T deserialize(byte[] bytes);
    }
    
    // ... and a Kafka-specific interface that is only used to provide record meta information
    interface ConsumerRecordMetaInfoProvider<T> {
        T enrich(T record, ConsumerRecordMetaInfo metaInfo);
    }
    ```
    
    The second interface is something that each connector should have independently, and does not handle deserialization of the record bytes. The name, of course, is still open to discussion.
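
    To illustrate how the two interfaces would compose, here is a hypothetical method inside a Kafka fetcher (`metaInfoProvider` and `toMetaInfo` are assumptions, not committed API):

    ```
    private T deserializeAndEnrich(ConsumerRecord<byte[], byte[]> consumerRecord) throws IOException {
    	// Common concern: turn the raw bytes into a user record.
    	T record = deserializationSchema.deserialize(consumerRecord.value());
    	// Kafka-specific concern: attach topic / partition / offset / timestamp if requested.
    	if (metaInfoProvider != null) {
    		record = metaInfoProvider.enrich(record, toMetaInfo(consumerRecord));
    	}
    	return record;
    }
    ```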
    
    What do you think?


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186337736
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.serialization;
    +
    +import org.apache.flink.annotation.Public;
    +
    +/**
    + * The consumer record meta info contains, besides the actual message, some meta information, such as
    + * key, topic, partition, offset and timestamp for Apache kafka
    + *
    + * <p><b>Note:</b>The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and
    + * the timestampType has the value `NO_TIMESTAMP_TYPE`.
    + */
    +@Public
    +public interface ConsumerRecordMetaInfo {
    +	/**
    +	 * The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9
    +	 * so a local enumeration is needed.
    +	 */
    +	enum TimestampType {
    +		NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
    --- End diff --
    
    I wonder if `CREATE_TIME` should be renamed as `EVENT_TIME`, to be more coherent with Flink's terminologies.


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by FredTing <gi...@git.apache.org>.
Github user FredTing commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186465223
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.serialization;
    +
    +import org.apache.flink.annotation.Public;
    +
    +/**
    + * The consumer record meta info contains, besides the actual message, some meta information, such as
    + * key, topic, partition, offset and timestamp for Apache kafka
    + *
    + * <p><b>Note:</b>The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and
    + * the timestampType has the value `NO_TIMESTAMP_TYPE`.
    + */
    +@Public
    +public interface ConsumerRecordMetaInfo {
    +	/**
    +	 * The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9
    +	 * so a local enumeration is needed.
    +	 */
    +	enum TimestampType {
    +		NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
    --- End diff --
    
    I'll rename them both. 
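
    The enum after both renames would then read (a sketch combining the two review suggestions):

    ```
    enum TimestampType {
    	NO_TIMESTAMP, EVENT_TIME, INGEST_TIME
    }
    ```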


---

[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...

Posted by FredTing <gi...@git.apache.org>.
Github user FredTing commented on the issue:

    https://github.com/apache/flink/pull/5958
  
    Closing this PR because of new insights.


---

[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5958
  
    Thanks for the update @FredTing.
    I'll try to take another look at the PR within the next days.


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by FredTing <gi...@git.apache.org>.
Github user FredTing commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186462871
  
    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---
    @@ -78,6 +79,69 @@ public Kafka010Fetcher(
     				useMetrics);
     	}
     
    +	private class KafkaConsumerRecordWrapper10 implements ConsumerRecordMetaInfo {
    +		private static final long serialVersionUID = 2651665280744549935L;
    +
    +		private final ConsumerRecord<byte[], byte[]> consumerRecord;
    +
    +		public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], byte[]> consumerRecord) {
    +			this.consumerRecord = consumerRecord;
    +		}
    +
    +		@Override
    +		public byte[] getKey() {
    +			return consumerRecord.key();
    +		}
    +
    +		@Override
    +		public byte[] getMessage() {
    +			return consumerRecord.value();
    +		}
    +
    +		@Override
    +		public String getTopic() {
    +			return consumerRecord.topic();
    +		}
    +
    +		@Override
    +		public int getPartition() {
    +			return consumerRecord.partition();
    +		}
    +
    +		@Override
    +		public long getOffset() {
    +			return consumerRecord.offset();
    +		}
    +
    +		@Override
    +		public long getTimestamp() {
    +			return Long.MIN_VALUE;
    --- End diff --
    
    Yes, it certainly does; it should return `consumerRecord.timestamp()`. I'll fix it.
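
    For reference, the corrected override would simply delegate to the Kafka client (a sketch of the fix described above):

    ```
    @Override
    public long getTimestamp() {
    	// Kafka 0.10+ exposes the record timestamp directly on ConsumerRecord.
    	return consumerRecord.timestamp();
    }
    ```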


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by FredTing <gi...@git.apache.org>.
Github user FredTing commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186479026
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
    @@ -42,14 +42,22 @@
     @Public
     public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
     
    +	/**
    +	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
    +	 */
    +	@Deprecated
    +	T deserialize(byte[] message) throws IOException;
    +
     	/**
     	 * Deserializes the byte message.
     	 *
    -	 * @param message The message, as a byte array.
    +	 * @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
     	 *
     	 * @return The deserialized message as an object (null if the message cannot be deserialized).
     	 */
    -	T deserialize(byte[] message) throws IOException;
    +	default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
    --- End diff --
    
    I agree that it's probably better to make separate `DeserializationSchema` classes for each connector type. But for now I think this is a relatively easy fix that does not break the Flink API for custom deserializers. There is already some discussion about redesigning the connectors (see issue 5479) with a `common connector framework` in mind; I think that would be a good place to decide what to do with a shared `DeserializationSchema`.
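
    The compatibility argument in code form (a sketch, assuming this PR's interfaces): the new default method can fall back to the old byte[] variant, so existing custom deserializers keep working unchanged.

    ```
    default T deserialize(ConsumerRecordMetaInfo record) throws IOException {
    	return deserialize(record.getMessage());
    }
    ```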


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by FredTing <gi...@git.apache.org>.
Github user FredTing commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186479633
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
    @@ -42,14 +42,22 @@
     @Public
     public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
     
    +	/**
    +	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
    --- End diff --
    
    I added some more text to the javadoc explaining the benefits of implementing the other `deserialize` method.


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by FredTing <gi...@git.apache.org>.
Github user FredTing commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186480680
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.serialization;
    +
    +import org.apache.flink.annotation.Public;
    +
    +/**
    + * The consumer record meta info contains, besides the actual message, some meta information, such as
    + * key, topic, partition, offset and timestamp for Apache kafka
    + *
    + * <p><b>Note:</b>The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and
    + * the timestampType has the value `NO_TIMESTAMP_TYPE`.
    + */
    +@Public
    +public interface ConsumerRecordMetaInfo {
    +	/**
    +	 * The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9
    +	 * so a local enumeration is needed.
    +	 */
    +	enum TimestampType {
    +		NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
    +	}
    +
    +	/**
    +	 * @return the key as a byte array (null if no key has been set).
    +	 */
    +	byte[] getKey();
    +
    +	/**
    +	 * @return The message, as a byte array (null if the message was empty or deleted).
    +	 */
    +	byte[] getMessage();
    +
    +	/**
    +	 * @return The topic the message has originated from (for example the Kafka topic).
    +	 */
    +	String getTopic();
    +
    +	/**
    +	 * @return The partition the message has originated from (for example the Kafka partition).
    +	 */
    +	int getPartition();
    +
    +	/**
    +	 * @return the offset of the message in the original source (for example the Kafka offset).
    +	 */
    +	long getOffset();
    +
    +	/**
    +	 * @return the timestamp of the consumer record
    --- End diff --
    
    I've added some more comments.


---

[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5958#discussion_r186337834
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.serialization;
    +
    +import org.apache.flink.annotation.Public;
    +
    +/**
    + * The consumer record meta info contains, besides the actual message, some meta information, such as
    + * key, topic, partition, offset and timestamp for Apache kafka
    + *
    + * <p><b>Note:</b>The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and
    + * the timestampType has the value `NO_TIMESTAMP_TYPE`.
    + */
    +@Public
    +public interface ConsumerRecordMetaInfo {
    +	/**
    +	 * The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9
    +	 * so a local enumeration is needed.
    +	 */
    +	enum TimestampType {
    +		NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
    +	}
    +
    +	/**
    +	 * @return the key as a byte array (null if no key has been set).
    +	 */
    +	byte[] getKey();
    +
    +	/**
    +	 * @return The message, as a byte array (null if the message was empty or deleted).
    +	 */
    +	byte[] getMessage();
    +
    +	/**
    +	 * @return The topic the message has originated from (for example the Kafka topic).
    +	 */
    +	String getTopic();
    +
    +	/**
    +	 * @return The partition the message has originated from (for example the Kafka partition).
    +	 */
    +	int getPartition();
    +
    +	/**
    +	 * @return the offset of the message in the original source (for example the Kafka offset).
    +	 */
    +	long getOffset();
    +
    +	/**
    +	 * @return the timestamp of the consumer record
    --- End diff --
    
    The javadoc should document the "dummy" timestamp value that is returned when the timestamp type is `NO_TIMESTAMP`.
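
    For example, the javadoc could read (wording is a suggestion only):

    ```
    /**
     * @return the timestamp of the consumer record. Only meaningful for Kafka
     *         clients 0.10+; for older clients the timestamp type is
     *         {@code NO_TIMESTAMP} and the returned value is the dummy
     *         {@code Long.MIN_VALUE}.
     */
    long getTimestamp();
    ```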


---