Posted to dev@kafka.apache.org by "Ozgur (JIRA)" <ji...@apache.org> on 2018/11/14 16:42:00 UTC

[jira] [Created] (KAFKA-7628) KafkaStream is not closing

Ozgur created KAFKA-7628:
----------------------------

             Summary: KafkaStream is not closing
                 Key: KAFKA-7628
                 URL: https://issues.apache.org/jira/browse/KAFKA-7628
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.11.0.1
         Environment: Macbook Pro
            Reporter: Ozgur


I close a KafkaStreams instance whenever a certain condition is met, and start a new one later:

Closing:

 
{code:java}
if (kafkaStream == null) {
    logger.info("KafkaStream already closed?");
} else {
    // close() returns true only if all stream threads stopped within the timeout
    boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
    if (closed) {
        kafkaStream = null;
        logger.info("KafkaStream closed");
    } else {
        logger.info("KafkaStream could not be closed");
    }
}
{code}
Starting:

 
{code:java}
if (kafkaStream == null) {
    logger.info("KafkaStream is starting");
    kafkaStream = KafkaManager.getInstance().getStream(this.getConfigFilePath(),
            this,
            this.getTopic()
    );
    kafkaStream.start();
    logger.info("KafkaStream is started");
}
{code}
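The two snippets above read and write the {{kafkaStream}} field without any synchronization. If the close and start paths can run on different threads, it may be worth ruling out that two instances overlap. The following is only a minimal sketch of one way to serialize the two transitions. {{StreamHandle}} and {{StreamHolder}} are hypothetical names introduced here; {{StreamHandle}} stands in for the two {{KafkaStreams}} methods used in this report so that the example compiles without the Kafka dependency.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical stand-in for the KafkaStreams methods used in this report. */
interface StreamHandle {
    void start();
    boolean close(long timeout, TimeUnit unit);
}

/** Hypothetical holder that serializes start/stop so at most one instance is tracked. */
final class StreamHolder {
    private final Supplier<StreamHandle> factory;
    private StreamHandle current; // guarded by "this"

    StreamHolder(Supplier<StreamHandle> factory) {
        this.factory = factory;
    }

    /** Creates and starts a new instance only if none is tracked; returns true if one was started. */
    synchronized boolean startIfStopped() {
        if (current != null) {
            return false;
        }
        StreamHandle created = factory.get();
        created.start();
        current = created;
        return true;
    }

    /** Closes the tracked instance and forgets it only if close() reports success. */
    synchronized boolean stopIfRunning(long timeout, TimeUnit unit) {
        if (current == null) {
            return false;
        }
        boolean closed = current.close(timeout, unit);
        if (closed) {
            current = null;
        }
        return closed;
    }

    /** Returns true while an instance is tracked (started and not yet closed successfully). */
    synchronized boolean isRunning() {
        return current != null;
    }
}
```

In the real application the factory would wrap the {{KafkaManager.getInstance().getStream(...)}} call shown above. This does not explain the behaviour reported here by itself; it only removes one possible source of overlapping instances.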
 

 

In my implementation of Processor, {{process(String key, byte[] value)}} is still called even though {{close()}} returned true for the stream:

 
{code:java}
public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
    private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class);
    private ProcessorContext context;


    private ProcessorContext getContext() {
        return context;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(1000);
    }


    @Override
    public void process(String key, byte[] value) {
        try {
            String topic = key.split("-")[0];
            byte[] uncompressed = GzipCompressionUtil.uncompress(value);
            String json = new String(uncompressed, "UTF-8");
            processRecord(topic, json);
            this.getContext().commit();
        } catch (Exception e) {
            logger.error("Error processing json", e);
        }
    }

    protected abstract void processRecord(String topic, String json);

    @Override
    public void punctuate(long timestamp) {
        this.getContext().commit();
    }

    @Override
    public void close() {
        this.getContext().commit();
    }
}
{code}
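To make the late {{process()}} calls easier to observe, a small guard could be shared between the code that closes the stream and the processor. This is a diagnostic sketch only, not a fix, and {{ShutdownGuard}} with its methods is a hypothetical helper that is not part of Kafka or of the code above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical guard shared by the closing code and the processor. */
final class ShutdownGuard {
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final AtomicLong lateRecords = new AtomicLong();

    /** Call just before closing the stream. */
    void markClosing() {
        closing.set(true);
    }

    /** Call just before starting a new stream; resets the late-record counter. */
    void markStarted() {
        closing.set(false);
        lateRecords.set(0L);
    }

    /** Returns false, and counts the record, if it arrives after markClosing(). */
    boolean shouldProcess() {
        if (closing.get()) {
            lateRecords.incrementAndGet();
            return false;
        }
        return true;
    }

    /** Number of records seen since the last markClosing() that were rejected. */
    long lateRecordCount() {
        return lateRecords.get();
    }
}
```

With something like this, {{process()}} could begin with a {{shouldProcess()}} check and log {{Thread.currentThread().getName()}} for rejected records, which would show whether the calls come from a thread of the closed instance or from a newly started one.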
 

My configuration for KafkaStreams:

 
{code:java}
application.id=dv_ws_in_app_activity_dev4
bootstrap.servers=VLXH1
auto.offset.reset=latest
num.stream.threads=1
key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
poll.ms=100
commit.interval.ms=1000
state.dir=../../temp/kafka-state-dir
{code}
Version: *0.11.0.1* 

 

After calling {{close()}} on the streams instance, I still see these connections to the brokers (port 9092) in the ESTABLISHED state:

 
{code:java}
$ sudo lsof -i -n -P | grep 9092
java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
{code}
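Since {{lsof}} only shows that the JVM still holds the sockets, it might also help to check from inside the process which threads are still alive after {{close()}} returns. The sketch below lists live threads whose name contains a given fragment. {{ThreadProbe}} is a hypothetical helper, and searching for "StreamThread" assumes that the stream threads of this Kafka version carry that string in their names, which should be verified against a thread dump first.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical diagnostic helper; not part of Kafka. */
final class ThreadProbe {
    private ThreadProbe() {
    }

    /** Returns the sorted names of live threads whose name contains the fragment. */
    static List<String> liveThreadsContaining(String fragment) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .map(Thread::getName)
                .filter(name -> name.contains(fragment))
                .sorted()
                .collect(Collectors.toList());
    }
}
```

Logging {{ThreadProbe.liveThreadsContaining("StreamThread")}} right after the "KafkaStream closed" message would indicate whether any such thread outlives the close call.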
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)