You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/02 12:38:44 UTC

[iotdb] branch master updated: Fix flink-iotdb set storage group bug (#2165)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e95cebc  Fix flink-iotdb set storage group bug (#2165)
e95cebc is described below

commit e95cebc5041cb1dc85ec8d7823e73af6c7e518a4
Author: SilverNarcissus <15...@smail.nju.edu.cn>
AuthorDate: Wed Dec 2 20:38:27 2020 +0800

    Fix flink-iotdb set storage group bug (#2165)
---
 .../java/org/apache/iotdb/flink/IoTDBSink.java     | 25 ++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index fa0c1db..aa99378 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -19,19 +19,24 @@
 package org.apache.iotdb.flink;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 /**
  * The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries. By default send only one
  * event after another, but you can change to batch by invoking `withBatchSize(int)`.
@@ -74,7 +79,15 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
     pool = new SessionPool(options.getHost(), options.getPort(), options.getUser(),
         options.getPassword(), sessionPoolSize);
 
-    pool.setStorageGroup(options.getStorageGroup());
+    try {
+      pool.setStorageGroup(options.getStorageGroup());
+    }
+    catch (StatementExecutionException e){
+      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()){
+        throw e;
+      }
+    }
+    
     for (IoTDBOptions.TimeseriesOption option : options.getTimeseriesOptionList()) {
       if (!pool.checkTimeseriesExists(option.getPath())) {
         pool.createTimeseries(option.getPath(), option.getDataType(), option.getEncoding(),