You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2021/07/14 08:36:30 UTC

[iotdb] 01/01: path already exist

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch IOTDB-1499
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 538addb7b8ae7fd7d7ff3a69b904c3c211893e4d
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Wed Jul 14 16:35:14 2021 +0800

    path already exist
---
 .../src/main/java/org/apache/iotdb/flink/IoTDBSink.java     | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
index eaba61d..4b42bd8 100644
--- a/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
+++ b/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java
@@ -90,15 +90,22 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
     try {
       pool.setStorageGroup(options.getStorageGroup());
     } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
+      if (e.getStatusCode() != (TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode())) {
         throw e;
       }
     }
 
     for (IoTDBSinkOptions.TimeseriesOption option : options.getTimeseriesOptionList()) {
       if (!pool.checkTimeseriesExists(option.getPath())) {
-        pool.createTimeseries(
-            option.getPath(), option.getDataType(), option.getEncoding(), option.getCompressor());
+        try {
+          pool.createTimeseries(
+              option.getPath(), option.getDataType(), option.getEncoding(), option.getCompressor());
+        } catch (StatementExecutionException e) {
+          // path could have been created by the other process here
+          if (e.getStatusCode() != (TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode())) {
+            throw e;
+          }
+        }
       }
     }
   }