Posted to jira@kafka.apache.org by "ko byoung kwon (JIRA)" <ji...@apache.org> on 2018/05/04 04:56:00 UTC
[jira] [Updated] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled
[ https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ko byoung kwon updated KAFKA-6860:
----------------------------------
Summary: NPE when reinitializeStateStores with eos enabled (was: NPE when reinitializeStateStores with eos enabled(true))
> NPE when reinitializeStateStores with eos enabled
> -------------------------------------------------
>
> Key: KAFKA-6860
> URL: https://issues.apache.org/jira/browse/KAFKA-6860
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Environment: mac, kafka1.1
> Reporter: ko byoung kwon
> Priority: Major
> Original Estimate: 2h
> Remaining Estimate: 2h
>
> First of all, please understand that my English is not fluent.
> *Symptom*
> With EOS enabled, reinitializing the state stores throws an NPE because the checkpoint is null.
> {code:java}
> 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1] Encountered the following error during processing:
> java.lang.NullPointerException: null
> at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66) ~[kafka-streams-1.1.0.jar:na]
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) ~[kafka-streams-1.1.0.jar:na]
> at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230) ~[kafka-streams-1.1.0.jar:na]
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) ~[kafka-streams-1.1.0.jar:na]
> {code}
> *How to reproduce*
> *configure as*
> - changelog topic with a short `retention.ms` and the `delete` cleanup policy (only to reproduce the symptom easily)
> e.g. retention.ms=30000,cleanup.policy=delete
> - exactly-once semantics enabled
> - no cleanup
> *Steps*
> - two tasks, [0_0] and [0_1], on two Spring Boot instances (assignment: was#1 gets task [0_0], was#2 gets task [0_1])
> - write some data to each state store (the changelog topic soon deletes those messages because of the short `retention.ms`)
> - kill was#2; was#1 then restores the data of task [0_1] into its own RocksDB
> - during that restore it accesses the checkpoint and the error occurs (AbstractStateManager, line 66)
> {code:java}
> // My code
> Map<String, String> topicConfiguration = new HashMap<>();
> topicConfiguration.putIfAbsent("cleanup.policy", "delete");
> topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
> topicConfiguration.putIfAbsent("retention.ms", "3000");
> builder.stream(properties.getSourceTopic(),
>         Consumed.with(Serdes.Long(), Serdes.String()))
>     .groupByKey()
>     .count(Materialized
>         .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
>         .withKeySerde(Serdes.Long())
>         .withValueSerde(Serdes.Long())
>         .withLoggingEnabled(topicConfiguration));
> {code}
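The settings listed under "How to reproduce" can be written out as plain key/value pairs. The following is only a sketch: the class and method names (`EosReproConfig`, `streamsProperties`, `changelogTopicConfig`) and the application id and broker address are placeholders that do not come from the report. The config keys themselves (`processing.guarantee`, `cleanup.policy`, `retention.ms`) are the standard Kafka names, written as string literals so the snippet compiles without the Kafka jars.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class EosReproConfig {

    /** Streams-level settings with exactly-once processing turned on. */
    static Properties streamsProperties(final String applicationId, final String bootstrapServers) {
        final Properties props = new Properties();
        props.put("application.id", applicationId);        // placeholder value supplied by the caller
        props.put("bootstrap.servers", bootstrapServers);   // placeholder value supplied by the caller
        // The default is "at_least_once"; "exactly_once" enables EOS.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    /** Changelog topic overrides, in the shape expected by withLoggingEnabled(...). */
    static Map<String, String> changelogTopicConfig(final long retentionMs) {
        final Map<String, String> config = new HashMap<>();
        config.put("cleanup.policy", "delete");
        config.put("retention.ms", Long.toString(retentionMs));
        return config;
    }

    public static void main(final String[] args) {
        // Hypothetical application id and broker address, for illustration only.
        System.out.println(streamsProperties("example-app", "localhost:9092"));
        System.out.println(changelogTopicConfig(30_000L));
    }
}
```

The `Properties` object would be passed to the `KafkaStreams` constructor, and the map to `Materialized#withLoggingEnabled` as in the reporter's snippet.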
> *Suggestion*
> When EOS is enabled, the checkpoint is null.
> I think some code needs to be added to create the checkpoint when it is missing, as follows:
> {code:java}
> // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
> // # suggestion start
> if (checkpoint == null) {
>     checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
> }
> // # suggestion end
> try {
>     checkpoint.write(checkpointableOffsets);
> } catch (final IOException fatalException) {
>     log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
>     throw new StreamsException("Failed to reinitialize global store.", fatalException);
> }
> {code}
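The null guard suggested above can be tried in isolation. This is a minimal sketch of the pattern only, not the Kafka code: `SimpleCheckpoint` and `StateManagerSketch` are hypothetical stand-ins for `OffsetCheckpoint` and `AbstractStateManager`, the file name `.checkpoint` and the "name offset" line format are assumptions, and offsets are keyed by plain strings instead of `TopicPartition`.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LazyCheckpointSketch {

    /** Hypothetical stand-in for OffsetCheckpoint: writes one "name offset" line per entry. */
    static final class SimpleCheckpoint {
        private final File file;

        SimpleCheckpoint(final File file) {
            this.file = file;
        }

        void write(final Map<String, Long> offsets) throws IOException {
            final List<String> lines = new ArrayList<>();
            // Copy into a TreeMap so the lines are written in sorted key order.
            for (final Map.Entry<String, Long> entry : new TreeMap<>(offsets).entrySet()) {
                lines.add(entry.getKey() + " " + entry.getValue());
            }
            Files.write(file.toPath(), lines, StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical stand-in for the state manager; the checkpoint field starts out null. */
    static final class StateManagerSketch {
        private final File baseDir;
        private SimpleCheckpoint checkpoint; // null until first needed, as described in the report

        StateManagerSketch(final File baseDir) {
            this.baseDir = baseDir;
        }

        void writeCheckpoint(final Map<String, Long> offsets) throws IOException {
            // The guard from the suggestion: create the checkpoint on demand instead of dereferencing null.
            if (checkpoint == null) {
                checkpoint = new SimpleCheckpoint(new File(baseDir, ".checkpoint"));
            }
            checkpoint.write(offsets);
        }
    }

    public static void main(final String[] args) throws IOException {
        final File dir = Files.createTempDirectory("state-dir").toFile();
        final Map<String, Long> offsets = new TreeMap<>();
        offsets.put("changelog-0", 42L);
        new StateManagerSketch(dir).writeCheckpoint(offsets);
        System.out.println(Files.readAllLines(new File(dir, ".checkpoint").toPath()));
    }
}
```

Without the guard, the first call to `writeCheckpoint` would fail with a `NullPointerException`, which matches the symptom in the stack trace. Whether writing a checkpoint file is the right behavior under EOS is a separate question that the sketch does not settle.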