Posted to users@kafka.apache.org by Abhishek Verma <ab...@gmail.com> on 2017/11/02 04:25:42 UTC

Reg. Kafka transactional producer and consumer

Hi All,

I am trying to build a hello-world example with a transactional producer, and then to consume the messages. I am doing all of this in plain Java.

I can produce, but the consumer is not consuming the messages.

I searched around and found that some other people have hit the same problem.

Right now I am using a single broker. I also tried the same setup with 3 brokers, and it did not work there either.

I don't know what I am missing, or whether the problem is in the consumer or in the producer.

I have attached the producer and consumer code and console logs, along with my broker logs.

Thanks,
Abhishek


My Broker logs after producing messages
<LOGS broker>
[2017-11-01 18:45:55,000] INFO Updated PartitionLeaderEpoch. New: {epoch:4, offset:3}, Current: {epoch:3, offset:0} for Partition: __transaction_state-2. Cache now contains 1 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-11-01 18:46:03,482] INFO [Transaction Coordinator 1001]: Initialized transactionalId TXN_ID:0.5031925219291776-156417066 with producerId 4001 and producer epoch 0 on partition __transaction_state-2 (kafka.coordinator.transaction.TransactionCoordinator)
</LOGS broker>

My producer code is
<CODE producer>
import com.example.transaction.producer.utils.DataObject;
import com.example.transaction.producer.utils.serde.JsonSerializer;
// Use the public client exceptions; the old kafka.common.* Scala classes are never thrown by the Java producer.
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.*;

public class SampleProducer {

    public static String topic = "topic-4";

    public static void main(String[] args) {

        Properties configProperties = new Properties();

        //configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "some-client-id");
        // NOTE: a random transactional.id makes every run look like a brand-new producer,
        // so zombie fencing across restarts cannot work; a fixed, application-specific id is the usual choice.
        configProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "TXN_ID:" + new Random().nextDouble() + new Random().nextInt());
        configProperties.put(ProducerConfig.ACKS_CONFIG, "all");
        configProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProperties.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.41.132:9090");


        KafkaProducer<Integer, DataObject> producer = new KafkaProducer<>(configProperties);

        System.out.println("Init Transaction");
        producer.initTransactions();
        try {

            System.out.println("transaction initialised going to begin transaction");
            producer.beginTransaction();
            System.out.println("Transaction started");

            ProducerRecord<Integer, DataObject> rec =
                    new ProducerRecord<>(topic, 5, new DataObject(5, "Hello, World!"));

            RecordMetadata metadata = producer.send(rec).get();
            System.out.println("The offset of the record we just sent is: " + metadata.offset());

            metadata = producer.send(rec).get();
            System.out.println("The offset of the record we just sent is: " + metadata.offset());

            producer.commitTransaction();
            System.out.println("Transaction Committed");

        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            System.out.println("Fatal error, cannot recover: " + e);
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            System.out.println("Abort Transaction");
            producer.abortTransaction();
        } catch (Exception x) {
            // Don't swallow unexpected exceptions silently.
            x.printStackTrace();
        }
        producer.close();
        System.out.println("Closed");
    }
}


</CODE producer>
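
For reference, here is a minimal sketch of the transactional pattern as described in the KafkaProducer javadocs, using a fixed transactional.id; the id, topic, and serializers below are illustrative placeholders, not my actual settings:

<CODE sketch producer>
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class MinimalTxnProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A stable, application-specific id: the broker uses it to fence
        // "zombie" instances of the same logical producer across restarts.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-txn-1");
        // Implied by transactional.id, but set explicitly for clarity.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            // commitTransaction() flushes and fails if any send in the transaction failed.
            producer.send(new ProducerRecord<>("topic-4", "key", "Hello, World!"));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal: this producer instance can no longer be used.
            producer.close();
        } catch (KafkaException e) {
            // Any other error: abort this transaction and retry if desired.
            producer.abortTransaction();
        }
        producer.close();
    }
}
</CODE sketch producer>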

These are my producer console logs

<LOGS producer>
0    [main] INFO  org.apache.kafka.clients.producer.ProducerConfig  - ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [192.168.41.132:9090]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = true
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = TXN_ID:0.5031925219291776-156417066
value.serializer = class com.example.transaction.producer.utils.serde.JsonSerializer

261  [main] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Instantiated a transactional producer.
265  [main] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Overriding the default max.in.flight.requests.per.connection to 1 since idempontence is enabled.
274  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bufferpool-wait-time
281  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name buffer-exhausted-records
284  [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.41.132:9090 (id: -1 rack: null)], partitions = [])
297  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name produce-throttle-time
564  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-closed:
564  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-created:
565  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent-received:
565  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent:
565  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-received:
566  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name select-time:
567  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name io-time:
573  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name batch-size
574  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name compression-rate
574  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name queue-time
575  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name request-time
575  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name records-per-request
577  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name record-retries
577  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name errors
577  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name record-size-max
579  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name batch-split-rate
582  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Starting Kafka producer I/O thread.
585  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.11.0.0
585  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : cb8625948210849f
586  [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer  - Kafka producer started
Init Transaction
588  [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state UNINITIALIZED to INITIALIZING
588  [main] INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] ProducerId set to -1 with epoch -1
594  [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, transactionTimeoutMs=60000)
598  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=TXN_ID:0.5031925219291776-156417066, coordinatorType=TRANSACTION)
598  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, transactionTimeoutMs=60000)
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node -1 at 192.168.41.132:9090.
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-sent
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-received
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.latency
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector  - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node -1.  Fetching API versions.
712  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from node -1.
897  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API versions for node -1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
898  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=TXN_ID:0.5031925219291776-156417066, coordinatorType=TRANSACTION) to node 192.168.41.132:9090 (id: -1 rack: null)
901  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node 1001 at 192.168.41.132:9090.
901  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.bytes-sent
902  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.bytes-received
902  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.latency
903  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector  - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1001
903  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node 1001.  Fetching API versions.
903  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from node 1001.
905  [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API versions for node 1001: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
1009 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, transactionTimeoutMs=60000) to node 192.168.41.132:9090 (id: 1001 rack: null)
9491 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] ProducerId set to 4001 with epoch 0
9491 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state INITIALIZING to READY
transaction initialised going to begin transaction
9491 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state READY to IN_TRANSACTION
Transaction started
9491 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Sending metadata request (type=MetadataRequest, topics=topic-4) to node -1
9491 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [192.168.41.132:9090 (id: 1001 rack: null)], partitions = [Partition(topic = topic-4, partition = 0, leader = 1001, replicas = [1001], isr = [1001])])
9523 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Begin adding new partition topic-4-0 to transaction
9523 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, partitions=[topic-4-0])
9523 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, partitions=[topic-4-0]) to node 192.168.41.132:9090 (id: 1001 rack: null)
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Successfully added partitions [topic-4-0] to transaction
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator  - Assigning sequence number 0 from producer (producerId=4001, epoch=0) to dequeued batch from partition topic-4-0 bound for 192.168.41.132:9090 (id: 1001 rack: null).
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.records-per-batch
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.bytes
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.compression-rate
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.record-retries
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.record-errors
9538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Incremented sequence number for topic-partition topic-4-0 to 1
The offset of the record we just sent is: 5
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator  - Assigning sequence number 1 from producer (producerId=4001, epoch=0) to dequeued batch from partition topic-4-0 bound for 192.168.41.132:9090 (id: 1001 rack: null).
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Incremented sequence number for topic-partition topic-4-0 to 2
The offset of the record we just sent is: 6
9554 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
9554 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Enqueuing transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, result=COMMIT)
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Sending transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.5031925219291776-156417066, producerId=4001, producerEpoch=0, result=COMMIT) to node 192.168.41.132:9090 (id: 1001 rack: null)
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.5031925219291776-156417066] Transition from state COMMITTING_TRANSACTION to READY
Transaction Committed
9554 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name connections-closed:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name connections-created:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-sent-received:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-sent:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name bytes-received:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name select-time:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name io-time:
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.bytes-sent
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.bytes-received
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node--1.latency
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node-1001.bytes-sent
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node-1001.bytes-received
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Removed sensor with name node-1001.latency
9554 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Shutdown of Kafka producer I/O thread has completed.
9554 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer  - The Kafka producer has closed.
Closed

</LOGS producer>



My consumer side code.

<CODE consumer>
package com.example.transaction.producer.consumer;

import com.example.transaction.producer.utils.DataObject;
import com.example.transaction.producer.utils.serde.JsonDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;


public class Consumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
    public static String topic = "topic-4";

    public static void main(String[] args) {
        Properties configProperties = new Properties();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.41.132:9090");
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "some-different-group-3");
        configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");


        KafkaConsumer<Integer, DataObject> consumer = new KafkaConsumer<>(configProperties, new IntegerDeserializer(), new JsonDeserializer(DataObject.class));

        consumer.subscribe(Arrays.asList(topic));

        LOGGER.info("*************** Starting Consumer *************");

        while (true) {
            ConsumerRecords<Integer, DataObject> records = consumer.poll(1000);
            records.forEach(record -> {
                System.out.printf("offset = %d\n", record.offset());
                System.out.println("Key = " + record.key().toString() + "\nMessage = " + record.value().toString());
            });
        }

    }
}

</CODE consumer>
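
The JsonSerializer and JsonDeserializer above are my own helper classes and are not shown in full. For anyone trying to reproduce this, a minimal Jackson-based serde along the following lines should behave equivalently (a sketch, not my exact code; each class would normally live in its own file):

<CODE sketch serde>
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            // null stays null so tombstones work as expected
            return data == null ? null : mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("JSON serialize failed", e);
        }
    }

    @Override public void close() { }
}

class JsonDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> type;

    public JsonDeserializer(Class<T> type) { this.type = type; }

    @Override public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return data == null ? null : mapper.readValue(data, type);
        } catch (Exception e) {
            throw new SerializationException("JSON deserialize failed", e);
        }
    }

    @Override public void close() { }
}
</CODE sketch serde>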


My Consumer Console logs.

<LOGS consumer>
0    [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [192.168.41.132:9090]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = some-different-group-3
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_committed
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class com.example.transaction.producer.utils.serde.JsonDeserializer

2    [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Starting the Kafka consumer
139  [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.41.132:9090 (id: -1 rack: null)], partitions = [])
164  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name fetch-throttle-time
444  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-closed:
446  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name connections-created:
446  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent-received:
446  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-sent:
447  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-received:
447  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name select-time:
448  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name io-time:
478  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name heartbeat-latency
479  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name join-latency
479  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name sync-latency
482  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name commit-latency
488  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name bytes-fetched
488  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name records-fetched
490  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name fetch-latency
490  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name records-lag
495  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.11.0.0
495  [main] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : cb8625948210849f
497  [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Kafka consumer created
497  [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer  - Subscribed to topic(s): topic-4
498  [main] INFO  com.example.transaction.producer.consumer.Consumer  - *************** Starting Consumer *************
498  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending GroupCoordinator request for group some-different-group-3 to broker 192.168.41.132:9090 (id: -1 rack: null)
509  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node -1 at 192.168.41.132:9090.
523  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-sent
525  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.bytes-received
526  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node--1.latency
527  [main] DEBUG org.apache.kafka.common.network.Selector  - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
531  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node -1.  Fetching API versions.
531  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from node -1.
661  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API versions for node -1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
662  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Sending metadata request (type=MetadataRequest, topics=topic-4) to node -1
672  [main] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [192.168.41.132:9090 (id: 1001 rack: null)], partitions = [Partition(topic = topic-4, partition = 0, leader = 1001, replicas = [1001], isr = [1001])])
675  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received GroupCoordinator response ClientResponse(receivedTimeMs=1509541833582, latencyMs=170, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=0,client_id=consumer-1}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=192.168.41.132:9090 (id: 1001 rack: null))) for group some-different-group-3
676  [main] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator 192.168.41.132:9090 (id: 2147482646 rack: null) for group some-different-group-3.
676  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node 2147482646 at 192.168.41.132:9090.
679  [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending synchronous auto-commit of offsets {} for group some-different-group-3
679  [main] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Revoking previously assigned partitions [] for group some-different-group-3
679  [main] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - (Re-)joining group some-different-group-3
682  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending JoinGroup ((type: JoinGroupRequest, groupId=some-different-group-3, sessionTimeout=10000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@133e16fd)) to coordinator 192.168.41.132:9090 (id: 2147482646 rack: null)
683  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-2147482646.bytes-sent
684  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-2147482646.bytes-received
684  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-2147482646.latency
684  [main] DEBUG org.apache.kafka.common.network.Selector  - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147482646
685  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node 2147482646.  Fetching API versions.
685  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from node 2147482646.
692  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API versions for node 2147482646: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
695  [kafka-coordinator-heartbeat-thread | some-different-group-3] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Heartbeat thread for group some-different-group-3 started
717  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received successful JoinGroup response for group some-different-group-3: org.apache.kafka.common.requests.JoinGroupResponse@140e5a13
718  [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Performing assignment for group some-different-group-3 using strategy range with subscriptions {consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806=Subscription(topics=[topic-4])}
719  [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Finished assignment for group some-different-group-3: {consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806=Assignment(partitions=[topic-4-0])}
721  [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending leader SyncGroup for group some-different-group-3 to coordinator 192.168.41.132:9090 (id: 2147482646 rack: null): (type=SyncGroupRequest, groupId=some-different-group-3, generationId=6, memberId=consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806, groupAssignment=consumer-1-b3c2f6d7-ac1d-4cd3-8c9a-1f00fe9ff806)
787  [main] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Successfully joined group some-different-group-3 with generation 6
787  [main] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Setting newly assigned partitions [topic-4-0] for group some-different-group-3
803  [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group some-different-group-3 fetching committed offsets for partitions: [topic-4-0]
819  [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Resetting offset for partition topic-4-0 to the committed offset 0
819  [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
819  [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating connection to node 1001 at 192.168.41.132:9090.
819  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.bytes-sent
819  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.bytes-received
819  [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name node-1001.latency
819  [main] DEBUG org.apache.kafka.common.network.Selector  - Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1001
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Completed connection to node 1001.  Fetching API versions.
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Initiating API versions fetch from node 1001.
819  [main] DEBUG org.apache.kafka.clients.NetworkClient  - Recorded API versions for node 1001: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.bytes-fetched
1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic-4.records-fetched
1358 [main] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic-4-0.records-lag
1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-4-0 at offset 0 to node 192.168.41.132:9090 (id: 1001 rack: null)
1358 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-4-0] to broker 192.168.41.132:9090 (id: 1001 rack: null)
1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)

<snip: the identical READ_COMMITTED fetch at offset 0 repeats about every 500 ms (timestamps 2388 through 9104), each time returning highWaterMark=5, lastStableOffset = 0, recordsSizeInBytes=0, and periodic Heartbeat requests succeed; the only other activity is the auto-commit below>

5797 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending asynchronous auto-commit of offsets {topic-4-0=OffsetAndMetadata{offset=0, metadata=''}} for group some-different-group-3
5819 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group some-different-group-3 committed offset 0 for partition topic-4-0
5819 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Completed auto-commit of offsets {topic-4-0=OffsetAndMetadata{offset=0, metadata=''}} for group some-different-group-3
</LOGS consumer>




Re: Reg. Kafka transactional producer and consumer

Posted by Apurva Mehta <ap...@confluent.io>.
Hi,

Your log segment dump and the producer log don't correlate. The producer
log shows the producerId == 4001. But your log segment dumps don't have
this producerId. Please share data from the same run where you reproduce
this issue.

For the producerIds 0-4 (shown in the dump), there seem to be no
transaction markers (these would have sequence number == -1). So if
your messages from producerId 4001 are behind these messages, they would
never be read in read_committed mode.
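
If it helps, you can produce such a dump, including the transaction control records, with the bundled tool, something along these lines (adjust the path to your log directory; this invocation is from memory):

  bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/kafka-logs/topic-4-0/00000000000000000000.log

On 1.0.0, commit/abort markers should show up as control batches in that output.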

Thanks,
Apurva

On Mon, Nov 6, 2017 at 9:44 PM, Abhishek Verma <ab...@gmail.com>
wrote:

> Hi Matthias J. Sax,
>
> Thank you for your suggestions.
>
> I tried the same with Kafka 1.0.0 as well; the same issue occurs.
>
> I am attaching the log segment dump below; please let me know what might
> be the problem.
>
> Regards,
> Abhishek Verma
>
> <dump Log segment>
>
>
>
> Dumping 00000000000000000000.index
>
> offset: 0 position: 0
>
> Dumping 00000000000000000000.log
>
> Starting offset: 0
>
> baseOffset: 0 lastOffset: 0 baseSequence: 0 lastSequence: 0 producerId: 0
> producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 0
> CreateTime: 1509605714710 isvalid: true size: 103 magic: 2 compresscodec:
> NONE crc:344974185
>
> baseOffset: 1 lastOffset: 1 baseSequence: 1 lastSequence: 1 producerId: 0
> producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position:
> 103 CreateTime: 1509605714863 isvalid: true size: 103 magic: 2
> compresscodec: NONE crc:102431214
>
> baseOffset: 2 lastOffset: 2 baseSequence: 0 lastSequence: 0 producerId: 1
> producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position:
> 206 CreateTime: 1509607351944 isvalid: true size: 103 magic: 2
> compresscodec: NONE crc:1129944557
>
> baseOffset: 3 lastOffset: 3 baseSequence: 0 lastSequence: 0 producerId: 2
> producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position:
> 309 CreateTime: 1509616649669 isvalid: true size: 110 magic: 2
> compresscodec: NONE crc:630443129
>
> baseOffset: 4 lastOffset: 4 baseSequence: 0 lastSequence: 0 producerId: 3
> producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position:
> 419 CreateTime: 1509616850564 isvalid: true size: 110 magic: 2
> compresscodec: NONE crc:3357473778
>
> baseOffset: 5 lastOffset: 5 baseSequence: 0 lastSequence: 0 producerId: 4
> producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position:
> 529 CreateTime: 1509624206511 isvalid: true size: 110 magic: 2
> compresscodec: NONE crc:1193735168
>
> baseOffset: 6 lastOffset: 6 baseSequence: 0 lastSequence: 0 producerId: 5
> producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position:
> 639 CreateTime: 1509624453377 isvalid: true size: 110 magic: 2
> compresscodec: NONE crc:3859361029
>
> Dumping 00000000000000000000.timeindex
>
> timestamp: 0 offset: 0
>
> Found timestamp mismatch in :D:\tmp\kafka-logs-0\topic-0\
> 00000000000000000000.timeindex
>
> Index timestamp: 0, log timestamp: 1509605714710
>
> Index timestamp: 0, log timestamp: 1509605714710
>
> Found out of order timestamp in :D:\tmp\kafka-logs-0\topic-0\
> 00000000000000000000.timeindex
>
> Index timestamp: 0, Previously indexed timestamp: 0
>
>
>
> </dump Log segment>
> ________________________________
> From: Matthias J. Sax <ma...@confluent.io>
> Sent: Saturday, November 4, 2017 8:11:07 PM
> To: users@kafka.apache.org
> Subject: Re: Reg. Kafka transactional producer and consumer
>
> Hi,
>
> this consumer log line indicates that there is an open/pending
> transaction (i.e., neither committed nor aborted) and thus the broker
> does not deliver the data to the consumer.
>
> -> highWaterMark = 5, but lastStableOffset = 0
>
>
> On 11/2/17 5:25 AM, Abhishek Verma wrote:
> > 1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
> - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch
> data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset =
> 0, abortedTransactions = [], recordsSizeInBytes=0)
>
>
> Thus, there must be an issue on the producer side that prevents the
> transaction from getting committed. Not sure why, though, as the producer
> logs indicate that the TX was committed successfully.
>
> Maybe you can dump the log segments to see what is in them?
>
> Btw: Kafka 1.0.0 was released recently, containing several bug fixes for
> transactions. Maybe you can check whether it is fixed in 1.0.0.
>
>
> -Matthias
>
>

Re: Reg. Kafka transactional producer and consumer

Posted by Abhishek Verma <ab...@gmail.com>.
Hi Matthias J. Sax,

Thank you for your suggestions.

I tried the same with Kafka 1.0.0 as well; the same issue occurs.

I am attaching the log segment dump below; please let me know what the problem might be.

Regards,
Abhishek Verma

<dump Log segment>



Dumping 00000000000000000000.index

offset: 0 position: 0

Dumping 00000000000000000000.log

Starting offset: 0

baseOffset: 0 lastOffset: 0 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 0 CreateTime: 1509605714710 isvalid: true size: 103 magic: 2 compresscodec: NONE crc:344974185

baseOffset: 1 lastOffset: 1 baseSequence: 1 lastSequence: 1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 103 CreateTime: 1509605714863 isvalid: true size: 103 magic: 2 compresscodec: NONE crc:102431214

baseOffset: 2 lastOffset: 2 baseSequence: 0 lastSequence: 0 producerId: 1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 206 CreateTime: 1509607351944 isvalid: true size: 103 magic: 2 compresscodec: NONE crc:1129944557

baseOffset: 3 lastOffset: 3 baseSequence: 0 lastSequence: 0 producerId: 2 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 309 CreateTime: 1509616649669 isvalid: true size: 110 magic: 2 compresscodec: NONE crc:630443129

baseOffset: 4 lastOffset: 4 baseSequence: 0 lastSequence: 0 producerId: 3 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 419 CreateTime: 1509616850564 isvalid: true size: 110 magic: 2 compresscodec: NONE crc:3357473778

baseOffset: 5 lastOffset: 5 baseSequence: 0 lastSequence: 0 producerId: 4 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 529 CreateTime: 1509624206511 isvalid: true size: 110 magic: 2 compresscodec: NONE crc:1193735168

baseOffset: 6 lastOffset: 6 baseSequence: 0 lastSequence: 0 producerId: 5 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true position: 639 CreateTime: 1509624453377 isvalid: true size: 110 magic: 2 compresscodec: NONE crc:3859361029

Dumping 00000000000000000000.timeindex

timestamp: 0 offset: 0

Found timestamp mismatch in :D:\tmp\kafka-logs-0\topic-0\00000000000000000000.timeindex

Index timestamp: 0, log timestamp: 1509605714710

Index timestamp: 0, log timestamp: 1509605714710

Found out of order timestamp in :D:\tmp\kafka-logs-0\topic-0\00000000000000000000.timeindex

Index timestamp: 0, Previously indexed timestamp: 0



</dump Log segment>
________________________________
From: Matthias J. Sax <ma...@confluent.io>
Sent: Saturday, November 4, 2017 8:11:07 PM
To: users@kafka.apache.org
Subject: Re: Reg. Kafka transactional producer and consumer

Hi,

this consumer log line indicates that there is an open/pending
transaction (i.e., neither committed nor aborted) and thus the broker
does not deliver the data to the consumer.

-> highWaterMark = 5, but lastStableOffset = 0


On 11/2/17 5:25 AM, Abhishek Verma wrote:
> 1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)


Thus, there must be an issue on the producer side that prevents the
transaction from getting committed. Not sure why, though, as the producer
logs indicate that the TX was committed successfully.

Maybe you can dump the log segments to see what is in them?

Btw: Kafka 1.0.0 was released recently, containing several bug fixes for
transactions. Maybe you can check whether it is fixed in 1.0.0.


-Matthias


Re: Reg. Kafka transactional producer and consumer

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

this consumer log line indicates that there is an open/pending
transaction (i.e., neither committed nor aborted) and thus the broker
does not deliver the data to the consumer.

-> highWaterMark = 5, but lastStableOffset = 0


On 11/2/17 5:25 AM, Abhishek Verma wrote:
> 1871 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 0 for partition topic-4-0 returned fetch data (error=NONE, highWaterMark=5, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
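As a quick client-side check, you could also compare the end offsets the
broker reports under the two isolation levels; if I am not mistaken,
endOffsets() returns the last stable offset under read_committed and the
high watermark under read_uncommitted. A sketch, assuming the broker
address and topic from this thread:

<CODE lso check>
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class LsoCheck {

    // End offset of the partition as seen by a consumer with the given
    // isolation level.
    static long endOffset(String isolationLevel) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.41.132:9090");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        TopicPartition tp = new TopicPartition("topic-4", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            return consumer.endOffsets(Collections.singletonList(tp)).get(tp);
        }
    }

    public static void main(String[] args) {
        long highWatermark = endOffset("read_uncommitted"); // high watermark
        long lastStable = endOffset("read_committed");      // last stable offset
        System.out.println("highWaterMark=" + highWatermark
                + ", lastStableOffset=" + lastStable);
        if (lastStable < highWatermark)
            System.out.println((highWatermark - lastStable)
                    + " message(s) are held back by an open transaction.");
    }
}
</CODE lso check>

If lastStableOffset stays behind highWaterMark, an open (never committed
or aborted) transaction is holding back the consumer.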


Thus, there must be an issue on the producer side that prevents the
transaction from getting committed. Not sure why, though, as the producer
logs indicate that the TX was committed successfully.

Maybe you can dump the log segments to see what is in them?
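A sketch of how to do that with the DumpLogSegments tool that ships with
Kafka (the segment path is an assumption; point --files at the segment
under your log.dirs, and on Windows use bin\windows\kafka-run-class.bat):

<CODE dump command>
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /tmp/kafka-logs/topic-4-0/00000000000000000000.log \
  --print-data-log --deep-iteration
</CODE dump command>

If I read the format correctly, transaction markers, if any were written,
would show up as entries with sequence number == -1.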

Btw: Kafka 1.0.0 was released recently, containing several bug fixes for
transactions. Maybe you can check whether it is fixed in 1.0.0.


-Matthias