Posted to dev@kafka.apache.org by "Ozgur (JIRA)" <ji...@apache.org> on 2018/11/14 16:42:00 UTC
[jira] [Created] (KAFKA-7628) KafkaStream is not closing
Ozgur created KAFKA-7628:
----------------------------
Summary: KafkaStream is not closing
Key: KAFKA-7628
URL: https://issues.apache.org/jira/browse/KAFKA-7628
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.11.0.1
Environment: Macbook Pro
Reporter: Ozgur
I close a KafkaStreams instance whenever a certain condition is met:
Closing:
{code:java}
if (kafkaStream == null) {
    logger.info("KafkaStream already closed?");
} else {
    boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
    if (closed) {
        kafkaStream = null;
        logger.info("KafkaStream closed");
    } else {
        logger.info("KafkaStream could not closed");
    }
}
{code}
Starting:
{code:java}
if (kafkaStream == null) {
    logger.info("KafkaStream is starting");
    kafkaStream = KafkaManager.getInstance().getStream(
        this.getConfigFilePath(),
        this,
        this.getTopic()
    );
    kafkaStream.start();
    logger.info("KafkaStream is started");
}
{code}
In my implementation of Processor, {{process(String key, byte[] value)}} is still called even after the stream has been closed successfully:
{code:java}
public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
    private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class);
    private ProcessorContext context;

    private ProcessorContext getContext() {
        return context;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(1000);
    }

    @Override
    public void process(String key, byte[] value) {
        try {
            String topic = key.split("-")[0];
            byte[] uncompressed = GzipCompressionUtil.uncompress(value);
            String json = new String(uncompressed, "UTF-8");
            processRecord(topic, json);
            this.getContext().commit();
        } catch (Exception e) {
            logger.error("Error processing json", e);
        }
    }

    protected abstract void processRecord(String topic, String json);

    @Override
    public void punctuate(long timestamp) {
        this.getContext().commit();
    }

    @Override
    public void close() {
        this.getContext().commit();
    }
}
{code}
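To make the symptom measurable, a small counter along these lines could be called from {{process()}}. This is only a diagnostic sketch: {{PostCloseCounter}} and its method names are hypothetical, it does not use any Kafka API, and it assumes {{markClosed()}} is invoked right after {{close(...)}} returns true and {{markStarted()}} when a new stream is started.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical diagnostic helper; not part of Kafka Streams.
final class PostCloseCounter {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicLong lateCalls = new AtomicLong();

    // Assumed to be called right after kafkaStream.close(...) returns true.
    void markClosed() {
        closed.set(true);
    }

    // Assumed to be called when a new stream instance is started.
    void markStarted() {
        closed.set(false);
    }

    // Call at the top of process(); returns true if the call arrived after close.
    boolean onProcess() {
        if (closed.get()) {
            lateCalls.incrementAndGet();
            return true;
        }
        return false;
    }

    // Number of process() calls observed while the stream was marked closed.
    long lateCalls() {
        return lateCalls.get();
    }

    public static void main(String[] args) {
        PostCloseCounter counter = new PostCloseCounter();
        counter.onProcess();   // before close: not counted
        counter.markClosed();
        counter.onProcess();   // after close: counted
        System.out.println("process() calls after close: " + counter.lateCalls());
    }
}
```

Logging {{lateCalls()}} periodically would show whether records keep arriving after the close was reported as successful, without changing how the records themselves are handled.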
My configuration for KafkaStreams:
{code:java}
application.id=dv_ws_in_app_activity_dev4
bootstrap.servers=VLXH1
auto.offset.reset=latest
num.stream.threads=1
key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
poll.ms=100
commit.interval.ms=1000
state.dir=../../temp/kafka-state-dir
{code}
Version: *0.11.0.1*
After calling close() on the streams, I see that these connections to the brokers on port 9092 are still established:
{code:java}
$ sudo lsof -i -n -P | grep 9092
java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
{code}
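For comparing the connection count before and after close(), the lsof output can be summarized per broker. The following is a minimal sketch, assuming the output has the same shape as the lines above; the class name {{LsofBrokerConnections}} and its method are hypothetical helpers, not part of Kafka.

```java
import java.util.Map;
import java.util.Scanner;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that counts established connections per remote host in lsof output.
final class LsofBrokerConnections {
    // Matches the remote endpoint of an established connection, e.g. "->host:9092 (ESTABLISHED)".
    private static final Pattern REMOTE =
            Pattern.compile("->(\\S+):(\\d+) \\(ESTABLISHED\\)");

    // Returns remote host -> number of ESTABLISHED connections to the given port.
    static Map<String, Integer> countByBroker(String lsofOutput, int port) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lsofOutput.split("\\R")) {
            Matcher m = REMOTE.matcher(line);
            if (m.find() && Integer.parseInt(m.group(2)) == port) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Reads lsof output from standard input and prints one line per remote host.
        Scanner in = new Scanner(System.in).useDelimiter("\\A");
        String output = in.hasNext() ? in.next() : "";
        countByBroker(output, 9092)
                .forEach((host, n) -> System.out.println(host + " " + n));
    }
}
```

One way to use it would be to pipe {{sudo lsof -i -n -P}} into the program once before and once after close(); if the stream had released its clients, the second run would be expected to show fewer connections.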