Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/20 09:44:33 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][Hive] Hive Sink Support msck partitions (#3133)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a8738ef3c [Improve][Connector-V2][Hive] Hive Sink Support msck partitions (#3133)
a8738ef3c is described below

commit a8738ef3c4f6686869d75b46aa6590d2e2503e33
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Thu Oct 20 17:44:27 2022 +0800

    [Improve][Connector-V2][Hive] Hive Sink Support msck partitions (#3133)
---
 docs/en/connector-v2/sink/Hive.md                  |  7 ++
 .../hive/commit/HiveSinkAggregatedCommitter.java   | 87 ++++++++++++++++++++++
 .../seatunnel/hive/config/HiveConfig.java          |  1 +
 .../connectors/seatunnel/hive/sink/HiveSink.java   | 55 +++++++-------
 .../seatunnel/hive/utils/HiveMetaStoreProxy.java   | 20 +++--
 5 files changed, 134 insertions(+), 36 deletions(-)
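
For context on the feature name: "msck partitions" refers to Hive's `MSCK REPAIR TABLE` statement, which scans a table's directory tree and registers any partition directories the metastore does not yet know about. Before this change, users had to run that statement by hand after each sink run; the new aggregated committer performs the equivalent registration automatically through the metastore client. A minimal editorial sketch of the manual step being automated (metastore URI, database, table, and partition spec are all placeholders, not values from this commit):

```java
// Editorial sketch, not part of the commit: the manual step this feature automates.
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class ManualPartitionRegistrationSketch {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        conf.set("hive.metastore.uris", "thrift://localhost:9083");
        HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
        try {
            // For one directory, this registers what `MSCK REPAIR TABLE default.events`
            // would discover.
            client.appendPartition("default", "events", "dt=2022-10-20");
        } finally {
            client.close();
        }
    }
}
```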

diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md
index 19b287396..9fee56bc5 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -159,3 +159,10 @@ sink {
   }
 }
 ```
+
+## Changelog
+
+| Version    | Date       | Pull Request                                                    | Subject                                       |
+|------------|------------|-----------------------------------------------------------------|-----------------------------------------------|
+| 2.2.0-beta | 2022-09-26 |                                                                 | Add Hive Sink                                 |
+| 2.3.0-beta | 2022-10-19 | [3133](https://github.com/apache/incubator-seatunnel/pull/3133) | Hive Sink supports automatic partition repair |
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
new file mode 100644
index 000000000..410d9ec8c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.commit;
+
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
+    private final Config pluginConfig;
+    private final String dbName;
+    private final String tableName;
+
+    public HiveSinkAggregatedCommitter(Config pluginConfig, String dbName, String tableName) {
+        this.pluginConfig = pluginConfig;
+        this.dbName = dbName;
+        this.tableName = tableName;
+    }
+
+    @Override
+    public List<FileAggregatedCommitInfo> commit(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
+        HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
+        List<FileAggregatedCommitInfo> errorCommitInfos = super.commit(aggregatedCommitInfos);
+        if (errorCommitInfos.isEmpty()) {
+            for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
+                Map<String, List<String>> partitionDirAndValuesMap = aggregatedCommitInfo.getPartitionDirAndValuesMap();
+                List<String> partitions = partitionDirAndValuesMap.keySet().stream()
+                        .map(partition -> partition.replaceAll("\\\\", "/"))
+                        .collect(Collectors.toList());
+                try {
+                    hiveMetaStore.addPartitions(dbName, tableName, partitions);
+                    log.info("Added partitions [{}]", partitions);
+                } catch (TException e) {
+                    log.error("Failed to add these partitions [{}]", partitions);
+                    errorCommitInfos.add(aggregatedCommitInfo);
+                }
+            }
+        }
+        hiveMetaStore.close();
+        return errorCommitInfos;
+    }
+
+    @Override
+    public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception {
+        super.abort(aggregatedCommitInfos);
+        HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
+        for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
+            Map<String, List<String>> partitionDirAndValuesMap = aggregatedCommitInfo.getPartitionDirAndValuesMap();
+            List<String> partitions = partitionDirAndValuesMap.keySet().stream()
+                    .map(partition -> partition.replaceAll("\\\\", "/"))
+                    .collect(Collectors.toList());
+            try {
+                hiveMetaStore.dropPartitions(dbName, tableName, partitions);
+                log.info("Removed partitions [{}]", partitions);
+            } catch (TException e) {
+                log.error("Failed to remove these partitions [{}]", partitions);
+            }
+        }
+        hiveMetaStore.close();
+    }
+}
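
Both commit() and abort() normalize the keys of partitionDirAndValuesMap before calling the metastore. A self-contained sketch of that normalization; the directory keys are hypothetical, and the backslash-to-slash rewrite is exactly the `replaceAll("\\\\", "/")` call above:

```java
// Self-contained sketch of the key normalization used in commit() and abort();
// the directory keys below are hypothetical examples, not from a real run.
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionKeySketch {
    public static void main(String[] args) {
        List<String> dirKeys = Arrays.asList("dt=2022-10-20\\hour=09", "dt=2022-10-20\\hour=10");
        List<String> partitions = dirKeys.stream()
                .map(key -> key.replaceAll("\\\\", "/")) // backslashes -> slashes
                .collect(Collectors.toList());
        // Prints [dt=2022-10-20/hour=09, dt=2022-10-20/hour=10] -- the
        // "k1=v1/k2=v2" form that appendPartition/dropPartition expect.
        System.out.println(partitions);
    }
}
```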
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
index b28d56b2a..8cf118bbc 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
@@ -42,6 +42,7 @@ public class HiveConfig {
         }
         HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(config);
         Table tableInformation = hiveMetaStoreProxy.getTable(splits[0], splits[1]);
+        hiveMetaStoreProxy.close();
         return Pair.of(splits, tableInformation);
     }
 }
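
This one-line fix releases the metastore client once the table lookup is done. A plain-Java sketch of the same close-after-use pattern with a stand-in resource; wrapping the call in try/finally (an editorial variation, not what the hunk does) would also cover the path where the lookup throws:

```java
// Plain-Java sketch of the close-after-use pattern; StubProxy stands in for
// HiveMetaStoreProxy, which is what the actual hunk closes.
public class CloseAfterUseSketch {
    static final class StubProxy {
        String getTable(String db, String table) { return db + "." + table; }
        void close() { /* the real proxy releases its HiveMetaStoreClient here */ }
    }

    public static void main(String[] args) {
        StubProxy proxy = new StubProxy();
        try {
            System.out.println(proxy.getTable("default", "events"));
        } finally {
            proxy.close(); // runs even if getTable throws
        }
    }
}
```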
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 0fe82358e..34437ca42 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -21,25 +21,29 @@ import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FIE
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_NAME_EXPRESSION;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.IS_PARTITION_FIELD_WRITE_IN_FILE;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PARTITION_BY;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PATH;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.ROW_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SAVE_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SINK_COLUMNS;
 import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_OUTPUT_FORMAT_CLASSNAME;
 import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_OUTPUT_FORMAT_CLASSNAME;
 import static org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_OUTPUT_FORMAT_CLASSNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
+import org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
-import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
@@ -50,10 +54,12 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 @AutoService(SeaTunnelSink.class)
@@ -67,29 +73,6 @@ public class HiveSink extends BaseHdfsFileSink {
         return "Hive";
     }
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        super.setTypeInfo(seaTunnelRowType);
-        HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(pluginConfig);
-        // --------------------Check textFileSinkConfig with the hive table info-------------------
-        List<FieldSchema> fields = hiveMetaStoreProxy.getTableFields(dbName, tableName);
-        List<FieldSchema> partitionKeys = tableInformation.getPartitionKeys();
-
-        // Remove partitionKeys from table fields
-        List<FieldSchema> fieldNotContainPartitionKey = fields.stream().filter(filed -> !partitionKeys.contains(filed)).collect(Collectors.toList());
-
-        // check fields size must same as sinkColumnList size
-        if (fieldNotContainPartitionKey.size() != textFileSinkConfig.getSinkColumnList().size()) {
-            throw new RuntimeException("sink columns size must same as hive table field size");
-        }
-
-        // check hivePartitionFieldList size must same as partitionFieldList size
-        if (partitionKeys.size() != textFileSinkConfig.getPartitionFieldList().size()) {
-            throw new RuntimeException("partition by columns size must same as hive table partition columns size");
-        }
-        hiveMetaStoreProxy.close();
-    }
-
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HiveConfig.METASTORE_URI, HiveConfig.TABLE_NAME);
@@ -100,6 +83,13 @@ public class HiveSink extends BaseHdfsFileSink {
         dbName = tableInfo.getLeft()[0];
         tableName = tableInfo.getLeft()[1];
         tableInformation = tableInfo.getRight();
+        List<String> sinkFields = tableInformation.getSd().getCols().stream()
+                .map(FieldSchema::getName)
+                .collect(Collectors.toList());
+        List<String> partitionKeys = tableInformation.getPartitionKeys().stream()
+                .map(FieldSchema::getName)
+                .collect(Collectors.toList());
+        sinkFields.addAll(partitionKeys);
         String outputFormat = tableInformation.getSd().getOutputFormat();
         if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
             Map<String, String> parameters = tableInformation.getSd().getSerdeInfo().getParameters();
@@ -113,10 +103,12 @@ public class HiveSink extends BaseHdfsFileSink {
         } else {
             throw new RuntimeException("Only support [text parquet orc] file now");
         }
-        pluginConfig = pluginConfig.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE, ConfigValueFactory.fromAnyRef(false))
-            .withValue(FILE_NAME_EXPRESSION, ConfigValueFactory.fromAnyRef("${transactionId}"))
-            .withValue(PATH, ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()));
-
+        pluginConfig = pluginConfig
+                .withValue(IS_PARTITION_FIELD_WRITE_IN_FILE, ConfigValueFactory.fromAnyRef(false))
+                .withValue(FILE_NAME_EXPRESSION, ConfigValueFactory.fromAnyRef("${transactionId}"))
+                .withValue(PATH, ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()))
+                .withValue(SINK_COLUMNS, ConfigValueFactory.fromAnyRef(sinkFields))
+                .withValue(PARTITION_BY, ConfigValueFactory.fromAnyRef(partitionKeys));
         if (!pluginConfig.hasPath(SAVE_MODE) || StringUtils.isBlank(pluginConfig.getString(SAVE_MODE))) {
             pluginConfig = pluginConfig.withValue(SAVE_MODE, ConfigValueFactory.fromAnyRef(SaveMode.APPEND.toString()));
         }
@@ -131,4 +123,9 @@ public class HiveSink extends BaseHdfsFileSink {
         }
         this.pluginConfig = pluginConfig;
     }
+
+    @Override
+    public Optional<SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
+        return Optional.of(new HiveSinkAggregatedCommitter(pluginConfig, dbName, tableName));
+    }
 }
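
With the manual schema checks in setTypeInfo removed, prepare() now derives the sink schema from the metastore itself: data columns come from the table's storage descriptor, partition keys are appended after them, and both lists are written back into the plugin config as SINK_COLUMNS and PARTITION_BY. A dependency-free sketch of that derivation, with illustrative column names:

```java
// Dependency-free sketch of the sink-column derivation in prepare();
// the column names are illustrative only.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkColumnSketch {
    public static void main(String[] args) {
        // From tableInformation.getSd().getCols() in the real code:
        List<String> sinkFields = new ArrayList<>(Arrays.asList("id", "name", "score"));
        // From tableInformation.getPartitionKeys():
        List<String> partitionKeys = Arrays.asList("dt");
        sinkFields.addAll(partitionKeys);
        // sinkFields -> SINK_COLUMNS, partitionKeys -> PARTITION_BY
        System.out.println(sinkFields);    // [id, name, score, dt]
        System.out.println(partitionKeys); // [dt]
    }
}
```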
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index d813a03a3..60ec238ad 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import lombok.NonNull;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TException;
@@ -67,12 +66,19 @@ public class HiveMetaStoreProxy {
         }
     }
 
-    public List<FieldSchema> getTableFields(@NonNull String dbName, @NonNull String tableName) {
-        try {
-            return hiveMetaStoreClient.getFields(dbName, tableName);
-        } catch (TException e) {
-            String errorMsg = String.format("Get table [%s.%s] fields information failed", dbName, tableName);
-            throw new RuntimeException(errorMsg, e);
+    public void addPartitions(@NonNull String dbName,
+                              @NonNull String tableName,
+                              List<String> partitions) throws TException {
+        for (String partition : partitions) {
+            hiveMetaStoreClient.appendPartition(dbName, tableName, partition);
+        }
+    }
+
+    public void dropPartitions(@NonNull String dbName,
+                               @NonNull String tableName,
+                               List<String> partitions) throws TException {
+        for (String partition : partitions) {
+            hiveMetaStoreClient.dropPartition(dbName, tableName, partition, false);
         }
     }
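
The proxy trades the now-unused getTableFields for addPartitions and dropPartitions, thin loops over HiveMetaStoreClient.appendPartition and dropPartition. Note that dropPartition is called with deleteData=false, so rolling back a partition removes metadata only; the data files themselves are cleaned up by the file-level abort. A hedged usage sketch, assuming pluginConfig points at a reachable metastore; the database, table, and partition spec are placeholders:

```java
// Hedged usage sketch of the new proxy methods; "default", "events", and the
// partition spec are placeholders, and pluginConfig must point at a reachable
// metastore for this to work.
import java.util.Collections;
import java.util.List;

import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.thrift.TException;

public final class ProxyUsageSketch {
    public static void registerThenRollback(Config pluginConfig) {
        HiveMetaStoreProxy proxy = HiveMetaStoreProxy.getInstance(pluginConfig);
        List<String> partitions = Collections.singletonList("dt=2022-10-20");
        try {
            proxy.addPartitions("default", "events", partitions);
            // ... on failure elsewhere, the committer rolls back the metadata:
            proxy.dropPartitions("default", "events", partitions);
        } catch (TException e) {
            // AlreadyExistsException and connectivity errors both land here.
        } finally {
            proxy.close();
        }
    }
}
```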