You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/01/11 14:15:29 UTC
[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector (#3677)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e24843515 [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector (#3677)
e24843515 is described below
commit e24843515ffdbcd58a54f1306987cf36d4bb370d
Author: ChunFuWu <31...@qq.com>
AuthorDate: Wed Jan 11 22:15:22 2023 +0800
[Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector (#3677)
* [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector
* [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector
* [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector
Co-authored-by: Tyrantlucifer <Ty...@gmail.com>
---
.../connector-v2/Error-Quick-Reference-Manual.md | 8 ++++
.../seatunnel/iceberg/IcebergCatalogFactory.java | 5 ++-
.../iceberg/data/DefaultDeserializer.java | 6 ++-
.../seatunnel/iceberg/data/IcebergTypeMapper.java | 11 ++++--
.../exception/IcebergConnectorErrorCode.java | 44 ++++++++++++++++++++++
.../exception/IcebergConnectorException.java | 36 ++++++++++++++++++
.../enumerator/scan/IcebergScanSplitPlanner.java | 13 +++++--
.../source/reader/IcebergFileScanTaskReader.java | 12 ++++--
.../reader/IcebergFileScanTaskSplitReader.java | 10 +++--
9 files changed, 130 insertions(+), 15 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index b684779b9..37580705d 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -206,8 +206,16 @@ problems encountered by users.
| DINGTALK-01 | Send response to DinkTalk server failed | When users encounter this error code, it means that send response message to DinkTalk server failed, please check it |
| DINGTALK-02 | Get sign from DinkTalk server failed | When users encounter this error code, it means that get signature from DinkTalk server failed , please check it |
+## Iceberg Connector Error Codes
+
+| code | description | solution |
+|------------|---------------------------------|----------------------------------------------------------------------------------------------------------|
+| ICEBERG-01 | File Scan Split failed | When users encounter this error code, it means that the file scanning and splitting failed. Please check |
+| ICEBERG-02 | Invalid starting record offset | When users encounter this error code, it means that the starting record offset is invalid. Please check |
+
## Email Connector Error Codes
| code | description | solution |
|-------------|-------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| EMAIL-01 | Send email failed | When users encounter this error code, it means that send email to target server failed, please adjust the network environment according to the abnormal information |
+
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
index 480c80bd8..50039eb00 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.iceberg;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
import lombok.NonNull;
import org.apache.hadoop.conf.Configuration;
@@ -64,7 +66,8 @@ public class IcebergCatalogFactory implements Serializable {
properties.put(CatalogProperties.URI, uri);
return hive(catalogName, serializableConf, properties);
default:
- throw new UnsupportedOperationException("Unsupported catalogType: " + catalogType);
+ throw new IcebergConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ String.format("Unsupported catalogType: %s", catalogType));
}
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java
index 7ae641d14..00e25f18b 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
import lombok.AllArgsConstructor;
import lombok.NonNull;
@@ -133,7 +135,9 @@ public class DefaultDeserializer implements Deserializer {
}
return seatunnelMap;
default:
- throw new UnsupportedOperationException("Unsupported iceberg type: " + icebergType);
+ throw new IcebergConnectorException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ String.format("Unsupported iceberg type: %s", icebergType));
}
}
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
index 485ecaf88..b4893a86b 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
import lombok.NonNull;
import org.apache.iceberg.types.Type;
@@ -68,7 +70,8 @@ public class IcebergTypeMapper {
case MAP:
return mappingMapType((Types.MapType) icebergType);
default:
- throw new UnsupportedOperationException(
+ throw new IcebergConnectorException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported iceberg data type: " + icebergType.typeId());
}
}
@@ -100,8 +103,10 @@ public class IcebergTypeMapper {
case STRING:
return ArrayType.STRING_ARRAY_TYPE;
default:
- throw new UnsupportedOperationException(
- "Unsupported iceberg list element type: " + listType.elementType().typeId());
+ throw new IcebergConnectorException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ "Unsupported iceberg list element type: " +
+ listType.elementType().typeId());
}
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorErrorCode.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorErrorCode.java
new file mode 100644
index 000000000..878162380
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorErrorCode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum IcebergConnectorErrorCode implements SeaTunnelErrorCode {
+
+ FILE_SCAN_SPLIT_FAILED("ICEBERG-01", "File Scan Split failed"),
+ INVALID_STARTING_RECORD_OFFSET("ICEBERG-02", "Invalid starting record offset");
+
+ private final String code;
+ private final String description;
+
+ IcebergConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorException.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorException.java
new file mode 100644
index 000000000..0279c46b3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class IcebergConnectorException extends SeaTunnelRuntimeException {
+
+ public IcebergConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public IcebergConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public IcebergConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java
index f3d29f814..d41a5d813 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java
@@ -19,6 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan
import static com.google.common.base.Preconditions.checkArgument;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumerationResult;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumeratorPosition;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
@@ -36,7 +39,6 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.SnapshotUtil;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -135,8 +137,10 @@ public class IcebergScanSplitPlanner {
return Optional.of(SnapshotUtil.snapshotAfter(table, snapshotIdAsOfTime));
}
default:
- throw new UnsupportedOperationException("Unsupported stream scan strategy: " +
- icebergScanContext.getStreamScanStrategy());
+ throw new IcebergConnectorException(
+ CommonErrorCode.UNSUPPORTED_OPERATION,
+ "Unsupported stream scan strategy: " +
+ icebergScanContext.getStreamScanStrategy());
}
}
@@ -151,7 +155,8 @@ public class IcebergScanSplitPlanner {
}
return splits;
} catch (IOException e) {
- throw new UncheckedIOException(
+ throw new IcebergConnectorException(
+ IcebergConnectorErrorCode.FILE_SCAN_SPLIT_FAILED,
"Failed to scan iceberg splits from: " + table.name(), e);
}
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java
index a50706847..8f1ddd884 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.iceberg.data.IcebergRecordProjection;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
import com.google.common.collect.Sets;
import lombok.Builder;
@@ -93,7 +95,9 @@ public class IcebergFileScanTaskReader implements Closeable {
private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProjection) {
if (task.isDataTask()) {
- throw new UnsupportedOperationException("Cannot read data task.");
+ throw new IcebergConnectorException(
+ CommonErrorCode.UNSUPPORTED_OPERATION,
+ "Cannot read data task.");
}
InputFile input = fileIO.newInputFile(task.file().path().toString());
Map<Integer, ?> partition = PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);
@@ -131,8 +135,10 @@ public class IcebergFileScanTaskReader implements Closeable {
.filter(task.residual());
return orc.build();
default:
- throw new UnsupportedOperationException(String.format("Cannot read %s file: %s",
- task.file().format().name(), task.file().path()));
+ throw new IcebergConnectorException(
+ CommonErrorCode.UNSUPPORTED_OPERATION,
+ String.format("Cannot read %s file: %s",
+ task.file().format().name(), task.file().path()));
}
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java
index c170dbd90..6667fc897 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.iceberg.data.Deserializer;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
import lombok.AllArgsConstructor;
@@ -38,7 +40,7 @@ public class IcebergFileScanTaskSplitReader implements Closeable {
public CloseableIterator<SeaTunnelRow> open(@NonNull IcebergFileScanTaskSplit split) {
CloseableIterator<Record> iterator = icebergFileScanTaskReader.open(split.getTask());
- OffsetSeekIterator<Record> seekIterator = new OffsetSeekIterator(iterator);
+ OffsetSeekIterator<Record> seekIterator = new OffsetSeekIterator<>(iterator);
seekIterator.seek(split.getRecordOffset());
return CloseableIterator.transform(seekIterator, record -> {
@@ -62,8 +64,10 @@ public class IcebergFileScanTaskSplitReader implements Closeable {
if (hasNext()) {
next();
} else {
- throw new IllegalStateException(String.format(
- "Invalid starting record offset %d", startingRecordOffset));
+ throw new IcebergConnectorException(
+ IcebergConnectorErrorCode.INVALID_STARTING_RECORD_OFFSET,
+ String.format(
+ "Invalid starting record offset %d", startingRecordOffset));
}
}
}