You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/02 12:42:06 UTC
[iotdb] branch rel/0.11 updated: Fix flink-iotdb set storage group
bug (#2165)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new df3558c Fix flink-iotdb set storage group bug (#2165)
df3558c is described below
commit df3558c72b78a28f07e61c0f0692c7e70c680aff
Author: SilverNarcissus <15...@smail.nju.edu.cn>
AuthorDate: Wed Dec 2 20:38:27 2020 +0800
Fix flink-iotdb set storage group bug (#2165)
(cherry picked from commit e95cebc5041cb1dc85ec8d7823e73af6c7e518a4)
---
.../java/org/apache/iotdb/flink/IoTDBSink.java | 25 ++++++++++++++++------
1 file changed, 19 insertions(+), 6 deletions(-)
diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index fa0c1db..aa99378 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -19,19 +19,24 @@
package org.apache.iotdb.flink;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
/**
* The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries. By default send only one
* event after another, but you can change to batch by invoking `withBatchSize(int)`.
@@ -74,7 +79,15 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
pool = new SessionPool(options.getHost(), options.getPort(), options.getUser(),
options.getPassword(), sessionPoolSize);
- pool.setStorageGroup(options.getStorageGroup());
+ try {
+ pool.setStorageGroup(options.getStorageGroup());
+ }
+ catch (StatementExecutionException e){
+ if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()){
+ throw e;
+ }
+ }
+
for (IoTDBOptions.TimeseriesOption option : options.getTimeseriesOptionList()) {
if (!pool.checkTimeseriesExists(option.getPath())) {
pool.createTimeseries(option.getPath(), option.getDataType(), option.getEncoding(),