You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by 孟庆建 <10...@qq.com> on 2018/06/12 09:13:30 UTC

Consumers cannot consume topic through the public network

In the internal network environment, bootstrap.server fills in floating IP, which can be used for normal production and consumption.
After the local configuration of bootstrap.server as the public network IP and port on the server, my producer can produce very very very slowly, and consumers cannot consume it.



OS: SUSE 12 SP3
kafka node1 : 172.20.1.10  port:9092 hostname: kafka01

kafka node2 : 172.20.1.11  port:9092 hostname: kafka02

kafka node3 : 172.20.1.12  port:9092 hostname: kafka03

kafka VIP : 172.20.1.110



zk node1 : 172.20.1.13

zk node2 : 172.20.1.14
zk node3 : 172.20.1.15


kafka version: kafka_2.11-1.1.0



kafka server.properties:

​
broker.id=0
advertised.listeners=PLAINTEXT://172.20.1.10:9092
(I tried to change advertised.listeners to public ip and port.But it's still not feasible )
(I tried to change  advertised.listeners to kafka01 and port,and change my code to props.put("bootstrap.servers", "kafka01:20001"); then add "172.20.1.11  kafka01" to /etc/hosts, add "$PUBLICIP kafka01" to C:\Windows\System32\drivers\etc\hosts. But it's still not feasible)
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafkalogs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.20.1.13:2181,172.20.1.13:2181,172.20.1.13:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true






this is my code, Port 20001 of the public network IP is mapped to port 9092 of the internal network IP. (By VMware vCloud Director. we can't telnet public ip and port on kafka01/02/03)


Producer:


​
​
public class MyProducer extends Thread {
    private String topic;
    private static Producer producer;
    private int i;

    public MyProducer(String topic) {
        this.topic = topic;
    }

    @Override
    public void run() {
        while (true) {
            producer.send(new ProducerRecord(topic, i++ + ""));
            System.out.println("create :" + i);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static Producer createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xx.xx.xx.xx:20001"); ​​// public ip ()
//        props.put("bootstrap.servers", "172.20.1.110:9092"); //private ip (floating ip/VIP)
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer(props);
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("need topicName");
        } else {
            producer = createProducer();
            new MyProducer(args[0]).start();
        }
    }
}





Consumer:









​public class MyCustomer {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("java -jar $JAR_NAME.jar $group.id $topicName");
        } else {
            System.out.println(args[0]+"  "+args[1]);
            Properties props = new Properties();
//            props.put("bootstrap.servers", "172.21.1.110:9092");
            props.put("bootstrap.servers", "xx.xx.xx.xx:20001");
            props.put("group.id", args[0]+"");
            props.put("enable.auto.commit", "true");
            props.put("auto.offset.reset", "earliest");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList(args[1]));
            while (true) {
//                System.out.println("poll数据");
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}