You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/01/11 14:15:29 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector (#3677)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e24843515 [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector (#3677)
e24843515 is described below

commit e24843515ffdbcd58a54f1306987cf36d4bb370d
Author: ChunFuWu <31...@qq.com>
AuthorDate: Wed Jan 11 22:15:22 2023 +0800

    [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector (#3677)
    
    * [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector
    
    * [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector
    
    * [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector
    
    Co-authored-by: Tyrantlucifer <Ty...@gmail.com>
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |  8 ++++
 .../seatunnel/iceberg/IcebergCatalogFactory.java   |  5 ++-
 .../iceberg/data/DefaultDeserializer.java          |  6 ++-
 .../seatunnel/iceberg/data/IcebergTypeMapper.java  | 11 ++++--
 .../exception/IcebergConnectorErrorCode.java       | 44 ++++++++++++++++++++++
 .../exception/IcebergConnectorException.java       | 36 ++++++++++++++++++
 .../enumerator/scan/IcebergScanSplitPlanner.java   | 13 +++++--
 .../source/reader/IcebergFileScanTaskReader.java   | 12 ++++--
 .../reader/IcebergFileScanTaskSplitReader.java     | 10 +++--
 9 files changed, 130 insertions(+), 15 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index b684779b9..37580705d 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -206,8 +206,16 @@ problems encountered by users.
 | DINGTALK-01 | Send response to DinkTalk server failed | When users encounter this error code, it means that send response message to DinkTalk server failed, please check it |
 | DINGTALK-02 | Get sign from DinkTalk server failed    | When users encounter this error code, it means that get signature from DinkTalk server failed , please check it      |
 
+## Iceberg Connector Error Codes
+
+| code       | description                     | solution                                                                                                 |
+|------------|---------------------------------|----------------------------------------------------------------------------------------------------------|
+| ICEBERG-01 | File Scan Split failed          | When users encounter this error code, it means the file scanning and splitting failed. Please check it   |
+| ICEBERG-02 | Invalid starting record offset  | When users encounter this error code, it means the starting record offset is invalid. Please check it    |
+
 ## Email Connector Error Codes
 
 | code        | description       | solution                                                                                                                                                            |
 |-------------|-------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | EMAIL-01    | Send email failed | When users encounter this error code, it means that send email to target server failed, please adjust the network environment according to the abnormal information |
+
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
index 480c80bd8..50039eb00 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.iceberg;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 
 import lombok.NonNull;
 import org.apache.hadoop.conf.Configuration;
@@ -64,7 +66,8 @@ public class IcebergCatalogFactory implements Serializable {
                 properties.put(CatalogProperties.URI, uri);
                 return hive(catalogName, serializableConf, properties);
             default:
-                throw new UnsupportedOperationException("Unsupported catalogType: " + catalogType);
+                throw new IcebergConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                    String.format("Unsupported catalogType: %s", catalogType));
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java
index 7ae641d14..00e25f18b 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 
 import lombok.AllArgsConstructor;
 import lombok.NonNull;
@@ -133,7 +135,9 @@ public class DefaultDeserializer implements Deserializer {
                 }
                 return seatunnelMap;
             default:
-                throw new UnsupportedOperationException("Unsupported iceberg type: " + icebergType);
+                throw new IcebergConnectorException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    String.format("Unsupported iceberg type: %s", icebergType));
         }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
index 485ecaf88..b4893a86b 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 
 import lombok.NonNull;
 import org.apache.iceberg.types.Type;
@@ -68,7 +70,8 @@ public class IcebergTypeMapper {
             case MAP:
                 return mappingMapType((Types.MapType) icebergType);
             default:
-                throw new UnsupportedOperationException(
+                throw new IcebergConnectorException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                     "Unsupported iceberg data type: " + icebergType.typeId());
         }
     }
@@ -100,8 +103,10 @@ public class IcebergTypeMapper {
             case STRING:
                 return ArrayType.STRING_ARRAY_TYPE;
             default:
-                throw new UnsupportedOperationException(
-                    "Unsupported iceberg list element type: " + listType.elementType().typeId());
+                throw new IcebergConnectorException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    "Unsupported iceberg list element type: " +
+                        listType.elementType().typeId());
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorErrorCode.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorErrorCode.java
new file mode 100644
index 000000000..878162380
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorErrorCode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum IcebergConnectorErrorCode implements SeaTunnelErrorCode {
+    // Iceberg-specific error codes; each constant is declared as (code, description).
+    FILE_SCAN_SPLIT_FAILED("ICEBERG-01", "File Scan Split failed"), // IOException while scanning splits
+    INVALID_STARTING_RECORD_OFFSET("ICEBERG-02", "Invalid starting record offset"); // seek ran out of records
+
+    private final String code; // code string of the constant, e.g. "ICEBERG-01"
+    private final String description; // short description of the constant
+    /** Stores the code and description passed by the constant declaration. */
+    IcebergConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+    /** Returns the code string this constant was declared with. */
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+    /** Returns the description this constant was declared with. */
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorException.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorException.java
new file mode 100644
index 000000000..0279c46b3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class IcebergConnectorException extends SeaTunnelRuntimeException { // Iceberg connector failure type
+    /** Creates an exception from an error code and a detail message; delegates to the superclass. */
+    public IcebergConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+    /** Creates an exception from an error code, a detail message and the underlying cause. */
+    public IcebergConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+    /** Creates an exception from an error code and the underlying cause, without a detail message. */
+    public IcebergConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java
index f3d29f814..d41a5d813 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java
@@ -19,6 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumerationResult;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumeratorPosition;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
@@ -36,7 +39,6 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.util.SnapshotUtil;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -135,8 +137,10 @@ public class IcebergScanSplitPlanner {
                     return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
                 }
             default:
-                throw new UnsupportedOperationException("Unsupported stream scan strategy: " +
-                    icebergScanContext.getStreamScanStrategy());
+                throw new IcebergConnectorException(
+                    CommonErrorCode.UNSUPPORTED_OPERATION,
+                    "Unsupported stream scan strategy: " +
+                        icebergScanContext.getStreamScanStrategy());
         }
     }
 
@@ -151,7 +155,8 @@ public class IcebergScanSplitPlanner {
             }
             return splits;
         } catch (IOException e) {
-            throw new UncheckedIOException(
+            throw new IcebergConnectorException(
+                IcebergConnectorErrorCode.FILE_SCAN_SPLIT_FAILED,
                 "Failed to scan iceberg splits from: " + table.name(), e);
         }
     }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java
index a50706847..8f1ddd884 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.data.IcebergRecordProjection;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 
 import com.google.common.collect.Sets;
 import lombok.Builder;
@@ -93,7 +95,9 @@ public class IcebergFileScanTaskReader implements Closeable {
 
     private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProjection) {
         if (task.isDataTask()) {
-            throw new UnsupportedOperationException("Cannot read data task.");
+            throw new IcebergConnectorException(
+                CommonErrorCode.UNSUPPORTED_OPERATION,
+                "Cannot read data task.");
         }
         InputFile input = fileIO.newInputFile(task.file().path().toString());
         Map<Integer, ?> partition = PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);
@@ -131,8 +135,10 @@ public class IcebergFileScanTaskReader implements Closeable {
                     .filter(task.residual());
                 return orc.build();
             default:
-                throw new UnsupportedOperationException(String.format("Cannot read %s file: %s",
-                    task.file().format().name(), task.file().path()));
+                throw new IcebergConnectorException(
+                    CommonErrorCode.UNSUPPORTED_OPERATION,
+                    String.format("Cannot read %s file: %s",
+                        task.file().format().name(), task.file().path()));
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java
index c170dbd90..6667fc897 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.data.Deserializer;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
 
 import lombok.AllArgsConstructor;
@@ -38,7 +40,7 @@ public class IcebergFileScanTaskSplitReader implements Closeable {
     public CloseableIterator<SeaTunnelRow> open(@NonNull IcebergFileScanTaskSplit split) {
         CloseableIterator<Record> iterator = icebergFileScanTaskReader.open(split.getTask());
 
-        OffsetSeekIterator<Record> seekIterator = new OffsetSeekIterator(iterator);
+        OffsetSeekIterator<Record> seekIterator = new OffsetSeekIterator<>(iterator);
         seekIterator.seek(split.getRecordOffset());
 
         return CloseableIterator.transform(seekIterator, record -> {
@@ -62,8 +64,10 @@ public class IcebergFileScanTaskSplitReader implements Closeable {
                 if (hasNext()) {
                     next();
                 } else {
-                    throw new IllegalStateException(String.format(
-                        "Invalid starting record offset %d", startingRecordOffset));
+                    throw new IcebergConnectorException(
+                        IcebergConnectorErrorCode.INVALID_STARTING_RECORD_OFFSET,
+                        String.format(
+                            "Invalid starting record offset %d", startingRecordOffset));
                 }
             }
         }