You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by rp...@apache.org on 2021/09/29 10:12:29 UTC
[incubator-wayang] 03/03: [WAYANG-28] Shipper and Receiver with
Rabbit and Kafka
This is an automated email from the ASF dual-hosted git repository.
rpardomeza pushed a commit to branch WAYANG-28
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit df77465177aedf9917a9291e21ded656288d5c54
Author: rodrigopardomeza <ro...@gmail.com>
AuthorDate: Wed Sep 29 12:11:56 2021 +0200
[WAYANG-28] Shipper and Receiver with Rabbit and Kafka
---
wayang-plugins/pom.xml | 1 +
.../hackit-shipper-rabbitmq/pom.xml | 31 +++++
.../rabbitmq/HachitShipperDirectRabbitMQ.java | 86 ++++++++++++++
.../shipper/rabbitmq/HachitShipperRabbitMQ.java | 4 +
.../receiver/ReceiverMultiChannelRabbitMQ.java | 116 ++++++++++++++++++
.../sender/SenderMultiChannelRabbitMQ.java | 76 ++++++++++++
.../src/main/resources/rabbitmq-config.properties | 5 +
.../wayang-hackit/wayang-hackit-shipper/pom.xml | 1 -
.../wayang-hackit-shipper-kafka/pom.xml | 16 +++
.../hackit/shipper/kafka/sender/KafkaHackit.java | 132 +++++++++++++++++++++
.../src/main/resources/kafka-config.properties | 0
11 files changed, 467 insertions(+), 1 deletion(-)
diff --git a/wayang-plugins/pom.xml b/wayang-plugins/pom.xml
index 90bd0d7..1341a7e 100644
--- a/wayang-plugins/pom.xml
+++ b/wayang-plugins/pom.xml
@@ -41,6 +41,7 @@
<modules>
<module>wayang-iejoin</module>
<module>wayang-hackit</module>
+ <module>wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq</module>
</modules>
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/pom.xml
new file mode 100644
index 0000000..b3220bd
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/pom.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>wayang-plugins</artifactId>
+ <groupId>org.apache.wayang</groupId>
+ <version>0.6.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>hackit-shipper-rabbitmq</artifactId>
+
+ <properties>
+ <rabbitmq.version>5.7.3</rabbitmq.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-hackit-core</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>${rabbitmq.version}</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperDirectRabbitMQ.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperDirectRabbitMQ.java
new file mode 100644
index 0000000..046fcd7
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperDirectRabbitMQ.java
@@ -0,0 +1,86 @@
+package org.apache.wayang.hackit.shipper.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.wayang.hackit.shipper.rabbitmq.receiver.ReceiverMultiChannelRabbitMQ;
+import org.apache.wayang.hackit.shipper.rabbitmq.sender.SenderMultiChannelRabbitMQ;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.Shipper;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver.Receiver;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender;
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+/** Direct is faster but there is not multiple categories of topics, just a word
+ * In other words you publish based on a word, all the consumers who listen that word will fill the message in their queues
+ * */
+public class HachitShipperDirectRabbitMQ<K, T, ST, HackSender extends Sender<ST>, HackReceiver
+ extends Receiver<HackitTuple<K, T>>>
+ extends Shipper<HackitTuple<K, T>, ST, HackSender, HackReceiver> {
+
+ private transient ConnectionFactory connectionFactory = null;
+
+ /** Consumer Info */
+ private Connection consumeConnection;
+ private Channel consumeChannel;
+ private String consumeExchangeName;
+ private String queueName;
+
+
+ @Override
+ protected HackSender createSenderInstance() {
+ return (HackSender) new SenderMultiChannelRabbitMQ(this.connect());
+ }
+
+ @Override
+ protected HackReceiver createReceiverInstance() {
+ return (HackReceiver) new ReceiverMultiChannelRabbitMQ(this.connect());
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public HackitTuple<K, T> next() {
+ return null;
+ }
+
+ public Connection connect(){
+ if(this.connectionFactory == null){
+ Properties prop = new Properties();
+ InputStream is = null;
+
+ try {
+ //is = new FileInputStream("../../resources/rabbitmq-config.properties");
+ is = new FileInputStream("C:\\Users\\Admin\\Desktop\\hackit\\hackit-shipper\\hackit-shipper-rabbitmq\\src\\main\\resources\\rabbitmq-config.properties");
+ prop.load(is);
+ } catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ this.connectionFactory= new ConnectionFactory();
+ this.connectionFactory.setUsername(prop.getProperty("username"));
+ this.connectionFactory.setPassword(prop.getProperty("password"));
+ this.connectionFactory.setVirtualHost(prop.getProperty("virtualhost"));
+ this.connectionFactory.setHost(prop.getProperty("host"));
+ this.connectionFactory.setPort(Integer.parseInt(prop.getProperty("port")));
+ }
+
+ try {
+ return this.connectionFactory.newConnection();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (TimeoutException e) {
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+}
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperRabbitMQ.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperRabbitMQ.java
new file mode 100644
index 0000000..93ba2b1
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/HachitShipperRabbitMQ.java
@@ -0,0 +1,4 @@
+package org.apache.wayang.hackit.shipper.rabbitmq;
+
+/**
+ * Empty class: it currently declares no fields, constructors or methods.
+ *
+ * <p>NOTE(review): presumably a placeholder for a non-direct RabbitMQ shipper; confirm the
+ * intended role before building on it.
+ */
+public class HachitShipperRabbitMQ {
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/receiver/ReceiverMultiChannelRabbitMQ.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/receiver/ReceiverMultiChannelRabbitMQ.java
new file mode 100644
index 0000000..bff40cb
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/receiver/ReceiverMultiChannelRabbitMQ.java
@@ -0,0 +1,116 @@
+package org.apache.wayang.hackit.shipper.rabbitmq.receiver;
+
+import com.rabbitmq.client.*;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.PSProtocol;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.receiver.Receiver;
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * {@link Receiver} that collects {@link HackitTuple}s published to a RabbitMQ direct exchange.
+ *
+ * <p>{@link #init()} declares a server-named queue, binds it to the configured exchange and
+ * routing key, and registers a consumer. Every delivery is deserialized and buffered;
+ * {@link #getElements()} hands out the buffered tuples and starts a new, empty buffer.
+ *
+ * <p>Call {@link #addExchange(String)} and {@link #addTopic(String...)} before {@link #init()};
+ * the values are only read while the queue is being bound.
+ *
+ * @param <K> key type of the received tuples
+ * @param <T> value type of the received tuples
+ */
+public class ReceiverMultiChannelRabbitMQ<K, T> extends Receiver<HackitTuple<K, T>> implements PSProtocol {
+
+    private transient Connection connection;
+    private transient Channel channel;
+    /** Tag returned by {@code basicConsume}; needed to cancel the subscription in {@link #close()}. */
+    private transient String consumerTag;
+    /** Tuples received since the last {@link #getElements()} call; guarded by {@code this}. */
+    private transient ArrayList<HackitTuple<K, T>> collection;
+
+    /** Default values */
+    private String exchange_name = "default_consumer";
+    private String topic_name = "default_consumer";
+    private String queue_name = "";
+
+
+    /**
+     * @param connection open RabbitMQ connection on which the consuming channel is created
+     */
+    public ReceiverMultiChannelRabbitMQ(Connection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * Opens the channel, binds a fresh queue to the exchange and starts consuming.
+     *
+     * <p>{@code basicConsume} only registers the consumer and returns; the deliveries are
+     * dispatched by the RabbitMQ client on its own threads, so no extra thread is created here.
+     *
+     * @throws RuntimeException if the channel, exchange, queue or consumer cannot be set up
+     */
+    @Override
+    public void init() {
+        synchronized (this) {
+            // Create the buffer first so getElements() is usable even if the broker setup fails.
+            this.collection = new ArrayList<>();
+        }
+
+        try {
+            this.channel = this.connection.createChannel();
+            this.channel.exchangeDeclare(this.exchange_name, "direct");
+            this.queue_name = this.channel.queueDeclare().getQueue();
+            // Without this binding nothing published to the exchange would reach the queue.
+            this.channel.queueBind(this.queue_name, this.exchange_name, this.topic_name);
+
+            this.consumerTag = this.channel.basicConsume(
+                    this.queue_name,
+                    true,
+                    new DefaultConsumer(this.channel) {
+                        @Override
+                        public void handleDelivery(String consumerTag,
+                                                   Envelope envelope,
+                                                   AMQP.BasicProperties properties,
+                                                   byte[] body)
+                                throws IOException {
+                            // NOTE(review): Java-native deserialization of broker payloads;
+                            // only safe while every publisher on this exchange is trusted.
+                            HackitTuple<K, T> elem = SerializationUtils.deserialize(body);
+                            ReceiverMultiChannelRabbitMQ.this.addElement(elem);
+                        }
+                    }
+            );
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Unable to start consuming from exchange '" + this.exchange_name + "'", e);
+        }
+    }
+
+    /**
+     * Returns the tuples received since the previous call and clears the buffer.
+     *
+     * @return iterator over the drained tuples; empty when nothing has arrived
+     */
+    @Override
+    public Iterator<HackitTuple<K, T>> getElements() {
+        return this._getElements();
+    }
+
+    /**
+     * Cancels the consumer and closes the channel. Does nothing if {@link #init()} never
+     * opened a channel.
+     *
+     * <p>NOTE(review): the {@link Connection} passed to the constructor is not closed here;
+     * confirm who owns it.
+     */
+    @Override
+    public void close() {
+        if (this.channel == null) {
+            return;
+        }
+        try {
+            if (this.channel.isOpen()) {
+                if (this.consumerTag != null) {
+                    this.channel.basicCancel(this.consumerTag);
+                }
+                this.channel.close();
+            }
+        } catch (IOException | TimeoutException e) {
+            // Closing is best effort: report the problem but do not fail the caller's shutdown.
+            e.printStackTrace();
+        } finally {
+            this.consumerTag = null;
+            this.channel = null;
+        }
+    }
+
+    /**
+     * Sets the routing key, joining several segments with {@code '.'}.
+     *
+     * @param topic one or more routing-key segments
+     * @return this instance, for chaining
+     * @throws IllegalArgumentException if no segment is given
+     */
+    @Override
+    public PSProtocol addTopic(String... topic) {
+        if (topic == null || topic.length == 0) {
+            throw new IllegalArgumentException("At least one topic segment is required");
+        }
+        this.topic_name = String.join(".", topic);
+        return this;
+    }
+
+    /**
+     * Sets the name of the exchange the queue is bound to.
+     *
+     * @param exchange exchange name
+     * @return this instance, for chaining
+     */
+    @Override
+    public PSProtocol addExchange(String exchange) {
+        this.exchange_name = exchange;
+        return this;
+    }
+
+    /**
+     * Appends a received tuple to the buffer; called from the consumer callback.
+     *
+     * @param element tuple to buffer
+     */
+    public synchronized void addElement(HackitTuple<K, T> element) {
+        this.collection.add(element);
+    }
+
+    /** Swaps the buffer for a new one and returns an iterator over the old content. */
+    private synchronized Iterator<HackitTuple<K, T>> _getElements() {
+        if (this.collection == null || this.collection.isEmpty()) {
+            return Collections.emptyIterator();
+        }
+        Iterator<HackitTuple<K, T>> drained = this.collection.iterator();
+        this.collection = new ArrayList<>();
+        return drained;
+    }
+}
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/sender/SenderMultiChannelRabbitMQ.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/sender/SenderMultiChannelRabbitMQ.java
new file mode 100644
index 0000000..3a098dd
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/java/org/apache/wayang/hackit/shipper/rabbitmq/sender/SenderMultiChannelRabbitMQ.java
@@ -0,0 +1,76 @@
+package org.apache.wayang.hackit.shipper.rabbitmq.sender;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.PSProtocol;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * {@link Sender} that publishes Java-serialized values to a RabbitMQ direct exchange.
+ *
+ * <p>Call {@link #addExchange(String)} and {@link #addTopic(String...)} to override the
+ * defaults, then {@link #init()} before the first {@link #send(Object)}.
+ *
+ * @param <T> type of the values sent; the runtime values must implement {@link Serializable}
+ */
+public class SenderMultiChannelRabbitMQ<T> implements Sender<T>, PSProtocol {
+    private Connection connection;
+    private Channel channel;
+
+    /** Default values */
+    private String exchange_name = "default";
+    private String topic_name = "default";
+
+    /**
+     * @param connection open RabbitMQ connection on which the publishing channel is created
+     */
+    public SenderMultiChannelRabbitMQ(Connection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * Opens the channel and declares the direct exchange.
+     *
+     * @throws RuntimeException if the channel or the exchange cannot be created
+     */
+    @Override
+    public void init() {
+        try {
+            this.channel = this.connection.createChannel();
+            this.channel.exchangeDeclare(this.exchange_name, "direct");
+        } catch (IOException e) {
+            // Fail at set-up time; otherwise every later send() would hit a null channel.
+            throw new RuntimeException(
+                    "Unable to declare exchange '" + this.exchange_name + "'", e);
+        }
+    }
+
+    /**
+     * Serializes {@code value} and publishes it with the configured exchange and routing key.
+     *
+     * @param value element to publish; must be {@link Serializable}
+     * @throws IllegalStateException if {@link #init()} has not opened a channel
+     * @throws IllegalArgumentException if {@code value} is not {@link Serializable}
+     */
+    @Override
+    public void send(T value) {
+        if (this.channel == null) {
+            throw new IllegalStateException("init() must be called before send()");
+        }
+        if (value != null && !(value instanceof Serializable)) {
+            throw new IllegalArgumentException(
+                    "Cannot send a non-serializable value of type " + value.getClass().getName());
+        }
+        try {
+            this.channel.basicPublish(
+                    this.exchange_name,
+                    this.topic_name,
+                    null,
+                    SerializationUtils.serialize((Serializable) value)
+            );
+        } catch (IOException e) {
+            // Publishing is best effort: report the failure and keep the caller running.
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Closes the channel. Does nothing if {@link #init()} never opened one.
+     *
+     * <p>NOTE(review): the {@link Connection} passed to the constructor is not closed here;
+     * confirm who owns it.
+     */
+    @Override
+    public void close() {
+        if (this.channel == null) {
+            return;
+        }
+        try {
+            if (this.channel.isOpen()) {
+                this.channel.close();
+            }
+        } catch (IOException | TimeoutException e) {
+            // Closing is best effort: report the problem but do not fail the caller's shutdown.
+            e.printStackTrace();
+        } finally {
+            this.channel = null;
+        }
+    }
+
+    /**
+     * Sets the routing key, joining several segments with {@code '.'}.
+     *
+     * @param topic one or more routing-key segments
+     * @return this instance, for chaining
+     * @throws IllegalArgumentException if no segment is given
+     */
+    @Override
+    public PSProtocol addTopic(String... topic) {
+        if (topic == null || topic.length == 0) {
+            throw new IllegalArgumentException("At least one topic segment is required");
+        }
+        this.topic_name = String.join(".", topic);
+        return this;
+    }
+
+    /**
+     * Sets the name of the exchange to publish to.
+     *
+     * @param exchange exchange name
+     * @return this instance, for chaining
+     */
+    @Override
+    public PSProtocol addExchange(String exchange) {
+        this.exchange_name = exchange;
+        return this;
+    }
+}
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/resources/rabbitmq-config.properties b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/resources/rabbitmq-config.properties
new file mode 100644
index 0000000..c326cdc
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/hackit-shipper-rabbitmq/src/main/resources/rabbitmq-config.properties
@@ -0,0 +1,5 @@
+username=admin
+password=admin
+virtualhost=/
+host=10.4.4.43
+port=50003
\ No newline at end of file
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/pom.xml
index d9ac095..f3a7233 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/pom.xml
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/pom.xml
@@ -33,5 +33,4 @@
<module>wayang-hackit-shipper-kafka</module>
</modules>
-
</project>
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/pom.xml
index 16c4d24..0ad5234 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/pom.xml
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/pom.xml
@@ -28,5 +28,21 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>wayang-hackit-shipper-kafka</artifactId>
+ <properties>
+ <kafka.version>2.3.0</kafka.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-hackit-core</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ </dependencies>
</project>
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/KafkaHackit.java b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/KafkaHackit.java
new file mode 100644
index 0000000..6a3e1c9
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/java/org/apache/wayang/hackit/shipper/kafka/sender/KafkaHackit.java
@@ -0,0 +1,132 @@
+package org.apache.wayang.hackit.shipper.kafka.sender;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.wayang.plugin.hackit.core.sniffer.shipper.sender.Sender;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * {@link Sender} that publishes Java-serialized values to a Kafka topic.
+ *
+ * <p>The Kafka producer and the thread pool that feeds it are static, so they are shared by
+ * every instance in the JVM and created by whichever instance sends first.
+ *
+ * @param <T> type of the values sent; the runtime values must implement {@link Serializable}
+ */
+public class KafkaHackit<T> implements Sender<T> {
+
+    /** Maps the address of the local machine to the address of the Kafka broker it should use. */
+    static Map<String, String> KAFKA_MAPPING;
+    /** Whether this instance has already gone through {@link #create()}. */
+    transient boolean created = false;
+    /** Producer shared by all instances; created under the class lock in {@link #create()}. */
+    transient static Producer<String, byte[]> producer;
+    /** Pool on which values are serialized and handed to the producer. */
+    transient static ExecutorService pool;
+    String topicName;
+
+    static {
+        KAFKA_MAPPING = new HashMap<>();
+        KAFKA_MAPPING.put("10.4.4.32", "10.4.4.30");
+        KAFKA_MAPPING.put("10.4.4.35", "10.4.4.31");
+        KAFKA_MAPPING.put("10.4.4.33", "10.4.4.22");
+        KAFKA_MAPPING.put("10.4.4.25", "10.4.4.26");
+        KAFKA_MAPPING.put("10.4.4.36", "10.4.4.27");
+        KAFKA_MAPPING.put("10.4.4.23", "10.4.4.48");
+        KAFKA_MAPPING.put("10.4.4.34", "10.4.4.70");
+        KAFKA_MAPPING.put("10.4.4.29", "10.4.4.46");
+        KAFKA_MAPPING.put("10.4.4.28", "10.4.4.41");
+        KAFKA_MAPPING.put("10.4.4.24", "10.4.4.37");
+        KAFKA_MAPPING.put("127.0.0.1", "10.4.4.30");
+        KAFKA_MAPPING.put("192.168.182.1", "10.4.4.30");
+    }
+
+    /**
+     * Sets the topic name and, if no instance has done so yet, creates the shared producer and
+     * thread pool.
+     *
+     * @throws IllegalStateException if no broker is mapped for the address of this machine
+     */
+    public void create() {
+        this.topicName = "rheem_debug";
+
+        // The producer and pool are static: take the class lock so that concurrent first
+        // sends from different instances cannot create them twice.
+        synchronized (KafkaHackit.class) {
+            if (producer != null) {
+                return;
+            }
+
+            String ip;
+            try {
+                ip = InetAddress.getLocalHost().getHostAddress();
+            } catch (UnknownHostException e) {
+                //TODO: modified for the master of the servers
+                ip = "127.0.0.1";
+            }
+
+            Properties props = new Properties();
+
+            // Broker assigned to this machine, on the default Kafka port.
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s:9092", getBroker(ip)));
+            // Wait for the partition leader only to acknowledge each record.
+            props.put(ProducerConfig.ACKS_CONFIG, "1");
+
+            // Very short timeouts (milliseconds): a record that cannot be delivered quickly
+            // is given up on. The delivery timeout must stay >= linger + request timeout.
+            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 12);
+            props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
+            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1);
+
+            // A batch size of one byte means records are effectively not batched together.
+            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
+
+            // Total memory (32 MiB) the producer may use to buffer records awaiting sending.
+            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
+
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                    "org.apache.kafka.common.serialization.StringSerializer");
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                    "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+            // Create the pool before publishing the producer reference, so any thread that
+            // observes a non-null producer can also rely on the pool.
+            ExecutorService newPool = Executors.newFixedThreadPool(5);
+            try {
+                producer = new KafkaProducer<String, byte[]>(props);
+            } catch (RuntimeException e) {
+                newPool.shutdown();
+                throw e;
+            }
+            pool = newPool;
+        }
+    }
+
+    /**
+     * Looks up the broker assigned to the given local address.
+     *
+     * @param ip address of the local machine
+     * @return address of the broker to bootstrap from
+     * @throws IllegalStateException if {@code ip} has no entry in {@link #KAFKA_MAPPING}
+     */
+    private String getBroker(String ip) {
+        String broker = KAFKA_MAPPING.get(ip);
+        if (broker == null) {
+            // Previously this produced the bootstrap server "null:9092".
+            throw new IllegalStateException("No Kafka broker is mapped for local address " + ip);
+        }
+        return broker;
+    }
+
+    /** Does nothing; the producer is created lazily by the first {@link #send(Object)}. */
+    @Override
+    public void init() {
+
+    }
+
+    /**
+     * Queues {@code value} for publication; serialization and the producer call run on the
+     * shared thread pool.
+     *
+     * @param value element to publish; must be {@link Serializable}
+     */
+    @Override
+    public void send(T value) {
+        if (!this.created) {
+            this.create();
+            this.created = true;
+        }
+        pool.execute(
+                () -> {
+                    producer.send(
+                            new ProducerRecord<String, byte[]>(
+                                    topicName,
+                                    null,
+                                    SerializationUtils.serialize((Serializable) value)
+                            )
+                    );
+                }
+        );
+    }
+
+    /**
+     * Does nothing.
+     *
+     * <p>NOTE(review): the shared static producer and thread pool are never shut down;
+     * confirm where that should happen.
+     */
+    @Override
+    public void close() {
+
+    }
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/resources/kafka-config.properties b/wayang-plugins/wayang-hackit/wayang-hackit-shipper/wayang-hackit-shipper-kafka/src/main/resources/kafka-config.properties
new file mode 100644
index 0000000..e69de29