You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/03 12:40:40 UTC

[iotdb] 01/01: fix flink iotdb example for writing data with incorrect data types

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch fix_flink_example
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8484105ea32342f534dea7f0f2005e93e404f3fb
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Dec 3 20:40:17 2020 +0800

    fix flink iotdb example for writing data with incorrect data types
---
 example/flink/pom.xml                                   |  5 +++++
 .../java/org/apache/iotdb/flink/FlinkIoTDBSink.java     | 17 +++++++++++++----
 pom.xml                                                 |  1 +
 3 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/example/flink/pom.xml b/example/flink/pom.xml
index 398c789..6c9277b 100644
--- a/example/flink/pom.xml
+++ b/example/flink/pom.xml
@@ -54,5 +54,10 @@
             <artifactId>flink-tsfile-connector</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.library.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
index 35e2143..87b06a6 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
@@ -25,6 +25,9 @@ import java.security.SecureRandom;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 
 public class FlinkIoTDBSink {
     public static void main(String[] args) throws Exception {
@@ -37,19 +40,25 @@ public class FlinkIoTDBSink {
         options.setUser("root");
         options.setPassword("root");
         options.setStorageGroup("root.sg");
-        options.setTimeseriesOptionList(Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.d1.s1")));
+
+        //If the server enables auto_create_schema, then we do not need to register all timeseries here.
+        options.setTimeseriesOptionList(
+            Lists.newArrayList(new IoTDBOptions.TimeseriesOption(
+                "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY
+                )));
 
         IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
         IoTDBSink ioTDBSink = new IoTDBSink(options, serializationSchema)
                 // enable batching
-                .withBatchSize(10);
+                .withBatchSize(10)
+                // how many connections to the server will be created for each parallelism
+                .withSessionPoolSize(3);
 
         env.addSource(new SensorSource())
                 .name("sensor-source")
                 .setParallelism(1)
                 .addSink(ioTDBSink)
-                .name("iotdb-sink")
-                .setParallelism(1);
+                .name("iotdb-sink");
 
         env.execute("iotdb-flink-example");
     }
diff --git a/pom.xml b/pom.xml
index 1cf788d..11e035e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <maven.assembly.version>3.1.0</maven.assembly.version>
+        <scala.library.version>2.11</scala.library.version>
         <scala.version>2.11.12</scala.version>
         <hadoop2.version>2.7.3</hadoop2.version>
         <hive2.version>2.3.6</hive2.version>