You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/20 09:44:33 UTC
[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][Hive] Hive Sink Support msck partitions (#3133)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a8738ef3c [Improve][Connector-V2][Hive] Hive Sink Support msck partitions (#3133)
a8738ef3c is described below
commit a8738ef3c4f6686869d75b46aa6590d2e2503e33
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Thu Oct 20 17:44:27 2022 +0800
[Improve][Connector-V2][Hive] Hive Sink Support msck partitions (#3133)
[Improve][Connector-V2][Hive] Hive Sink Support msck partitions (#3133)
---
docs/en/connector-v2/sink/Hive.md | 7 ++
.../hive/commit/HiveSinkAggregatedCommitter.java | 87 ++++++++++++++++++++++
.../seatunnel/hive/config/HiveConfig.java | 1 +
.../connectors/seatunnel/hive/sink/HiveSink.java | 55 +++++++-------
.../seatunnel/hive/utils/HiveMetaStoreProxy.java | 20 +++--
5 files changed, 134 insertions(+), 36 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md
index 19b287396..9fee56bc5 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -159,3 +159,10 @@ sink {
}
}
```
+
+## Changelog
+
+| Version | Date | Pull Request | Subject |
+|------------|------------|-----------------------------------------------------------------|-----------------------------------------------|
+| 2.2.0-beta | 2022-09-26 | | Add Hive Sink |
+| 2.3.0-beta | 2022-10-19 | [3133](https://github.com/apache/incubator-seatunnel/pull/3133) | Hive Sink supports automatic partition repair |
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
new file mode 100644
index 000000000..410d9ec8c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.commit;
+
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Aggregated committer of the Hive sink.
+ *
+ * <p>File handling is delegated to {@link FileSinkAggregatedCommitter}; afterwards the partition
+ * directories carried by each {@link FileAggregatedCommitInfo} are added to (on commit) or dropped
+ * from (on abort) the table {@code dbName.tableName} through {@link HiveMetaStoreProxy}.
+ */
+@Slf4j
+public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
+    private final Config pluginConfig;
+    private final String dbName;
+    private final String tableName;
+
+    /**
+     * @param pluginConfig sink configuration, passed to {@link HiveMetaStoreProxy#getInstance}
+     * @param dbName       database of the target table
+     * @param tableName    name of the target table
+     */
+    public HiveSinkAggregatedCommitter(Config pluginConfig, String dbName, String tableName) {
+        this.pluginConfig = pluginConfig;
+        this.dbName = dbName;
+        this.tableName = tableName;
+    }
+
+    /**
+     * Commits the files via the parent committer and, only when the parent reports no failed
+     * commit info, adds the partitions of every commit info to the Hive table.
+     *
+     * @param aggregatedCommitInfos commit infos to commit
+     * @return the commit infos that failed, either in the parent commit or while adding partitions
+     * @throws IOException if the parent commit fails
+     */
+    @Override
+    public List<FileAggregatedCommitInfo> commit(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
+        HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
+        try {
+            List<FileAggregatedCommitInfo> errorCommitInfos = super.commit(aggregatedCommitInfos);
+            if (errorCommitInfos.isEmpty()) {
+                for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
+                    List<String> partitions = partitionsOf(aggregatedCommitInfo);
+                    try {
+                        hiveMetaStore.addPartitions(dbName, tableName, partitions);
+                        log.info("Add these partitions [{}]", partitions);
+                    } catch (TException e) {
+                        // Keep the cause in the log; the commit info is reported back as failed.
+                        log.error("Failed to add these partitions [{}]", partitions, e);
+                        errorCommitInfos.add(aggregatedCommitInfo);
+                    }
+                }
+            }
+            return errorCommitInfos;
+        } finally {
+            // Release the metastore proxy even when the parent commit throws.
+            hiveMetaStore.close();
+        }
+    }
+
+    /**
+     * Aborts the files via the parent committer, then drops the partitions of every commit info
+     * from the Hive table. Metastore failures are logged and do not interrupt the abort.
+     *
+     * @param aggregatedCommitInfos commit infos to roll back
+     * @throws Exception if the parent abort fails
+     */
+    @Override
+    public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception {
+        super.abort(aggregatedCommitInfos);
+        HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
+        try {
+            for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
+                List<String> partitions = partitionsOf(aggregatedCommitInfo);
+                try {
+                    hiveMetaStore.dropPartitions(dbName, tableName, partitions);
+                    log.info("Remove these partitions [{}]", partitions);
+                } catch (TException e) {
+                    log.error("Failed to remove these partitions [{}]", partitions, e);
+                }
+            }
+        } finally {
+            // Release the metastore proxy even if an unchecked exception escapes the loop.
+            hiveMetaStore.close();
+        }
+    }
+
+    /**
+     * Returns the partition directory keys of the given commit info with every backslash
+     * replaced by a forward slash.
+     */
+    private static List<String> partitionsOf(FileAggregatedCommitInfo aggregatedCommitInfo) {
+        Map<String, List<String>> partitionDirAndValuesMap = aggregatedCommitInfo.getPartitionDirAndValuesMap();
+        return partitionDirAndValuesMap.keySet().stream()
+            .map(partition -> partition.replaceAll("\\\\", "/"))
+            .collect(Collectors.toList());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
index b28d56b2a..8cf118bbc 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
@@ -42,6 +42,7 @@ public class HiveConfig {
}
HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(config);
Table tableInformation = hiveMetaStoreProxy.getTable(splits[0], splits[1]);
+ hiveMetaStoreProxy.close();
return Pair.of(splits, tableInformation);
}
}
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 0fe82358e..34437ca42 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -21,25 +21,29 @@ import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FIE
import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_NAME_EXPRESSION;
import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.IS_PARTITION_FIELD_WRITE_IN_FILE;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PARTITION_BY;
import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PATH;
import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.ROW_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SAVE_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SINK_COLUMNS;
import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_OUTPUT_FORMAT_CLASSNAME;
import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_OUTPUT_FORMAT_CLASSNAME;
import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_OUTPUT_FORMAT_CLASSNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
+import org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
-import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
@@ -50,10 +54,12 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
@AutoService(SeaTunnelSink.class)
@@ -67,29 +73,6 @@ public class HiveSink extends BaseHdfsFileSink {
return "Hive";
}
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- super.setTypeInfo(seaTunnelRowType);
- HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(pluginConfig);
- // --------------------Check textFileSinkConfig with the hive table info-------------------
- List<FieldSchema> fields = hiveMetaStoreProxy.getTableFields(dbName, tableName);
- List<FieldSchema> partitionKeys = tableInformation.getPartitionKeys();
-
- // Remove partitionKeys from table fields
- List<FieldSchema> fieldNotContainPartitionKey = fields.stream().filter(filed -> !partitionKeys.contains(filed)).collect(Collectors.toList());
-
- // check fields size must same as sinkColumnList size
- if (fieldNotContainPartitionKey.size() != textFileSinkConfig.getSinkColumnList().size()) {
- throw new RuntimeException("sink columns size must same as hive table field size");
- }
-
- // check hivePartitionFieldList size must same as partitionFieldList size
- if (partitionKeys.size() != textFileSinkConfig.getPartitionFieldList().size()) {
- throw new RuntimeException("partition by columns size must same as hive table partition columns size");
- }
- hiveMetaStoreProxy.close();
- }
-
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HiveConfig.METASTORE_URI, HiveConfig.TABLE_NAME);
@@ -100,6 +83,13 @@ public class HiveSink extends BaseHdfsFileSink {
dbName = tableInfo.getLeft()[0];
tableName = tableInfo.getLeft()[1];
tableInformation = tableInfo.getRight();
+ List<String> sinkFields = tableInformation.getSd().getCols().stream()
+ .map(FieldSchema::getName)
+ .collect(Collectors.toList());
+ List<String> partitionKeys = tableInformation.getPartitionKeys().stream()
+ .map(FieldSchema::getName)
+ .collect(Collectors.toList());
+ sinkFields.addAll(partitionKeys);
String outputFormat = tableInformation.getSd().getOutputFormat();
if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
Map<String, String> parameters = tableInformation.getSd().getSerdeInfo().getParameters();
@@ -113,10 +103,12 @@ public class HiveSink extends BaseHdfsFileSink {
} else {
throw new RuntimeException("Only support [text parquet orc] file now");
}
- pluginConfig = pluginConfig.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE, ConfigValueFactory.fromAnyRef(false))
- .withValue(FILE_NAME_EXPRESSION, ConfigValueFactory.fromAnyRef("${transactionId}"))
- .withValue(PATH, ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()));
-
+ pluginConfig = pluginConfig
+ .withValue(IS_PARTITION_FIELD_WRITE_IN_FILE, ConfigValueFactory.fromAnyRef(false))
+ .withValue(FILE_NAME_EXPRESSION, ConfigValueFactory.fromAnyRef("${transactionId}"))
+ .withValue(PATH, ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()))
+ .withValue(SINK_COLUMNS, ConfigValueFactory.fromAnyRef(sinkFields))
+ .withValue(PARTITION_BY, ConfigValueFactory.fromAnyRef(partitionKeys));
if (!pluginConfig.hasPath(SAVE_MODE) || StringUtils.isBlank(pluginConfig.getString(SAVE_MODE))) {
pluginConfig = pluginConfig.withValue(SAVE_MODE, ConfigValueFactory.fromAnyRef(SaveMode.APPEND.toString()));
}
@@ -131,4 +123,9 @@ public class HiveSink extends BaseHdfsFileSink {
}
this.pluginConfig = pluginConfig;
}
+
+    /**
+     * Builds the aggregated committer for this sink from the current plugin configuration,
+     * database name and table name.
+     *
+     * @return an {@link Optional} that always contains a {@link HiveSinkAggregatedCommitter}
+     */
+    @Override
+    public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
+        SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> hiveCommitter =
+            new HiveSinkAggregatedCommitter(pluginConfig, dbName, tableName);
+        return Optional.of(hiveCommitter);
+    }
}
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index d813a03a3..60ec238ad 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.NonNull;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
@@ -67,12 +66,19 @@ public class HiveMetaStoreProxy {
}
}
- public List<FieldSchema> getTableFields(@NonNull String dbName, @NonNull String tableName) {
- try {
- return hiveMetaStoreClient.getFields(dbName, tableName);
- } catch (TException e) {
- String errorMsg = String.format("Get table [%s.%s] fields information failed", dbName, tableName);
- throw new RuntimeException(errorMsg, e);
+    /**
+     * Appends each of the given partitions to the table {@code dbName.tableName}.
+     *
+     * <p>A partition that is already registered is skipped, so the call can be repeated (for
+     * example when a commit is retried or data is appended into an existing partition) without
+     * failing and without leaving the remaining partitions unregistered.
+     *
+     * @param dbName     database of the table
+     * @param tableName  name of the table
+     * @param partitions partition names to append
+     * @throws TException if the metastore rejects a partition for any other reason
+     */
+    public void addPartitions(@NonNull String dbName,
+                              @NonNull String tableName,
+                              List<String> partitions) throws TException {
+        for (String partition : partitions) {
+            try {
+                hiveMetaStoreClient.appendPartition(dbName, tableName, partition);
+            } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException ignored) {
+                // The partition is already present, which is the desired end state; move on.
+            }
+        }
+    }
+
+    /**
+     * Drops each of the given partitions from the table {@code dbName.tableName}; the
+     * {@code deleteData} flag passed to the metastore client is {@code false}.
+     *
+     * <p>A partition that does not exist is skipped, so one missing partition does not prevent
+     * the remaining ones from being dropped.
+     *
+     * @param dbName     database of the table
+     * @param tableName  name of the table
+     * @param partitions partition names to drop
+     * @throws TException if the metastore fails for any other reason
+     */
+    public void dropPartitions(@NonNull String dbName,
+                               @NonNull String tableName,
+                               List<String> partitions) throws TException {
+        for (String partition : partitions) {
+            try {
+                hiveMetaStoreClient.dropPartition(dbName, tableName, partition, false);
+            } catch (org.apache.hadoop.hive.metastore.api.NoSuchObjectException ignored) {
+                // The partition is already absent, which is the desired end state; move on.
+            }
 }
 }