You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2020/12/02 11:48:04 UTC

[iotdb] branch fix-flink-set-storage-group-bug created (now 43b4aa2)

This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a change to branch fix-flink-set-storage-group-bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 43b4aa2  fix bug

This branch includes the following new commits:

     new 43b4aa2  fix bug

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: fix bug

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch fix-flink-set-storage-group-bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 43b4aa24b4ee845da0c60ebf1a966cbbc2eaa1e8
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Dec 2 19:47:31 2020 +0800

    fix bug
---
 .../java/org/apache/iotdb/flink/IoTDBSink.java     | 25 ++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index fa0c1db..aa99378 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -19,19 +19,24 @@
 package org.apache.iotdb.flink;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 /**
  * The `IoTDBSink` allows flink jobs to write events into IoTDB timeseries. By default send only one
  * event after another, but you can change to batch by invoking `withBatchSize(int)`.
@@ -74,7 +79,15 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
     pool = new SessionPool(options.getHost(), options.getPort(), options.getUser(),
         options.getPassword(), sessionPoolSize);
 
-    pool.setStorageGroup(options.getStorageGroup());
+    try {
+      pool.setStorageGroup(options.getStorageGroup());
+    }
+    catch (StatementExecutionException e){
+      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()){
+        throw e;
+      }
+    }
+    
     for (IoTDBOptions.TimeseriesOption option : options.getTimeseriesOptionList()) {
       if (!pool.checkTimeseriesExists(option.getPath())) {
         pool.createTimeseries(option.getPath(), option.getDataType(), option.getEncoding(),