You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "John Roesler (JIRA)" <ji...@apache.org> on 2018/11/15 01:27:00 UTC

[jira] [Commented] (KAFKA-7628) KafkaStream is not closing

    [ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687359#comment-16687359 ] 

John Roesler commented on KAFKA-7628:
-------------------------------------

Hi [~lugrugzo],

To confirm, you see that Streams successfully closes, but afterwards, it's still bound to the TCP ports?

Have you noticed whether it stays bound indefinitely, or does it stop listening at some point after closing?

Thanks,

-John

> KafkaStream is not closing
> --------------------------
>
>                 Key: KAFKA-7628
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7628
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1
>         Environment: Macbook Pro
>            Reporter: Ozgur
>            Priority: Major
>
> I'm closing a KafkaStream when I need based on a certain condition:
> Closing:
>  
> {code:java}
> if(kafkaStream == null) {
>             logger.info("KafkaStream already closed?");
>         } else {
>             boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
>             if(closed) {
>                 kafkaStream = null;
>                 logger.info("KafkaStream closed");
>             } else {
>                 logger.info("KafkaStream could not closed");
>             }
>         }
> {code}
> Starting:
>  
> {code:java}
> if(kafkaStream == null) {
>             logger.info("KafkaStream is starting");
>             kafkaStream = KafkaManager.getInstance().getStream(this.getConfigFilePath(),
>                     this,
>                     this.getTopic()
>             );
>             kafkaStream.start();
>             logger.info("KafkaStream is started");
>         }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is still called although successfully closing stream:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
>     private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class);
>     private ProcessorContext context;
>     private ProcessorContext getContext() {
>         return context;
>     }
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(1000);
>     }
>     @Override
>     public void process(String key, byte[] value) {
>         try {
>             String topic = key.split("-")[0];
>             byte[] uncompressed = GzipCompressionUtil.uncompress(value);
>             String json = new String(uncompressed, "UTF-8");
>             processRecord(topic, json);
>             this.getContext().commit();
>         } catch (Exception e) {
>             logger.error("Error processing json", e);
>         }
>     }
>     protected abstract void processRecord(String topic, String json);
>     @Override
>     public void punctuate(long timestamp) {
>         this.getContext().commit();
>     }
>     @Override
>     public void close() {
>         this.getContext().commit();
>     }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm witnessing that after closing() the streams, these ports are still listening:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)