Posted to dev@kafka.apache.org by "Alex Fechner (JIRA)" <ji...@apache.org> on 2017/01/05 13:44:58 UTC

[jira] [Created] (KAFKA-4597) Record metadata returned by producer doesn't consider log append time

Alex Fechner created KAFKA-4597:
-----------------------------------

             Summary: Record metadata returned by producer doesn't consider log append time
                 Key: KAFKA-4597
                 URL: https://issues.apache.org/jira/browse/KAFKA-4597
             Project: Kafka
          Issue Type: Bug
          Components: clients, producer 
    Affects Versions: 0.10.1.1
            Reporter: Alex Fechner
            Priority: Minor


Kafka topics can be configured to record a timestamp with each message produced. There are two different timestamp types that can be stored (see the configuration sketch after the list):

# Record *create time*: The time the record is created by the client.
# Log *append time*: The time the record is appended to the log by the broker.
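
For reference, the timestamp type is controlled per topic by the message.timestamp.type config (the broker-wide default is log.message.timestamp.type). A minimal creation sketch using the same AdminUtils API as in the test class below; the topic name is a placeholder and zkUtils is assumed to be connected as shown there:

{code:java}
// Minimal sketch: create a topic whose records are stamped with the broker's append time.
// "my-log-append-topic" is a placeholder; zkUtils is assumed to be connected as in the class below.
Properties topicProperties = new Properties();
topicProperties.put("message.timestamp.type", "LogAppendTime"); // or "CreateTime"
AdminUtils.createTopic(zkUtils, "my-log-append-topic", 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$);
{code}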

The [ProducerRecord|https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html] docs state:

{quote}
In either of the cases above, the timestamp that has actually been used will be returned to user in RecordMetadata
{quote}
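
In code terms this is the value returned by RecordMetadata#timestamp() on the metadata obtained from the send future; a minimal sketch, assuming a producer configured as in the test class below and the log-append-time topic created there:

{code:java}
// Minimal sketch: read back the timestamp the broker reports for the produced record.
// For a LogAppendTime topic this is expected to be the broker's append time.
RecordMetadata metadata = producer.send(
        new ProducerRecord<>("test-topic-with-log-append-time", "key", "value")).get();
System.out.println(metadata.timestamp());
{code}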

However, I found that the *create time* is returned in both cases.

The following class creates two topics, one configured to store the *create time* and the other to use the *log append time*. It produces 10 messages into each topic and prints the timestamps from the record metadata returned by the producer as well as those fetched back by a consumer.

{code:java}
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaTimestampTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String ip = "10.0.10.8";

        Properties producerProperties = new Properties();
        producerProperties.put("bootstrap.servers", ip + ":9092");
        producerProperties.put("acks", "all");
        producerProperties.put("retries", 0);
        producerProperties.put("batch.size", 16384);
        producerProperties.put("linger.ms", 1);
        producerProperties.put("buffer.memory", 33554432);
        producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", ip + ":9092");
        consumerProperties.put("enable.auto.commit", "false");
        consumerProperties.put("session.timeout.ms", "30000");
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Producer<String, String> producer = new KafkaProducer<>(producerProperties);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

        ZkClient zkClient = new ZkClient(ip + ":2181", 10000, 10000, ZKStringSerializer$.MODULE$);
        ZkConnection zkConnection = new ZkConnection(ip + ":2181");
        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);

        TopicPartition topicPartitionWithCreateTime = new TopicPartition("test-topic-with-create-time", 0);
        TopicPartition topicPartitionWithLogAppendTime = new TopicPartition("test-topic-with-log-append-time", 0);

        // create topic with create time
        if (!AdminUtils.topicExists(zkUtils, topicPartitionWithCreateTime.topic())) {
            Properties topicProperties = new Properties();
            topicProperties.put("message.timestamp.type", "CreateTime");
            AdminUtils.createTopic(zkUtils, topicPartitionWithCreateTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$);
        }

        // create topic with log append time
        if (!AdminUtils.topicExists(zkUtils, topicPartitionWithLogAppendTime.topic())) {
            Properties topicProperties = new Properties();
            topicProperties.put("message.timestamp.type", "LogAppendTime");
            AdminUtils.createTopic(zkUtils, topicPartitionWithLogAppendTime.topic(), 1, 1, topicProperties, RackAwareMode.Disabled$.MODULE$);
        }

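        // Assign both partitions manually; each loop below seeks the consumer to the offset
        // returned in the producer's RecordMetadata and re-reads the record to compare timestamps.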
        consumer.assign(Arrays.asList(topicPartitionWithLogAppendTime, topicPartitionWithCreateTime));
        String format = "#%s, MetaDataOffset: %s, MetaDataTime: %s, ConsumerRecordOffset: %s, ConsumerRecordTime: %s";

        System.out.println(String.format("Create messages into topic %s ...", topicPartitionWithCreateTime));
        for (int i = 0; i < 10; i++) {
            RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithCreateTime.topic(), topicPartitionWithCreateTime.partition(), "", "message")).get();
            consumer.seek(topicPartitionWithCreateTime, recordMetadata.offset());
            ConsumerRecord<String, String> consumerRecord =  consumer.poll(1000).records(topicPartitionWithCreateTime).get(0);
            System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp()));
        }

        System.out.println(String.format("Create messages into topic %s...", topicPartitionWithLogAppendTime));
        for (int i = 0; i < 10; i++) {
            RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topicPartitionWithLogAppendTime.topic(), topicPartitionWithLogAppendTime.partition(), "", "message")).get();
            consumer.seek(topicPartitionWithLogAppendTime, recordMetadata.offset());
            ConsumerRecord<String, String> consumerRecord =  consumer.poll(1000).records(topicPartitionWithLogAppendTime).get(0);
            System.out.println(String.format(format, i + 1, recordMetadata.offset(), recordMetadata.timestamp(), consumerRecord.offset(), consumerRecord.timestamp()));
        }

        AdminUtils.deleteTopic(zkUtils, topicPartitionWithCreateTime.topic());
        AdminUtils.deleteTopic(zkUtils, topicPartitionWithLogAppendTime.topic());

        // close the clients and the ZooKeeper connection
        consumer.close();
        producer.close();
        zkUtils.close();
    }
}
{code}

The output shows that in the *log append time* case the two timestamps differ:

{code}
Create messages into topic test-topic-with-create-time-0 ...
#1, MetaDataOffset: 0, MetaDataTime: 1483623773788, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623773788
#2, MetaDataOffset: 1, MetaDataTime: 1483623774178, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774178
#3, MetaDataOffset: 2, MetaDataTime: 1483623774183, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623774183
#4, MetaDataOffset: 3, MetaDataTime: 1483623774188, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623774188
#5, MetaDataOffset: 4, MetaDataTime: 1483623774193, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623774193
#6, MetaDataOffset: 5, MetaDataTime: 1483623774197, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623774197
#7, MetaDataOffset: 6, MetaDataTime: 1483623774202, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623774202
#8, MetaDataOffset: 7, MetaDataTime: 1483623774207, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623774207
#9, MetaDataOffset: 8, MetaDataTime: 1483623774212, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623774212
#10, MetaDataOffset: 9, MetaDataTime: 1483623774217, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623774217
Create messages into topic test-topic-with-log-append-time-0...
#1, MetaDataOffset: 0, MetaDataTime: 1483623774224, ConsumerRecordOffset: 0, ConsumerRecordTime: 1483623774992
#2, MetaDataOffset: 1, MetaDataTime: 1483623774230, ConsumerRecordOffset: 1, ConsumerRecordTime: 1483623774997
#3, MetaDataOffset: 2, MetaDataTime: 1483623774235, ConsumerRecordOffset: 2, ConsumerRecordTime: 1483623775002
#4, MetaDataOffset: 3, MetaDataTime: 1483623774239, ConsumerRecordOffset: 3, ConsumerRecordTime: 1483623775007
#5, MetaDataOffset: 4, MetaDataTime: 1483623774244, ConsumerRecordOffset: 4, ConsumerRecordTime: 1483623775011
#6, MetaDataOffset: 5, MetaDataTime: 1483623774248, ConsumerRecordOffset: 5, ConsumerRecordTime: 1483623775015
#7, MetaDataOffset: 6, MetaDataTime: 1483623774253, ConsumerRecordOffset: 6, ConsumerRecordTime: 1483623775020
#8, MetaDataOffset: 7, MetaDataTime: 1483623774257, ConsumerRecordOffset: 7, ConsumerRecordTime: 1483623775024
#9, MetaDataOffset: 8, MetaDataTime: 1483623774262, ConsumerRecordOffset: 8, ConsumerRecordTime: 1483623775029
#10, MetaDataOffset: 9, MetaDataTime: 1483623774267, ConsumerRecordOffset: 9, ConsumerRecordTime: 1483623775034
{code}

I assume the timestamps in the record metadata represent the create time, but I could not verify that.
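
One way to pin this down would be to produce with an explicit create-time timestamp into the log-append-time topic and compare it with the returned metadata; a sketch along those lines, reusing the producer and topic from the class above:

{code:java}
// Sketch to verify the assumption: send with an explicit client-side timestamp.
long explicitCreateTime = 1234567890123L; // an arbitrary, clearly distinguishable create time
RecordMetadata metadata = producer.send(new ProducerRecord<>(
        "test-topic-with-log-append-time", 0, explicitCreateTime, "", "message")).get();
// If this prints true, the metadata carries the create time even though
// the topic is configured for LogAppendTime.
System.out.println(metadata.timestamp() == explicitCreateTime);
{code}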


