Posted to jira@kafka.apache.org by "John Roesler (JIRA)" <ji...@apache.org> on 2018/11/15 01:27:00 UTC
[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687359#comment-16687359 ]
John Roesler commented on KAFKA-7628:
-------------------------------------
Hi [~lugrugzo],
To confirm, you see that Streams successfully closes, but afterwards, it's still bound to the TCP ports?
Have you noticed whether it stays bound indefinitely, or does it stop listening at some point after closing?
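If it helps to check this, one option is to capture the lsof output a few times after close() returns and count the broker connections in each capture. Below is a rough sketch, assuming the lsof line format from your report. BrokerConnectionCount and countEstablished are hypothetical names, and the LISTEN line in the sample is made up for contrast.

```java
import java.util.Arrays;
import java.util.List;

public class BrokerConnectionCount {

    /**
     * Counts lsof lines that describe an ESTABLISHED TCP connection
     * whose remote port equals remotePort.
     */
    public static long countEstablished(List<String> lsofLines, int remotePort) {
        // lsof prints "local->remote (STATE)", so the remote port is the
        // last thing before the state at the end of the line.
        String remoteSuffix = ":" + remotePort + " (ESTABLISHED)";
        return lsofLines.stream()
                .map(String::trim)
                .filter(line -> line.contains("->"))
                .filter(line -> line.endsWith(remoteSuffix))
                .count();
    }

    public static void main(String[] args) {
        List<String> capture = Arrays.asList(
            // First two lines are copied from the report.
            "java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)",
            "java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)",
            // Hypothetical line, added only to show that listeners are not counted.
            "java 29457 ozgur 150u IPv6 0x0000000000000000 0t0 TCP *:8080 (LISTEN)"
        );
        System.out.println(countEstablished(capture, 9092)); // prints 2
    }
}
```

If the count drops to zero a short while after close(), the connections are only lingering; if it stays constant, something is probably still holding the clients open.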
Thanks,
-John
> KafkaStream is not closing
> --------------------------
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.1
> Environment: Macbook Pro
> Reporter: Ozgur
> Priority: Major
>
> I close a KafkaStream when needed, based on a certain condition:
> Closing:
>
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream already closed?");
> } else {
>     boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
>     if (closed) {
>         kafkaStream = null;
>         logger.info("KafkaStream closed");
>     } else {
>         logger.info("KafkaStream could not be closed");
>     }
> }
> {code}
> Starting:
>
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream is starting");
>     kafkaStream = KafkaManager.getInstance().getStream(this.getConfigFilePath(),
>             this,
>             this.getTopic()
>     );
>     kafkaStream.start();
>     logger.info("KafkaStream is started");
> }
> {code}
>
>
> In my implementation of Processor, {{process(String key, byte[] value)}} is still called even though the stream was closed successfully:
>
> {code:java}
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
>     private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class);
>     private ProcessorContext context;
>     private ProcessorContext getContext() {
>         return context;
>     }
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(1000);
>     }
>     @Override
>     public void process(String key, byte[] value) {
>         try {
>             String topic = key.split("-")[0];
>             byte[] uncompressed = GzipCompressionUtil.uncompress(value);
>             String json = new String(uncompressed, "UTF-8");
>             processRecord(topic, json);
>             this.getContext().commit();
>         } catch (Exception e) {
>             logger.error("Error processing json", e);
>         }
>     }
>     protected abstract void processRecord(String topic, String json);
>     @Override
>     public void punctuate(long timestamp) {
>         this.getContext().commit();
>     }
>     @Override
>     public void close() {
>         this.getContext().commit();
>     }
> }
> {code}
>
> My configuration for KafkaStreams:
>
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms=100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1*
>
> After calling close() on the streams instance, I can see that these connections to port 9092 are still established:
>
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)