You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/03 12:40:40 UTC
[iotdb] 01/01: fix flink iotdb example for writing data with
incorrect data types
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch fix_flink_example
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8484105ea32342f534dea7f0f2005e93e404f3fb
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Dec 3 20:40:17 2020 +0800
fix flink iotdb example for writing data with incorrect data types
---
example/flink/pom.xml | 5 +++++
.../java/org/apache/iotdb/flink/FlinkIoTDBSink.java | 17 +++++++++++++----
pom.xml | 1 +
3 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/example/flink/pom.xml b/example/flink/pom.xml
index 398c789..6c9277b 100644
--- a/example/flink/pom.xml
+++ b/example/flink/pom.xml
@@ -54,5 +54,10 @@
<artifactId>flink-tsfile-connector</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.library.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
index 35e2143..87b06a6 100644
--- a/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
+++ b/example/flink/src/main/java/org/apache/iotdb/flink/FlinkIoTDBSink.java
@@ -25,6 +25,9 @@ import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
public class FlinkIoTDBSink {
public static void main(String[] args) throws Exception {
@@ -37,19 +40,25 @@ public class FlinkIoTDBSink {
options.setUser("root");
options.setPassword("root");
options.setStorageGroup("root.sg");
- options.setTimeseriesOptionList(Lists.newArrayList(new IoTDBOptions.TimeseriesOption("root.sg.d1.s1")));
+
+ //If the server enables auto_create_schema, then we do not need to register all timeseries here.
+ options.setTimeseriesOptionList(
+ Lists.newArrayList(new IoTDBOptions.TimeseriesOption(
+ "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY
+ )));
IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
IoTDBSink ioTDBSink = new IoTDBSink(options, serializationSchema)
// enable batching
- .withBatchSize(10);
+ .withBatchSize(10)
+ // how many connectons to the server will be created for each parallelism
+ .withSessionPoolSize(3);
env.addSource(new SensorSource())
.name("sensor-source")
.setParallelism(1)
.addSink(ioTDBSink)
- .name("iotdb-sink")
- .setParallelism(1);
+ .name("iotdb-sink");
env.execute("iotdb-flink-example");
}
diff --git a/pom.xml b/pom.xml
index 1cf788d..11e035e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.assembly.version>3.1.0</maven.assembly.version>
+ <scala.library.version>2.11</scala.library.version>
<scala.version>2.11.12</scala.version>
<hadoop2.version>2.7.3</hadoop2.version>
<hive2.version>2.3.6</hive2.version>