You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/01 03:23:05 UTC
[iotdb] branch master updated: Add Apache Pulsar to IoTDB example
(#2122)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8657b13 Add Apache Pulsar to IoTDB example (#2122)
8657b13 is described below
commit 8657b13daffe4f41e576a628a4e4f5d5607341a7
Author: wshao08 <59...@users.noreply.github.com>
AuthorDate: Tue Dec 1 11:22:45 2020 +0800
Add Apache Pulsar to IoTDB example (#2122)
---
.../main/java/org/apache/iotdb/kafka/Constant.java | 2 +
example/pom.xml | 1 +
example/pulsar/pom.xml | 45 +++++++++
.../java/org/apache/iotdb/pulsar/Constant.java | 30 ++++++
.../org/apache/iotdb/pulsar/PulsarConsumer.java | 108 +++++++++++++++++++++
.../apache/iotdb/pulsar/PulsarConsumerThread.java | 70 +++++++++++++
.../org/apache/iotdb/pulsar/PulsarProducer.java | 82 ++++++++++++++++
7 files changed, 338 insertions(+)
diff --git a/example/kafka/src/main/java/org/apache/iotdb/kafka/Constant.java b/example/kafka/src/main/java/org/apache/iotdb/kafka/Constant.java
index 4901eec..070bc08 100644
--- a/example/kafka/src/main/java/org/apache/iotdb/kafka/Constant.java
+++ b/example/kafka/src/main/java/org/apache/iotdb/kafka/Constant.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.kafka;
import org.apache.iotdb.jdbc.Config;
public class Constant {
+ private Constant() {
+ }
public static final String TOPIC = "Kafka-Test";
public static final int CONSUMER_THREAD_NUM = 5;
diff --git a/example/pom.xml b/example/pom.xml
index 8cd9c4f..e108400 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -42,6 +42,7 @@
<module>hadoop</module>
<module>flink</module>
<module>mqtt</module>
+ <module>pulsar</module>
</modules>
<build>
<pluginManagement>
diff --git a/example/pulsar/pom.xml b/example/pulsar/pom.xml
new file mode 100644
index 0000000..cb329a2
--- /dev/null
+++ b/example/pulsar/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>iotdb-examples</artifactId>
+ <groupId>org.apache.iotdb</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>pulsar-example</artifactId>
+ <name>IoTDB-Pulsar Examples</name>
+ <packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>2.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/example/pulsar/src/main/java/org/apache/iotdb/pulsar/Constant.java b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/Constant.java
new file mode 100644
index 0000000..83a1bf7
--- /dev/null
+++ b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/Constant.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.pulsar;
+
+@SuppressWarnings("squid:S2068")
+public class Constant {
+ private Constant() {
+ }
+ public static final String TOPIC_NAME = "persistent://public/default/regions-partitioned";
+ public static final String IOTDB_CONNECTION_URL = "jdbc:iotdb://localhost:6667/";
+ public static final String IOTDB_CONNECTION_USER = "root";
+ public static final String IOTDB_CONNECTION_PASSWORD = "root";
+ public static final String STORAGE_GROUP = "root.vehicle";
+}
diff --git a/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumer.java b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumer.java
new file mode 100644
index 0000000..b67d46a
--- /dev/null
+++ b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.pulsar;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarConsumer {
+ private static final String SERVICE_URL = "pulsar://localhost:6650";
+ // Specify the number of consumers
+ private static final int CONSUMER_NUM = 3;
+ private List<Consumer<?>> consumerList;
+ private static final String CREATE_SG_TEMPLATE = "SET STORAGE GROUP TO %s";
+ private static final String CREATE_TIMESERIES_TEMPLATE = "CREATE TIMESERIES %s WITH DATATYPE=TEXT, ENCODING=PLAIN";
+ private static final Logger logger = LoggerFactory.getLogger(PulsarConsumer.class);
+ protected static final String[] ALL_TIMESERIES = {
+ "root.vehicle.device1.sensor1",
+ "root.vehicle.device1.sensor2",
+ "root.vehicle.device2.sensor1",
+ "root.vehicle.device2.sensor2",
+ "root.vehicle.device3.sensor1",
+ "root.vehicle.device3.sensor2",
+ };
+
+ public PulsarConsumer(List<Consumer<?>> consumerList) {
+ this.consumerList = consumerList;
+ }
+
+ public void consumeInParallel() throws ClassNotFoundException {
+ ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_NUM);
+ for (int i = 0; i < consumerList.size(); i++) {
+ PulsarConsumerThread consumerExecutor = new PulsarConsumerThread(consumerList.get(i));
+ executor.submit(consumerExecutor);
+ }
+ }
+
+ @SuppressWarnings("squid:S2068")
+ private static void prepareSchema() {
+ try {
+ Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
+ try (Connection connection = DriverManager
+ .getConnection(Constant.IOTDB_CONNECTION_URL, Constant.IOTDB_CONNECTION_USER,
+ Constant.IOTDB_CONNECTION_PASSWORD);
+ Statement statement = connection.createStatement()) {
+
+ statement.execute(String.format(CREATE_SG_TEMPLATE, Constant.STORAGE_GROUP));
+
+ for (String timeseries : ALL_TIMESERIES) {
+ statement.addBatch(String.format(CREATE_TIMESERIES_TEMPLATE, timeseries));
+ }
+ statement.executeBatch();
+ statement.clearBatch();
+ }
+ } catch (ClassNotFoundException | SQLException e) {
+ logger.error(e.getMessage());
+ }
+ }
+
+ public static void main(String[] args) throws PulsarClientException, ClassNotFoundException {
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(SERVICE_URL)
+ .build();
+
+ List<Consumer<?>> consumerList = new ArrayList<>();
+ for (int i = 0; i < CONSUMER_NUM; i++) {
+ // In shared subscription mode, multiple consumers can attach to the same subscription
+ // and message are delivered in a round robin distribution across consumers.
+ Consumer<byte[]> consumer = client.newConsumer()
+ .topic(Constant.TOPIC_NAME)
+ .subscriptionName("shared-subscription")
+ .subscriptionType(SubscriptionType.Key_Shared).keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+ .subscribe();
+ consumerList.add(consumer);
+ }
+ PulsarConsumer pulsarConsumer = new PulsarConsumer(consumerList);
+ prepareSchema();
+ pulsarConsumer.consumeInParallel();
+ }
+}
diff --git a/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumerThread.java b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumerThread.java
new file mode 100644
index 0000000..ff5d17c
--- /dev/null
+++ b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarConsumerThread.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.pulsar;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarConsumerThread implements Runnable {
+ private static final String INSERT_TEMPLATE = "INSERT INTO root.vehicle.%s(timestamp,%s) VALUES (%s,'%s')";
+
+ private static final Logger logger = LoggerFactory.getLogger(PulsarConsumerThread.class);
+
+ private final Consumer<?> consumer;
+
+ public PulsarConsumerThread(Consumer<?> consumer) throws ClassNotFoundException {
+ this.consumer = consumer;
+ Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
+ }
+
+ /**
+ * Write data to IoTDB
+ */
+ private void writeData(Statement statement, String message) throws SQLException {
+
+ String[] items = message.split(",");
+
+ String sql = String.format(INSERT_TEMPLATE, items[0], items[1], items[2], items[3]);
+ statement.execute(sql);
+ }
+
+ @SuppressWarnings("squid:S2068")
+ @Override
+ public void run() {
+ try (Connection connection = DriverManager
+ .getConnection(Constant.IOTDB_CONNECTION_URL, Constant.IOTDB_CONNECTION_USER,
+ Constant.IOTDB_CONNECTION_PASSWORD);
+ Statement statement = connection.createStatement()) {
+ do {
+ Message<?> msg = consumer.receive();
+ writeData(statement, new String(msg.getData()));
+
+ consumer.acknowledge(msg);
+ } while (true);
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
+ }
+}
diff --git a/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarProducer.java b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarProducer.java
new file mode 100644
index 0000000..e72b0e1
--- /dev/null
+++ b/example/pulsar/src/main/java/org/apache/iotdb/pulsar/PulsarProducer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.pulsar;
+
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+
+public class PulsarProducer {
+ private static final String SERVICE_URL = "pulsar://localhost:6650";
+ private final Producer<String> producer ;
+ private final PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();
+ protected static final String[] ALL_DATA = {
+ "device1,sensor1,2017/10/24 19:30:00,606162908",
+ "device1,sensor2,2017/10/24 19:30:00,160161162",
+ "device2,sensor1,2017/10/24 19:30:00,360361362",
+ "device2,sensor2,2017/10/24 19:30:00,818182346",
+ "device3,sensor1,2017/10/24 19:30:00,296150221",
+ "device3,sensor2,2017/10/24 19:30:00,360361362",
+ "device1,sensor1,2017/10/24 19:31:00,752187168",
+ "device1,sensor2,2017/10/24 19:31:00,201286412",
+ "device2,sensor1,2017/10/24 19:31:00,280281282",
+ "device2,sensor2,2017/10/24 19:31:00,868159192",
+ "device3,sensor1,2017/10/24 19:31:00,260261262",
+ "device3,sensor2,2017/10/24 19:31:00,380381382",
+ "device1,sensor1,2017/10/24 19:32:00,505152421",
+ "device1,sensor2,2017/10/24 19:32:00,150151152",
+ "device2,sensor1,2017/10/24 19:32:00,250251252",
+ "device2,sensor2,2017/10/24 19:32:00,350351352",
+ "device3,sensor1,2017/10/24 19:32:00,404142234",
+ "device3,sensor2,2017/10/24 19:32:00,140141142",
+ "deivce1,sensor1,2017/10/24 19:33:00,240241242",
+ "device1,sensor2,2017/10/24 19:33:00,340341342",
+ "device2,sensor1,2017/10/24 19:33:00,404142234",
+ "device2,sensor2,2017/10/24 19:33:00,140141142",
+ "device3,sensor1,2017/10/24 19:33:00,957190242",
+ "device3,sensor2,2017/10/24 19:33:00,521216677",
+ "device1,sensor1,2017/10/24 19:34:00,101112567",
+ "device1,sensor2,2017/10/24 19:34:00,110111112",
+ "device2,sensor1,2017/10/24 19:34:00,615126321",
+ "device2,sensor2,2017/10/24 19:34:00,350351352",
+ "device3,sensor1,2017/10/24 19:34:00,122618247",
+ "device3,sensor2,2017/10/24 19:34:00,782148991"
+ };
+
+ public PulsarProducer() throws PulsarClientException {
+ this.producer = client.newProducer(new StringSchema()).
+ topic(Constant.TOPIC_NAME).batcherBuilder(BatcherBuilder.KEY_BASED).
+ hashingScheme(HashingScheme.Murmur3_32Hash).create();
+ }
+
+ public void produce() throws PulsarClientException {
+ for (String s : ALL_DATA) {
+ String[] line = s.split(",");
+ producer.newMessage().key(line[0]).value(s).send();
+ }
+ }
+
+ public static void main(String[] args) throws PulsarClientException {
+ PulsarProducer pulsarProducer = new PulsarProducer();
+ pulsarProducer.produce();
+ }
+}