You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2017/06/06 12:27:18 UTC

[jira] [Commented] (KAFKA-5390) First records in batch rejected but others accepted when rolling log

    [ https://issues.apache.org/jira/browse/KAFKA-5390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16038779#comment-16038779 ] 

Ismael Juma commented on KAFKA-5390:
------------------------------------

Thanks for the report. Pasting the code so that it's easier to reference. There's an issue in the code below. You are not passing a callback to `send` or calling `get` on the Future. This means that any errors during `send` are not captured. Passing the callback seems to be the simplest option in this case and that will tell us if there are errors during `send`.

{code}
package com.reftel.magnus.kafkasequence;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

public class SequenceTest {
    @Rule
    public final TemporaryFolder temp = new TemporaryFolder();

    private ServerCnxnFactory factory;
    private KafkaServerStartable broker;
    private int kafkaPort;

    @Before
    public void before() throws Throwable {
        factory = NIOServerCnxnFactory.createFactory(null, 10);

        factory.startup(new ZooKeeperServer(
            temp.newFolder("zk-snapshot"), temp.newFolder("zk-log"), ZooKeeperServer.DEFAULT_TICK_TIME
        ));

        try (ServerSocket socket = new ServerSocket(0)) {
            kafkaPort = socket.getLocalPort();
        }

        Properties props = new Properties();
        props.put("listeners", String.format("PLAINTEXT://%s:%d", "localhost", kafkaPort));
        props.put("log.dir", temp.newFolder("kafka").toString());
        props.put("log.segment.bytes", "4000");
        props.put("num.partitions", "1");
        props.put("zookeeper.connect", String.format("localhost:%d", factory.getLocalPort()));

        broker = new KafkaServerStartable(new KafkaConfig(props));
        broker.startup();
    }

    @After
    public void after() {
        broker.shutdown();
        factory.shutdown();
    }

    private KafkaProducer<String, String> buildProducer() throws IOException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("localhost:%d", kafkaPort));
        props.put(ProducerConfig.LINGER_MS_CONFIG, Integer.toString(60000));
        return new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    }

    private KafkaConsumer<String, String> buildConsumer() throws IOException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("localhost:%d", kafkaPort));
        return new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    }

    @Test
    public void testSequence() throws IOException {
        List<ProducerRecord<String, String>> records = new ArrayList<>(9);
        for (int i = 0; i < 9; i++) {
            int dataLength = 2000;
            StringBuilder sb = new StringBuilder(dataLength);
            for (int n = 0; n < dataLength; n++) {
                sb.append('x');
            }
            records.add(new ProducerRecord<>("t", Integer.toString(i), sb.toString()));
        }

        final KafkaProducer<String, String> producer = buildProducer();
        records.forEach(producer::send);
        producer.flush();
        producer.close();

        final KafkaConsumer<String, String> consumer = buildConsumer();
        consumer.assign(Collections.singleton(new TopicPartition("t", 0)));
        consumer.seekToBeginning(consumer.assignment());
        final ConsumerRecords<String, String> read = consumer.poll(1000);
        if (!read.isEmpty()) {
            final ConsumerRecord<String, String> first = read.iterator().next();
            Assert.assertEquals(first.key(), "0");
        }
    }
}
{code}

> First records in batch rejected but others accepted when rolling log
> --------------------------------------------------------------------
>
>                 Key: KAFKA-5390
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5390
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.2.1
>            Reporter: Magnus Reftel
>         Attachments: kafka-sequence.tar.gz
>
>
> When sending a sequence of records in a batch right when the broker needs to roll a new segment, it's possible for the first few records to fail, while other records in the batch are accepted. If records have dependencies on earlier records, e.g. in the case of a sequence of events in an event-sourced system, then a producer cannot use the batching functionality, since it then risks consumers receiving a record without first receiving the records it depends on.
> See attached testcase (kafka-sequence.tar.gz).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)