You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by rp...@apache.org on 2021/09/29 10:12:29 UTC

[incubator-wayang] 03/03: [WAYANG-28] Shipper and Receiver with Rabbit and Kafka

This is an automated email from the ASF dual-hosted git repository.

rpardomeza pushed a commit to branch WAYANG-28
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit df77465177aedf9917a9291e21ded656288d5c54
Author: rodrigopardomeza <ro...@gmail.com>
AuthorDate: Wed Sep 29 12:11:56 2021 +0200

    [WAYANG-28] Shipper and Receiver with Rabbit and Kafka
---
 wayang-plugins/pom.xml                             |   1 +
 .../hackit-shipper-rabbitmq/pom.xml                |  31 +++++
 .../rabbitmq/HachitShipperDirectRabbitMQ.java      |  86 ++++++++++++++
 .../shipper/rabbitmq/HachitShipperRabbitMQ.java    |   4 +
 .../receiver/ReceiverMultiChannelRabbitMQ.java     | 116 ++++++++++++++++++
 .../sender/SenderMultiChannelRabbitMQ.java         |  76 ++++++++++++
 .../src/main/resources/rabbitmq-config.properties  |   5 +
 .../wayang-hackit/wayang-hackit-shipper/pom.xml    |   1 -
 .../wayang-hackit-shipper-kafka/pom.xml            |  16 +++
 .../hackit/shipper/kafka/sender/KafkaHackit.java   | 132 +++++++++++++++++++++
 .../src/main/resources/kafka-config.properties     |   0
 11 files changed, 467 insertions(+), 1 deletion(-)

diff --git a/wayang-plugins/pom.xml b/wayang-plugins/pom.xml
index 90bd0d7..1341a7e 100644
--- a/wayang-plugins/pom.xml
+++ b/wayang-plugins/pom.xml
@@ -41,6 +41,7 @@
     <modules>
         <module>wayang-iejoin</module>
         <module>wayang-hackit</module>
+        <module>wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq</module>
     </modules>
 
 
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/pom.xml
new file mode 100644
index 0000000..b3220bd
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/pom.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>wayang-plugins</artifactId>
+        <groupId>org.apache.wayang</groupId>
+        <version>0.6.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hackit-shipper-rabbitmq</artifactId>
+
+    <properties>
+        <rabbitmq.version>5.7.3</rabbitmq.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-hackit-core</artifactId>
+            <version>0.6.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>${rabbitmq.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperDirectRabbitMQ.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperDirectRabbitMQ.java
new file mode 100644
index 0000000..046fcd7
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperDirectRabbitMQ.java
@@ -0,0 +1,86 @@
+package org.apache.wayang.hackit.shipper.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.wayang.hackit.shipper.rabbitmq.receiver.ReceiverMultiChannelRabbitMQ;
+import org.apache.wayang.hackit.shipper.rabbitmq.sender.SenderMultiChannelRabbitMQ;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.Shipper;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver.Receiver;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender;
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+/** A direct exchange is faster, but it offers no multiple categories of topics, just a single word.
+ *  In other words, you publish under a word, and every consumer listening for that word receives the message in its queue.
+ * */
+public class HachitShipperDirectRabbitMQ<K, T, ST, HackSender extends Sender<ST>, HackReceiver
+        extends Receiver<HackitTuple<K, T>>>
+        extends Shipper<HackitTuple<K, T>, ST, HackSender, HackReceiver> {
+
+    private transient ConnectionFactory connectionFactory = null;
+
+    /** Consumer Info */
+    private Connection consumeConnection;
+    private Channel consumeChannel;
+    private String consumeExchangeName;
+    private String queueName;
+
+
+    @Override
+    protected HackSender createSenderInstance() {
+        return (HackSender) new SenderMultiChannelRabbitMQ(this.connect());
+    }
+
+    @Override
+    protected HackReceiver createReceiverInstance() {
+        return (HackReceiver) new ReceiverMultiChannelRabbitMQ(this.connect());
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public HackitTuple<K, T> next() {
+        return null;
+    }
+
+    /**
+     * Opens a new connection, configuring the factory on first use from the classpath
+     * resource {@code rabbitmq-config.properties}.
+     * @return a freshly opened connection, never {@code null}
+     * @throws RuntimeException if the configuration is unreadable or the broker unreachable
+     */
+    public Connection connect(){
+        if(this.connectionFactory == null){
+            Properties prop = new Properties();
+            // Classpath lookup replaces a developer-specific absolute path; try-with-resources closes the stream.
+            try (InputStream is = HachitShipperDirectRabbitMQ.class.getResourceAsStream("/rabbitmq-config.properties")) {
+                if (is == null) { throw new IOException("rabbitmq-config.properties is not on the classpath"); }
+                prop.load(is);
+            } catch(IOException e) {
+                throw new RuntimeException(e);
+            }
+            ConnectionFactory factory = new ConnectionFactory();
+            factory.setUsername(prop.getProperty("username"));
+            factory.setPassword(prop.getProperty("password"));
+            factory.setVirtualHost(prop.getProperty("virtualhost"));
+            factory.setHost(prop.getProperty("host"));
+            factory.setPort(Integer.parseInt(prop.getProperty("port")));
+            this.connectionFactory = factory; // stored only once fully configured
+        }
+        try {
+            return this.connectionFactory.newConnection();
+        } catch (IOException | TimeoutException e) {
+            // Fail here: a null return would only surface later as an NPE in the sender/receiver.
+            throw new IllegalStateException("Unable to open a RabbitMQ connection", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperRabbitMQ.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperRabbitMQ.java
new file mode 100644
index 0000000..93ba2b1
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperRabbitMQ.java
@@ -0,0 +1,4 @@
+package org.apache.wayang.hackit.shipper.rabbitmq;
+
+public class HachitShipperRabbitMQ {
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/receiver/ReceiverMultiChannelRabbitMQ.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/receiver/ReceiverMultiChannelRabbitMQ.java
new file mode 100644
index 0000000..bff40cb
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/receiver/ReceiverMultiChannelRabbitMQ.java
@@ -0,0 +1,116 @@
+package org.apache.wayang.hackit.shipper.rabbitmq.receiver;
+
+import com.rabbitmq.client.*;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.PSProtocol;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver.Receiver;
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+public class ReceiverMultiChannelRabbitMQ<K, T> extends Receiver<HackitTuple<K, T>> implements PSProtocol {
+
+    private transient Connection connection;
+    private transient Channel channel;
+    private transient Thread thread_collecting;
+    private transient ArrayList<HackitTuple<K, T>> collection;
+
+    /** Default values */
+    private String exchange_name = "default_consumer";
+    private String topic_name = "default_consumer";
+    private String queue_name= "";
+
+
+    public ReceiverMultiChannelRabbitMQ(Connection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * Opens a channel, declares a server-named queue, binds it to the configured exchange
+     * and routing key, and starts a background thread that registers the consumer.
+     */
+    @Override
+    public void init() {
+        try {
+            this.channel =  this.connection.createChannel();
+            this.queue_name = this.channel.queueDeclare().getQueue();
+            // Without a binding the queue never sees messages published to the exchange;
+            // the exchange is declared "direct" to match SenderMultiChannelRabbitMQ.
+            this.channel.exchangeDeclare(this.exchange_name, "direct");
+            this.channel.queueBind(this.queue_name, this.exchange_name, this.topic_name);
+            this.collection = new ArrayList<>();
+            final ReceiverMultiChannelRabbitMQ<K, T> thos = this;
+            this.thread_collecting = new Thread(() -> {
+                try {
+                    channel.basicConsume(queue_name, true, new DefaultConsumer(channel) {
+                        @Override
+                        public void handleDelivery(String consumerTag, Envelope envelope,
+                                AMQP.BasicProperties properties, byte[] body) throws IOException {
+                            // Auto-acked deliveries are buffered until getElements() drains them.
+                            HackitTuple<K, T> elem = SerializationUtils.deserialize(body);
+                            thos.addElement(elem);
+                        }
+                    });
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            });
+            // start(), not run(): run() executes the body on the calling thread.
+            this.thread_collecting.start();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public Iterator<HackitTuple<K, T>> getElements() {
+        return this._getElements();
+    }
+
+    /** Closes the channel, if one was opened; close failures are only printed. */
+    @Override
+    public void close() {
+        // No Thread.stop(): it is deprecated/unsafe, and the collecting thread only registers the consumer and ends.
+        if (this.channel == null) { return; }
+        try {
+            this.channel.close();
+        } catch (IOException | TimeoutException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Sets the routing key to the given segments joined with '.' and returns this instance.
+     * @param topic key segments; passing none yields an empty key
+     */
+    @Override
+    public PSProtocol addTopic(String... topic) {
+        this.topic_name = String.join(".", topic); // also safe for zero or one segment
+        return this;
+    }
+
+    @Override
+    public PSProtocol addExchange(String exchange) {
+        this.exchange_name = exchange;
+        return this;
+    }
+
+    public synchronized void addElement(HackitTuple<K, T> element){
+        this.collection.add(element);
+    }
+
+    private synchronized Iterator<HackitTuple<K, T>> _getElements(){
+        if(this.collection.size() == 0){
+            return Collections.emptyIterator();
+        }
+        Iterator<HackitTuple<K, T>> tmp = this.collection.iterator();
+        this.collection = new ArrayList<>();
+        return tmp;
+    }
+}
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/sender/SenderMultiChannelRabbitMQ.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/sender/SenderMultiChannelRabbitMQ.java
new file mode 100644
index 0000000..3a098dd
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/sender/SenderMultiChannelRabbitMQ.java
@@ -0,0 +1,76 @@
+package org.apache.wayang.hackit.shipper.rabbitmq.sender;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.PSProtocol;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+public class SenderMultiChannelRabbitMQ<T> implements Sender<T>, PSProtocol {
+    private Connection connection;
+    private Channel channel;
+
+    /** Default values */
+    private String exchange_name = "default";
+    private String topic_name = "default";
+
+    public SenderMultiChannelRabbitMQ(Connection connection) {
+        this.connection = connection;
+    }
+
+    @Override
+    public void init() {
+        try {
+            this.channel = connection.createChannel();
+            channel.exchangeDeclare(exchange_name, "direct");
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /** Serializes {@code value} and publishes it to the configured exchange and routing key. */
+    @Override
+    public void send(T value) {
+        // The cast throws ClassCastException when value is not Serializable.
+        final byte[] payload = SerializationUtils.serialize((Serializable) value);
+        try {
+            // null: no AMQP message properties are attached
+            this.channel.basicPublish(this.exchange_name, this.topic_name, null, payload);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            this.channel.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        } catch (TimeoutException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Sets the routing key to the given segments joined with '.' and returns this instance.
+     * @param topic key segments; passing none yields an empty key
+     */
+    @Override
+    public PSProtocol addTopic(String... topic) {
+        this.topic_name = String.join(".", topic); // also safe for zero or one segment
+        return this;
+    }
+
+    @Override
+    public PSProtocol addExchange(String exchange) {
+        this.exchange_name = exchange;
+        return this;
+    }
+}
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/resources/rabbitmq-config.properties b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/resources/rabbitmq-config.properties
new file mode 100644
index 0000000..c326cdc
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/resources/rabbitmq-config.properties
@@ -0,0 +1,5 @@
+username=admin
+password=admin
+virtualhost=/
+host=10.4.4.43
+port=50003
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/pom.xml
index d9ac095..f3a7233 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/pom.xml
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/pom.xml
@@ -33,5 +33,4 @@
         <module>wayang-hackit-shipper-kafka</module>
     </modules>
 
-
 </project>
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/pom.xml
index 16c4d24..0ad5234 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/pom.xml
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/pom.xml
@@ -28,5 +28,21 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>wayang-hackit-shipper-kafka</artifactId>
+    <properties>
+        <kafka.version>2.3.0</kafka.version>
+    </properties>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-hackit-core</artifactId>
+            <version>0.6.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/KafkaHackit.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/KafkaHackit.java
new file mode 100644
index 0000000..6a3e1c9
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/KafkaHackit.java
@@ -0,0 +1,132 @@
+package org.apache.wayang.hackit.shipper.kafka.sender;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class KafkaHackit<T> implements Sender<T> {
+
+    static Map<String, String> KAFKA_MAPPING;
+    transient boolean created = false;
+    transient static Producer<String, byte[]> producer;
+    transient static ExecutorService pool;
+    String topicName;
+
+    static {
+        KAFKA_MAPPING = new HashMap<>();
+        KAFKA_MAPPING.put("10.4.4.32", "10.4.4.30");
+        KAFKA_MAPPING.put("10.4.4.35", "10.4.4.31");
+        KAFKA_MAPPING.put("10.4.4.33", "10.4.4.22");
+        KAFKA_MAPPING.put("10.4.4.25", "10.4.4.26");
+        KAFKA_MAPPING.put("10.4.4.36", "10.4.4.27");
+        KAFKA_MAPPING.put("10.4.4.23", "10.4.4.48");
+        KAFKA_MAPPING.put("10.4.4.34", "10.4.4.70");
+        KAFKA_MAPPING.put("10.4.4.29", "10.4.4.46");
+        KAFKA_MAPPING.put("10.4.4.28", "10.4.4.41");
+        KAFKA_MAPPING.put("10.4.4.24", "10.4.4.37");
+        KAFKA_MAPPING.put("127.0.0.1", "10.4.4.30");
+        KAFKA_MAPPING.put("192.168.182.1", "10.4.4.30");
+    }
+
+    /**
+     * Lazily creates the producer and thread pool shared by all instances; returns
+     * immediately once the producer exists. The broker host is looked up in
+     * {@code KAFKA_MAPPING} by this machine's IP address.
+     *
+     * @throws IllegalStateException if no broker is mapped to the local IP address
+     */
+    public void create(){
+        //Assign topicName to string variable
+        this.topicName = "rheem_debug";
+        if(producer != null){
+            return;
+        }
+        // Only the IP is needed for the broker lookup; the host name is not inspected,
+        // since slicing its last two characters fails on names shorter than that.
+        String ip;
+        try {
+            ip = InetAddress.getLocalHost().getHostAddress();
+        }catch (UnknownHostException e){
+            //TODO: modified for the master of the servers
+            ip = "127.0.0.1";
+        }
+        // Fail fast rather than pass Kafka the bogus bootstrap address "null:9092".
+        String broker = getBroker(ip);
+        if (broker == null) {
+            throw new IllegalStateException("No Kafka broker mapped for local address " + ip);
+        }
+
+        // create instance for properties to access producer configs
+        Properties props = new Properties();
+
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s:9092", broker));
+        //Set acknowledgements for producer requests.
+        props.put(ProducerConfig.ACKS_CONFIG,  "1");
+
+        // Timeouts and linger, all in milliseconds.
+        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 12);
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
+        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1);
+
+        // Maximum batch size in bytes.
+        props.put("batch.size", 1);
+
+        //The buffer.memory controls the total amount of memory available to the producer for buffering.
+        props.put("buffer.memory", 33554432);
+
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+        producer = new KafkaProducer<String, byte[]>(props);
+        pool = Executors.newFixedThreadPool(5);
+    }
+
+    private String getBroker(String ip){
+        return KAFKA_MAPPING.get(ip);
+    }
+
+    @Override
+    public void init() {
+
+    }
+
+    @Override
+    public void send(T value) {
+        if( ! this.created ){
+            this.create();
+            this.created = true;
+        }
+        //System.out.println("sending");
+        //final byte[] tmp = SerializationUtils.serialize((Serializable) value);
+        pool.execute(
+                () -> {
+                    producer.send(
+                            new ProducerRecord<String, byte[]>(
+                                    topicName,
+                                    null,
+                                    SerializationUtils.serialize((Serializable) value)
+                            )
+                    );
+                }
+        );
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/resources/kafka-config.properties b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/resources/kafka-config.properties
new file mode 100644
index 0000000..e69de29