You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/01 03:23:05 UTC

[iotdb] branch master updated: Add Apache Pulsar to IoTDB example (#2122)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8657b13  Add Apache Pulsar to IoTDB example (#2122)
8657b13 is described below

commit 8657b13daffe4f41e576a628a4e4f5d5607341a7
Author: wshao08 <59...@users.noreply.github.com>
AuthorDate: Tue Dec 1 11:22:45 2020 +0800

    Add Apache Pulsar to IoTDB example (#2122)
---
 .../main/java/org/apache/iotdb/kafka/Constant.java |   2 +
 example/pom.xml                                    |   1 +
 example/pulsar/pom.xml                             |  45 +++++++++
 .../java/org/apache/iotdb/pulsar/Constant.java     |  30 ++++++
 .../org/apache/iotdb/pulsar/PulsarConsumer.java    | 108 +++++++++++++++++++++
 .../apache/iotdb/pulsar/PulsarConsumerThread.java  |  70 +++++++++++++
 .../org/apache/iotdb/pulsar/PulsarProducer.java    |  82 ++++++++++++++++
 7 files changed, 338 insertions(+)

diff --git a/example/kafka/src/main/java/org/apache/iotdb/kafka/Constant.java b/example/kafka/src/main/java/org/apache/iotdb/kafka/Constant.java
index 4901eec..070bc08 100644
--- a/example/kafka/src/main/java/org/apache/iotdb/kafka/Constant.java
+++ b/example/kafka/src/main/java/org/apache/iotdb/kafka/Constant.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.kafka;
 import org.apache.iotdb.jdbc.Config;
 
 public class Constant {
+  private Constant() {
+  }
 
   public static final String TOPIC = "Kafka-Test";
   public static final int CONSUMER_THREAD_NUM = 5;
diff --git a/example/pom.xml b/example/pom.xml
index 8cd9c4f..e108400 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -42,6 +42,7 @@
         <module>hadoop</module>
         <module>flink</module>
         <module>mqtt</module>
+        <module>pulsar</module>
     </modules>
     <build>
         <pluginManagement>
diff --git a/example/pulsar/pom.xml b/example/pulsar/pom.xml
new file mode 100644
index 0000000..cb329a2
--- /dev/null
+++ b/example/pulsar/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>iotdb-examples</artifactId>
+        <groupId>org.apache.iotdb</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>pulsar-example</artifactId>
+    <name>IoTDB-Pulsar Examples</name>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+            <version>2.6.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/example/pulsar/src/main/java/org/apache/iotdb/pulsar/Constant.java b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/Constant.java
new file mode 100644
index 0000000..83a1bf7
--- /dev/null
+++ b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/Constant.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.pulsar;
+
+/**
+ * Shared settings for the Pulsar-to-IoTDB example: the Pulsar topic used by the
+ * producer and the consumers, and the JDBC connection parameters used to reach IoTDB.
+ *
+ * <p>NOTE(review): the suppressed rule "squid:S2068" is presumably Sonar's
+ * hard-coded-credentials check, silenced because this is example code - confirm.
+ */
+@SuppressWarnings("squid:S2068")
+public class Constant {
+
+  /** Pulsar topic that the producer writes to and the consumers subscribe to. */
+  public static final String TOPIC_NAME = "persistent://public/default/regions-partitioned";
+
+  /** Storage group created in IoTDB before any data is written. */
+  public static final String STORAGE_GROUP = "root.vehicle";
+
+  /** JDBC URL handed to {@code DriverManager.getConnection}. */
+  public static final String IOTDB_CONNECTION_URL = "jdbc:iotdb://localhost:6667/";
+
+  /** User name for the IoTDB JDBC connection. */
+  public static final String IOTDB_CONNECTION_USER = "root";
+
+  /** Password for the IoTDB JDBC connection. */
+  public static final String IOTDB_CONNECTION_PASSWORD = "root";
+
+  // Constants holder only; never instantiated.
+  private Constant() {
+  }
+}
diff --git a/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumer.java b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumer.java
new file mode 100644
index 0000000..b67d46a
--- /dev/null
+++ b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.pulsar;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Example entry point that subscribes several Pulsar consumers to
+ * {@link Constant#TOPIC_NAME}, prepares the IoTDB schema and then lets each consumer
+ * write the received messages into IoTDB on its own thread.
+ */
+public class PulsarConsumer {
+  private static final String SERVICE_URL = "pulsar://localhost:6650";
+  // Specify the number of consumers
+  private static final int CONSUMER_NUM = 3;
+  private static final String CREATE_SG_TEMPLATE = "SET STORAGE GROUP TO %s";
+  private static final String CREATE_TIMESERIES_TEMPLATE = "CREATE TIMESERIES %s WITH DATATYPE=TEXT, ENCODING=PLAIN";
+  private static final Logger logger = LoggerFactory.getLogger(PulsarConsumer.class);
+  protected static final String[] ALL_TIMESERIES = {
+      "root.vehicle.device1.sensor1",
+      "root.vehicle.device1.sensor2",
+      "root.vehicle.device2.sensor1",
+      "root.vehicle.device2.sensor2",
+      "root.vehicle.device3.sensor1",
+      "root.vehicle.device3.sensor2",
+  };
+
+  private final List<Consumer<?>> consumerList;
+
+  /**
+   * @param consumerList already-subscribed Pulsar consumers; each one is drained by its
+   *                     own worker thread in {@link #consumeInParallel()}
+   */
+  public PulsarConsumer(List<Consumer<?>> consumerList) {
+    this.consumerList = consumerList;
+  }
+
+  /**
+   * Starts one {@link PulsarConsumerThread} per consumer.
+   *
+   * <p>The pool is sized by the number of consumers rather than by a fixed constant:
+   * every worker loops until it fails, so a pool smaller than the list would leave the
+   * surplus consumers queued and never started.
+   *
+   * @throws ClassNotFoundException if the IoTDB JDBC driver cannot be loaded while
+   *                                creating a worker
+   */
+  public void consumeInParallel() throws ClassNotFoundException {
+    if (consumerList.isEmpty()) {
+      // Executors.newFixedThreadPool rejects a size of zero.
+      logger.warn("No Pulsar consumers supplied, nothing to consume");
+      return;
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(consumerList.size());
+    try {
+      for (Consumer<?> consumer : consumerList) {
+        executor.submit(new PulsarConsumerThread(consumer));
+      }
+    } finally {
+      // Does not interrupt running workers; it only lets the pool threads terminate
+      // once every worker has returned instead of idling forever.
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Creates the storage group and all time series listed in {@link #ALL_TIMESERIES}.
+   * Failures are logged and not propagated.
+   */
+  @SuppressWarnings("squid:S2068")
+  private static void prepareSchema() {
+    try {
+      Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
+      try (Connection connection = DriverManager
+          .getConnection(Constant.IOTDB_CONNECTION_URL, Constant.IOTDB_CONNECTION_USER,
+              Constant.IOTDB_CONNECTION_PASSWORD);
+           Statement statement = connection.createStatement()) {
+
+        statement.execute(String.format(CREATE_SG_TEMPLATE, Constant.STORAGE_GROUP));
+
+        for (String timeseries : ALL_TIMESERIES) {
+          statement.addBatch(String.format(CREATE_TIMESERIES_TEMPLATE, timeseries));
+        }
+        statement.executeBatch();
+        statement.clearBatch();
+      }
+    } catch (ClassNotFoundException | SQLException e) {
+      // Pass the exception itself so the stack trace is not lost.
+      logger.error("Failed to prepare IoTDB schema: {}", e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Subscribes {@code CONSUMER_NUM} consumers, prepares the IoTDB schema and starts
+   * consuming.
+   */
+  public static void main(String[] args) throws PulsarClientException, ClassNotFoundException {
+    PulsarClient client = PulsarClient.builder()
+        .serviceUrl(SERVICE_URL)
+        .build();
+
+    List<Consumer<?>> consumerList = new ArrayList<>();
+    for (int i = 0; i < CONSUMER_NUM; i++) {
+      // In Key_Shared subscription mode, multiple consumers can attach to the same
+      // subscription and messages carrying the same key are delivered to the same consumer.
+      Consumer<byte[]> consumer = client.newConsumer()
+          .topic(Constant.TOPIC_NAME)
+          .subscriptionName("shared-subscription")
+          .subscriptionType(SubscriptionType.Key_Shared).keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+          .subscribe();
+      consumerList.add(consumer);
+    }
+    PulsarConsumer pulsarConsumer = new PulsarConsumer(consumerList);
+    prepareSchema();
+    pulsarConsumer.consumeInParallel();
+  }
+}
diff --git a/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumerThread.java b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumerThread.java
new file mode 100644
index 0000000..ff5d17c
--- /dev/null
+++ b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumerThread.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.pulsar;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarConsumerThread implements Runnable {
+  private static final String INSERT_TEMPLATE = "INSERT INTO root.vehicle.%s(timestamp,%s) VALUES (%s,'%s')";
+
+  private static final Logger logger = LoggerFactory.getLogger(PulsarConsumerThread.class);
+
+  private final Consumer<?> consumer;
+
+  public PulsarConsumerThread(Consumer<?> consumer) throws ClassNotFoundException {
+    this.consumer = consumer;
+    Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
+  }
+
+  /**
+   * Write data to IoTDB
+   */
+  private void writeData(Statement statement, String message) throws SQLException {
+
+    String[] items = message.split(",");
+
+    String sql = String.format(INSERT_TEMPLATE, items[0], items[1], items[2], items[3]);
+    statement.execute(sql);
+  }
+
+  @SuppressWarnings("squid:S2068")
+  @Override
+  public void run() {
+    try (Connection connection = DriverManager
+        .getConnection(Constant.IOTDB_CONNECTION_URL, Constant.IOTDB_CONNECTION_USER,
+            Constant.IOTDB_CONNECTION_PASSWORD);
+         Statement statement = connection.createStatement()) {
+      do {
+        Message<?> msg = consumer.receive();
+        writeData(statement, new String(msg.getData()));
+
+        consumer.acknowledge(msg);
+      } while (true);
+    } catch (Exception e) {
+      logger.error(e.getMessage());
+    }
+  }
+}
diff --git a/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarProducer.java b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarProducer.java
new file mode 100644
index 0000000..e72b0e1
--- /dev/null
+++ b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarProducer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.pulsar;
+
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+
+/**
+ * Example producer that publishes the sample records in {@link #ALL_DATA} to
+ * {@link Constant#TOPIC_NAME}, keyed by device name.
+ *
+ * <p>Instances hold a Pulsar client and producer; call {@link #close()} (or use
+ * try-with-resources) to release them.
+ */
+public class PulsarProducer implements AutoCloseable {
+  private static final String SERVICE_URL = "pulsar://localhost:6650";
+  private final Producer<String> producer ;
+  private final PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();
+  // Sample records in the form "device,sensor,timestamp,value".
+  protected static final String[] ALL_DATA = {
+      "device1,sensor1,2017/10/24 19:30:00,606162908",
+      "device1,sensor2,2017/10/24 19:30:00,160161162",
+      "device2,sensor1,2017/10/24 19:30:00,360361362",
+      "device2,sensor2,2017/10/24 19:30:00,818182346",
+      "device3,sensor1,2017/10/24 19:30:00,296150221",
+      "device3,sensor2,2017/10/24 19:30:00,360361362",
+      "device1,sensor1,2017/10/24 19:31:00,752187168",
+      "device1,sensor2,2017/10/24 19:31:00,201286412",
+      "device2,sensor1,2017/10/24 19:31:00,280281282",
+      "device2,sensor2,2017/10/24 19:31:00,868159192",
+      "device3,sensor1,2017/10/24 19:31:00,260261262",
+      "device3,sensor2,2017/10/24 19:31:00,380381382",
+      "device1,sensor1,2017/10/24 19:32:00,505152421",
+      "device1,sensor2,2017/10/24 19:32:00,150151152",
+      "device2,sensor1,2017/10/24 19:32:00,250251252",
+      "device2,sensor2,2017/10/24 19:32:00,350351352",
+      "device3,sensor1,2017/10/24 19:32:00,404142234",
+      "device3,sensor2,2017/10/24 19:32:00,140141142",
+      "device1,sensor1,2017/10/24 19:33:00,240241242",
+      "device1,sensor2,2017/10/24 19:33:00,340341342",
+      "device2,sensor1,2017/10/24 19:33:00,404142234",
+      "device2,sensor2,2017/10/24 19:33:00,140141142",
+      "device3,sensor1,2017/10/24 19:33:00,957190242",
+      "device3,sensor2,2017/10/24 19:33:00,521216677",
+      "device1,sensor1,2017/10/24 19:34:00,101112567",
+      "device1,sensor2,2017/10/24 19:34:00,110111112",
+      "device2,sensor1,2017/10/24 19:34:00,615126321",
+      "device2,sensor2,2017/10/24 19:34:00,350351352",
+      "device3,sensor1,2017/10/24 19:34:00,122618247",
+      "device3,sensor2,2017/10/24 19:34:00,782148991"
+  };
+
+  /**
+   * Creates a key-based batching producer on {@link Constant#TOPIC_NAME}.
+   *
+   * @throws PulsarClientException if the client or the producer cannot be created
+   */
+  public PulsarProducer() throws PulsarClientException {
+    Producer<String> created;
+    try {
+      created = client.newProducer(new StringSchema()).
+          topic(Constant.TOPIC_NAME).batcherBuilder(BatcherBuilder.KEY_BASED).
+          hashingScheme(HashingScheme.Murmur3_32Hash).create();
+    } catch (PulsarClientException e) {
+      // Do not leak the already-built client when the producer cannot be created.
+      try {
+        client.close();
+      } catch (PulsarClientException closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw e;
+    }
+    this.producer = created;
+  }
+
+  /**
+   * Sends every record of {@link #ALL_DATA}, using the device name (first field) as the
+   * message key.
+   *
+   * @throws PulsarClientException if a message cannot be sent
+   */
+  public void produce() throws PulsarClientException {
+    for (String s : ALL_DATA) {
+      String[] line = s.split(",");
+      producer.newMessage().key(line[0]).value(s).send();
+    }
+  }
+
+  /**
+   * Closes the producer and then the client; the client is closed even if closing the
+   * producer fails.
+   *
+   * @throws PulsarClientException if closing either resource fails
+   */
+  @Override
+  public void close() throws PulsarClientException {
+    try {
+      producer.close();
+    } finally {
+      client.close();
+    }
+  }
+
+  /** Publishes the sample data once and releases the Pulsar resources. */
+  public static void main(String[] args) throws PulsarClientException {
+    try (PulsarProducer pulsarProducer = new PulsarProducer()) {
+      pulsarProducer.produce();
+    }
+  }
+}