You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "sunqing (JIRA)" <ji...@apache.org> on 2019/06/06 03:46:00 UTC

[jira] [Created] (KAFKA-8497) kafka streams application占用内存很高

sunqing created KAFKA-8497:
------------------------------

             Summary: kafka streams application占用内存很高
                 Key: KAFKA-8497
                 URL: https://issues.apache.org/jira/browse/KAFKA-8497
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.0
            Reporter: sunqing


一个简单的kafka streams测试应用,使用KStream来消费数据,当所消费的kafka Topic中的数据暴涨时,或者要消费的Topic中待消费数据量很大时,消费程序占用的内存会非常高,能达到20多G,

疑问:kafka streams不是逐条消费吗,为啥topic中的数据量很大时会导致程序内存飙升

 

测试程序代码如下:

 

代码如下:

public class RunMain {

public static StreamsBuilder builder = new StreamsBuilder();


 public static void kafkaStreamStart() {
 KStream<String, String> stream = builder.stream(Arrays.asList("wk_wangxin_po"));
 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testwang_xin");
 
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
 "zktj-kafka-broker-out-1:29092,zktj-kafka-broker-out-2:29092,zktj-kafka-broker-out-3:29092");
 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
 props.setProperty("security.protocol", "SASL_PLAINTEXT");
 props.setProperty("sasl.mechanism", "PLAIN");
 props.setProperty("sasl.kerberos.service.name", "kafka");
 System.setProperty("java.security.auth.login.config", "./conf/kafka_client_jaas.conf");
 
 stream.foreach(new ForeachAction<String, String>() {
 @Override
 public void apply(String key, String value) {
 System.out.println("====");
 System.out.println(key);
 }
 });
 Topology topo = builder.build();
 KafkaStreams streams = new KafkaStreams(topo, props);
 streams.start();

}

public static void main(String[] args) {
 kafkaStreamStart();
 }

}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)