Posted to jira@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2019/03/01 21:52:00 UTC
[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782124#comment-16782124 ]
Matthias J. Sax commented on KAFKA-7628:
----------------------------------------
[~guozhang] What is the fix version for this? Is this ticket a duplicate?
> KafkaStream is not closing
> --------------------------
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.1
> Environment: Macbook Pro
> Reporter: Ozgur
> Priority: Major
>
> I close a KafkaStream when needed, based on a certain condition:
> Closing:
>
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream already closed?");
> } else {
>     boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
>     if (closed) {
>         kafkaStream = null;
>         logger.info("KafkaStream closed");
>     } else {
>         logger.info("KafkaStream could not be closed");
>     }
> }
> {code}
> Starting:
>
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream is starting");
>     kafkaStream = KafkaManager.getInstance().getStream(
>             this.getConfigFilePath(),
>             this,
>             this.getTopic());
>     kafkaStream.start();
>     logger.info("KafkaStream is started");
> }
> {code}
>
>
> In my implementation of Processor, {{process(String key, byte[] value)}} is still called even though the stream was closed successfully:
>
> {code:java}
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
>     private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class);
>     private ProcessorContext context;
>
>     private ProcessorContext getContext() {
>         return context;
>     }
>
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(1000);
>     }
>
>     @Override
>     public void process(String key, byte[] value) {
>         try {
>             String topic = key.split("-")[0];
>             byte[] uncompressed = GzipCompressionUtil.uncompress(value);
>             String json = new String(uncompressed, "UTF-8");
>             processRecord(topic, json);
>             this.getContext().commit();
>         } catch (Exception e) {
>             logger.error("Error processing json", e);
>         }
>     }
>
>     protected abstract void processRecord(String topic, String json);
>
>     @Override
>     public void punctuate(long timestamp) {
>         this.getContext().commit();
>     }
>
>     @Override
>     public void close() {
>         this.getContext().commit();
>     }
> }
> {code}
>
> My configuration for KafkaStreams:
>
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1*
>
> After calling close() on the streams, I see that these connections to broker port 9092 are still established:
>
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>
>
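To check whether the broker connections actually go away some time after close() returns, the lsof output quoted above can be counted programmatically instead of by eye. The following is only a minimal sketch, assuming the lsof line format shown in the report. The class name EstablishedConnections and the method countEstablished are hypothetical helpers, not part of Kafka, and the LISTEN sample line is made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class EstablishedConnections {

    /**
     * Counts lsof lines that describe an ESTABLISHED TCP connection
     * whose remote endpoint (the part after "->") uses the given port.
     */
    public static long countEstablished(List<String> lsofLines, int remotePort) {
        String remoteSuffix = ":" + remotePort;
        return lsofLines.stream()
                .filter(line -> line.contains("(ESTABLISHED)"))
                .filter(line -> {
                    int arrow = line.indexOf("->");
                    if (arrow < 0) {
                        return false; // no remote endpoint on this line
                    }
                    // The remote endpoint is the first token after "->".
                    String remote = line.substring(arrow + 2).trim().split("\\s+")[0];
                    return remote.endsWith(remoteSuffix);
                })
                .count();
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            // First two lines are copied from the report.
            "java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)",
            "java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)",
            // Hypothetical listening socket, added only to show that it is ignored.
            "java 29457 ozgur 9u IPv6 0x0 0t0 TCP *:8080 (LISTEN)"
        );
        System.out.println(countEstablished(sample, 9092)); // prints 2
    }
}
```

Running the same count before close() and again a few seconds after it returns would show whether the number of established connections to port 9092 drops to zero.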
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)