Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/29 04:25:11 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4665: [HUDI-2733] Add support for Thrift sync

nsivabalan commented on a change in pull request #4665:
URL: https://github.com/apache/hudi/pull/4665#discussion_r794996423



##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/ThriftDDLExecutor.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.StorageSchemes;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.hive.PartitionValueExtractor;
+import org.apache.hudi.hive.thrift.HMSClient;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+import org.apache.thrift.TException;
+
+import javax.security.auth.login.LoginException;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ThriftDDLExecutor implements DDLExecutor {
+
+  private static final Logger LOG = LogManager.getLogger(ThriftDDLExecutor.class);
+  private final HMSClient client;

Review comment:
       Can we name this HMSThriftClient? We already have an "hms" sync mode, and calling this HMSClient when it is not used with the "hms" mode doesn't sit well.
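       For example (HMSThriftClient here is only the proposed name, nothing with that name exists yet):

    public class ThriftDDLExecutor implements DDLExecutor {

      private static final Logger LOG = LogManager.getLogger(ThriftDDLExecutor.class);
      // Proposed rename: HMSClient -> HMSThriftClient; behaviour stays the same.
      private final HMSThriftClient client;

      public ThriftDDLExecutor(HiveSyncConfig cfg, FileSystem fs)
          throws URISyntaxException, TException, LoginException, IOException, InterruptedException {
        this.client = new HMSThriftClient(new URI(cfg.thriftUrl));
        // ... rest of the constructor unchanged
      }
    }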

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/ThriftDDLExecutor.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.StorageSchemes;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.hive.PartitionValueExtractor;
+import org.apache.hudi.hive.thrift.HMSClient;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+import org.apache.thrift.TException;
+
+import javax.security.auth.login.LoginException;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ThriftDDLExecutor implements DDLExecutor {
+
+  private static final Logger LOG = LogManager.getLogger(ThriftDDLExecutor.class);
+  private final HMSClient client;
+  private final HiveSyncConfig syncConfig;
+  private final FileSystem fs;
+  private final PartitionValueExtractor partitionValueExtractor;
+
+  public ThriftDDLExecutor(HiveSyncConfig cfg, FileSystem fs) throws URISyntaxException, TException, LoginException, IOException, InterruptedException {
+    this.client = new HMSClient(new URI(cfg.thriftUrl));
+    this.syncConfig = cfg;
+    this.fs = fs;
+    try {
+      this.partitionValueExtractor =
+          (PartitionValueExtractor) Class.forName(syncConfig.partitionValueExtractorClass).newInstance();
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException(
+          "Failed to initialize PartitionValueExtractor class " + syncConfig.partitionValueExtractorClass, e);
+    }
+  }
+
+  @Override
+  public void createDatabase(String databaseName) {
+    try {
+      Database database = new Database(databaseName, "automatically created by hoodie", null, null);
+      client.createDatabase(database);
+    } catch (Exception e) {
+      LOG.error("Failed to create database " + databaseName, e);
+      throw new HoodieHiveSyncException("Failed to create database " + databaseName, e);
+    }
+  }
+
+  @Override
+  public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
+                          Map<String, String> tableProperties) {
+    try {
+      LinkedHashMap<String, String> mapSchema = HiveSchemaUtil.parquetSchemaToMapSchema(storageSchema, false, false);
+
+      List<FieldSchema> fieldSchema = mapSchema.keySet().stream()
+          .map(key -> new FieldSchema(key, mapSchema.get(key).toLowerCase(), ""))
+          .filter(field -> !syncConfig.partitionFields.contains(field.getName()))
+          .collect(Collectors.toList());
+
+      List<FieldSchema> partitionSchema = syncConfig.partitionFields.stream().map(partitionKey -> {
+        String partitionKeyType = HiveSchemaUtil.getPartitionKeyType(mapSchema, partitionKey);
+        return new FieldSchema(partitionKey, partitionKeyType.toLowerCase(), "");
+      }).collect(Collectors.toList());
+
+      Table newTb = new Table();
+      newTb.setDbName(syncConfig.databaseName);
+      newTb.setTableName(tableName);
+      newTb.setCreateTime((int) System.currentTimeMillis());
+      StorageDescriptor storageDescriptor = new StorageDescriptor();
+      storageDescriptor.setCols(fieldSchema);
+      storageDescriptor.setInputFormat(inputFormatClass);
+      storageDescriptor.setOutputFormat(outputFormatClass);
+      storageDescriptor.setLocation(syncConfig.basePath);
+      serdeProperties.put("serialization.format", "1");
+      storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass, serdeProperties));
+      newTb.setSd(storageDescriptor);
+      newTb.setPartitionKeys(partitionSchema);
+
+      if (!syncConfig.createManagedTable) {
+        newTb.putToParameters("EXTERNAL", "TRUE");
+      }
+
+      for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
+        newTb.putToParameters(entry.getKey(), entry.getValue());
+      }
+      newTb.setTableType(TableType.EXTERNAL_TABLE.toString());
+      client.createTable(newTb);
+    } catch (Exception e) {
+      LOG.error("failed to create table " + tableName, e);
+      throw new HoodieHiveSyncException("failed to create table " + tableName, e);
+    }
+  }
+
+  @Override
+  public void updateTableDefinition(String tableName, MessageType newSchema) {
+    try {
+      boolean cascade = syncConfig.partitionFields.size() > 0;
+      List<FieldSchema> fieldSchema = HiveSchemaUtil.convertParquetSchemaToHiveFieldSchema(newSchema, syncConfig);
+      Table table = client.getTable(syncConfig.databaseName, tableName);
+      StorageDescriptor sd = table.getSd();
+      sd.setCols(fieldSchema);
+      table.setSd(sd);
+      EnvironmentContext environmentContext = new EnvironmentContext();
+      if (cascade) {
+        LOG.info("partition table,need cascade");
+        environmentContext.putToProperties(StatsSetupConst.CASCADE, StatsSetupConst.TRUE);
+      }
+      client.alterTable(syncConfig.databaseName, tableName, table);
+    } catch (Exception e) {
+      LOG.error("Failed to update table for " + tableName, e);
+      throw new HoodieHiveSyncException("Failed to update table for " + tableName, e);
+    }
+  }
+
+  @Override
+  public Map<String, String> getTableSchema(String tableName) {
+    try {
+      // HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
+      // get the Schema of the table.
+      final long start = System.currentTimeMillis();
+      Table table = this.client.getTable(syncConfig.databaseName, tableName);
+      Map<String, String> partitionKeysMap =
+          table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
+
+      Map<String, String> columnsMap =
+          table.getSd().getCols().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
+
+      Map<String, String> schema = new HashMap<>();
+      schema.putAll(columnsMap);
+      schema.putAll(partitionKeysMap);
+      final long end = System.currentTimeMillis();
+      LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
+      return schema;
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to get table schema for : " + tableName, e);
+    }
+  }
+
+  @Override
+  public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
+    if (partitionsToAdd.isEmpty()) {
+      LOG.info("No partitions to add for " + tableName);
+      return;
+    }
+    LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
+    try {
+      StorageDescriptor sd = client.getTable(syncConfig.databaseName, tableName).getSd();
+      List<Partition> partitionList = partitionsToAdd.stream().map(partition -> {
+        StorageDescriptor partitionSd = new StorageDescriptor();
+        partitionSd.setCols(sd.getCols());
+        partitionSd.setInputFormat(sd.getInputFormat());
+        partitionSd.setOutputFormat(sd.getOutputFormat());
+        partitionSd.setSerdeInfo(sd.getSerdeInfo());
+        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+        List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
+        partitionSd.setLocation(fullPartitionPath);
+        return new Partition(partitionValues, syncConfig.databaseName, tableName, 0, 0, partitionSd, null);
+      }).collect(Collectors.toList());
+      client.addPartitions(partitionList);
+    } catch (TException e) {
+      LOG.error(syncConfig.databaseName + "." + tableName + " add partition failed", e);
+      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " add partition failed", e);
+    }
+  }
+
+  @Override
+  public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
+    if (changedPartitions.isEmpty()) {
+      LOG.info("No partitions to change for " + tableName);
+      return;
+    }
+    LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
+    try {
+      StorageDescriptor sd = client.getTable(syncConfig.databaseName, tableName).getSd();
+      List<Partition> partitionList = changedPartitions.stream().map(partition -> {
+        Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition);
+        String partitionScheme = partitionPath.toUri().getScheme();
+        String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
+            ? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
+        List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
+        sd.setLocation(fullPartitionPath);
+        return new Partition(partitionValues, syncConfig.databaseName, tableName, 0, 0, sd, null);
+      }).collect(Collectors.toList());
+      client.alterPartitions(syncConfig.databaseName, tableName, partitionList);
+    } catch (TException e) {
+      LOG.error(syncConfig.databaseName + "." + tableName + " update partition failed", e);
+      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " update partition failed", e);
+    }
+  }
+
+  @Override
+  public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
+    if (partitionsToDrop.isEmpty()) {
+      LOG.info("No partitions to drop for " + tableName);
+      return;
+    }
+
+    LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
+    try {
+      client.dropPartitions(syncConfig.databaseName, tableName, partitionsToDrop);
+    } catch (TException e) {
+      LOG.error(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+    }
+  }
+
+  @Override
+  public void close() {

Review comment:
       This class largely overlaps with HMSDDLExecutor. Can we create a base class and have both executors inherit from it? You can introduce abstract methods just for the client-specific calls; let's reuse as much code as possible.
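       A rough sketch of what that could look like (AbstractDDLExecutor and the abstract method names below are only illustrative, reusing the types already imported by ThriftDDLExecutor):

    public abstract class AbstractDDLExecutor implements DDLExecutor {

      protected final HiveSyncConfig syncConfig;
      protected final FileSystem fs;
      protected final PartitionValueExtractor partitionValueExtractor;

      protected AbstractDDLExecutor(HiveSyncConfig cfg, FileSystem fs) {
        this.syncConfig = cfg;
        this.fs = fs;
        try {
          // Reflection-based extractor setup that both executors duplicate today.
          this.partitionValueExtractor =
              (PartitionValueExtractor) Class.forName(cfg.partitionValueExtractorClass).newInstance();
        } catch (Exception e) {
          throw new HoodieHiveSyncException(
              "Failed to initialize PartitionValueExtractor class " + cfg.partitionValueExtractorClass, e);
        }
      }

      // Only the metastore-client specific calls stay abstract.
      protected abstract Table fetchTable(String databaseName, String tableName) throws Exception;

      protected abstract void persistPartitions(String databaseName, String tableName,
                                                List<Partition> partitions) throws Exception;

      @Override
      public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
        // Shared logic (building StorageDescriptors, extracting partition values, logging)
        // lives here and delegates the actual metastore write to persistPartitions(...),
        // which HMSDDLExecutor and ThriftDDLExecutor each implement with their own client.
      }
    }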
   

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/thrift/Util.java
##########
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.thrift;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.thrift.TException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hadoop.hive.metastore.TableType.MANAGED_TABLE;
+
+/**
+ * Helper utilities. The Util class is just a placeholder for static methods;
+ * it should never be instantiated.
+ */
+public final class Util {

Review comment:
       Maybe HiveSyncThriftModeUtils?
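       e.g. (keeping the "never instantiated" contract from the Javadoc explicit via a private constructor):

    public final class HiveSyncThriftModeUtils {

      // Placeholder for static helpers only; never meant to be instantiated.
      private HiveSyncThriftModeUtils() {
      }

      // ... existing static helpers from Util move here unchanged
    }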

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/thrift/HMSClient.java
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.thrift;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.LoginException;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ *  Wrapper for Thrift HMS interface.
+ */
+public final class HMSClient implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(HMSClient.class);
+  private static final String METASTORE_URI = "hive.metastore.uris";
+  private static final String CONFIG_DIR = "/etc/hive/conf";
+  private static final String HIVE_SITE = "hive-site.xml";
+  private static final String CORE_SITE = "core-site.xml";
+  private static final String PRINCIPAL_KEY = "hive.metastore.kerberos.principal";
+  private static final long SOCKET_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(600);
+
+  private final String confDir;
+  private ThriftHiveMetastore.Iface client;
+  private TTransport transport;
+  private URI serverURI;
+
+  public URI getServerURI() {
+    return serverURI;
+  }
+
+  @Override
+  public String toString() {
+    return serverURI.toString();
+  }
+
+  /**
+   * Connect to HMS and return a client instance.
+   *
+   * @param uri Hive Metastore URI.
+   */
+  public HMSClient(@Nullable URI uri)
+      throws TException, IOException, InterruptedException, LoginException, URISyntaxException {
+    this(uri, CONFIG_DIR);
+  }
+
+  /**
+   * Connect to HMS and return a client instance.
+   *
+   * @param uri     Hive Metastore URI.
+   * @param confDir Directory with Hive configuration files - normally /etc/hive/conf
+   */
+  public HMSClient(@Nullable URI uri, @Nullable String confDir)
+      throws TException, IOException, InterruptedException, LoginException, URISyntaxException {
+    this.confDir = (confDir == null ? CONFIG_DIR : confDir);
+    getClient(uri);
+  }
+
+  /**
+   * Return a new connected client using the same configuration as the original client.
+   *
+   * @return New connected client instance
+   * @throws InterruptedException
+   * @throws URISyntaxException
+   * @throws TException
+   * @throws LoginException
+   * @throws IOException
+   */
+  public HMSClient doClone()
+      throws InterruptedException, URISyntaxException, TException, LoginException, IOException {
+    return new HMSClient(serverURI, confDir);
+  }
+
+  private void addResource(Configuration conf, @NotNull String r) throws MalformedURLException {
+    File f = new File(confDir + "/" + r);
+    if (f.exists() && !f.isDirectory()) {
+      LOG.debug("Adding configuration resource {}", r);
+      conf.addResource(f.toURI().toURL());
+    } else {
+      LOG.debug("Configuration {} does not exist", r);
+    }
+  }
+
+  /**
+   * Create a client to Hive Metastore.
+   * If principal is specified, create kerberised client.
+   *
+   * @param uri server uri
+   * @throws MetaException        if fails to login using kerberos credentials
+   * @throws IOException          if fails connecting to metastore
+   * @throws InterruptedException if interrupted during kerberos setup
+   */
+  private void getClient(@Nullable URI uri)
+      throws TException, IOException, InterruptedException, URISyntaxException, LoginException {
+    HiveConf conf = new HiveConf();
+    addResource(conf, HIVE_SITE);
+    if (uri != null) {
+      conf.set(METASTORE_URI, uri.toString());
+    }
+
+    // Pick up the first URI from the list of available URIs
+    serverURI = uri != null
+        ? uri :
+        new URI(conf.get(METASTORE_URI).split(",")[0]);
+
+    String principal = conf.get(PRINCIPAL_KEY);
+    // TODO fixme
+    // String principal = null;
+
+    if (principal == null) {
+      open(conf, serverURI);
+      return;
+    }
+
+    LOG.debug("Opening kerberos connection to HMS");
+    addResource(conf, CORE_SITE);
+
+    Configuration hadoopConf = new Configuration();
+    addResource(hadoopConf, HIVE_SITE);
+    addResource(hadoopConf, CORE_SITE);
+
+    // Kerberos magic
+    UserGroupInformation.setConfiguration(hadoopConf);
+    UserGroupInformation.getLoginUser()
+        .doAs((PrivilegedExceptionAction<TTransport>)
+            () -> open(conf, serverURI));
+  }
+
+  /**
+   * Return True if the given Hive database exists.
+   *
+   * @param dbName database name
+   * @return True iff dbName exists
+   * @throws TException if there is Thrift error
+   */
+  public boolean dbExists(@NotNull String dbName) throws TException {
+    return getAllDatabases(dbName).contains(dbName);
+  }
+
+  /**
+   * Return True if specified table exists
+   *
+   * @param dbName    Database name
+   * @param tableName Table name
+   * @return True iff table exists
+   * @throws TException if there is Thrift error
+   */
+  public boolean tableExists(@NotNull String dbName, @NotNull String tableName) throws TException {
+    return getAllTables(dbName, tableName).contains(tableName);
+  }
+
+  /**
+   * Get full Database information.
+   *
+   * @param dbName Database name.
+   * @return
+   * @throws TException if there is Thrift error
+   */
+  public Database getDatabase(@NotNull String dbName) throws TException {
+    return client.get_database(dbName);
+  }
+
+  /**
+   * Return all databases with name matching the filter.
+   *
+   * @param filter Regexp. Can be null or empty in which case everything matches
+   * @return list of database names matching the filter
+   * @throws TException when the call fails
+   */
+  public Set<String> getAllDatabases(@Nullable String filter) throws TException {
+    if (filter == null || filter.isEmpty()) {
+      return new HashSet<>(client.get_all_databases());
+    }
+    return client.get_all_databases()
+        .stream()
+        .filter(n -> n.matches(filter))
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Get all table names for a database matching the filter.
+   * If filter is null or empty, return all table names. This call does not use
+   * native HMS table name matching; it uses client-side regexp matching instead.
+   *
+   * @param dbName Database name
+   * @param filter regexp matching table name
+   * @return Set of matching table names or set of all table names
+   * @throws TException if there is Thrift error
+   */
+  public Set<String> getAllTables(@NotNull String dbName, @Nullable String filter) throws TException {
+    if (filter == null || filter.isEmpty()) {
+      return new HashSet<>(client.get_all_tables(dbName));
+    }
+    return client.get_all_tables(dbName)
+        .stream()
+        .filter(n -> n.matches(filter))
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Get list of tables within a database
+   *
+   * @param dbName     Database name
+   * @param tableNames All tables of interest
+   */
+  public List<Table> getTableObjects(@NotNull String dbName, @NotNull List<String> tableNames)
+      throws TException {
+    return client.get_table_objects_by_name(dbName, tableNames);
+  }
+
+  /**
+   * Create database with the given name if it doesn't exist
+   *
+   * @param name database name
+   * @return True if successful, false otherwise
+   * @throws TException if the call fails
+   */
+  public boolean createDatabase(@NotNull String name) throws TException {
+    return createDatabase(name, null, null, null);
+  }
+
+  /**
+   * Create database if it doesn't exist
+   *
+   * @param name        Database name
+   * @param description Database description
+   * @param location    Database location
+   * @param params      Database params
+   * @return True if successful, false otherwise
+   * @throws TException if call fails
+   */
+  public boolean createDatabase(@NotNull String name,
+                                @Nullable String description,
+                                @Nullable String location,
+                                @Nullable Map<String, String> params)
+      throws TException {
+    Database db = new Database(name, description, location, params);
+    client.create_database(db);
+    return true;
+  }
+
+  public boolean createDatabase(Database db) throws TException {
+    client.create_database(db);
+    return true;
+  }
+
+  public boolean dropDatabase(@NotNull String dbName) throws TException {
+    client.drop_database(dbName, true, true);
+    return true;
+  }
+
+  public boolean createTable(Table table) throws TException {
+    client.create_table(table);
+    return true;
+  }
+
+  public boolean dropTable(@NotNull String dbName, @NotNull String tableName) throws TException {
+    client.drop_table(dbName, tableName, true);
+    return true;
+  }
+
+  public Table getTable(@NotNull String dbName, @NotNull String tableName) throws TException {
+    return client.get_table(dbName, tableName);
+  }
+
+  public Partition createPartition(@NotNull Table table, @NotNull List<String> values) throws TException {
+    return client.add_partition(new Util.PartitionBuilder(table).withValues(values).build());
+  }
+
+  public Partition addPartition(@NotNull Partition partition) throws TException {
+    return client.add_partition(partition);
+  }
+
+  public void addPartitions(List<Partition> partitions) throws TException {
+    client.add_partitions(partitions);
+  }
+
+  public List<Partition> listPartitions(@NotNull String dbName, @NotNull String tableName) throws TException {
+    return client.get_partitions(dbName, tableName, (short) -1);
+  }
+
+  public Long getCurrentNotificationId() throws TException {
+    return client.get_current_notificationEventId().getEventId();
+  }
+
+  public List<String> getPartitionNames(@NotNull String dbName,
+                                        @NotNull String tableName) throws TException {
+    return client.get_partition_names(dbName, tableName, (short) -1);
+  }
+
+  public boolean dropPartition(@NotNull String dbName, @NotNull String tableName,
+                               @NotNull List<String> arguments)
+      throws TException {
+    return client.drop_partition(dbName, tableName, arguments, true);
+  }
+
+  public List<Partition> getPartitions(@NotNull String dbName, @NotNull String tableName) throws TException {
+    return client.get_partitions(dbName, tableName, (short) -1);
+  }
+
+  public DropPartitionsResult dropPartitions(@NotNull String dbName, @NotNull String tableName,
+                                             @Nullable List<String> partNames) throws TException {
+    if (partNames == null) {
+      return dropPartitions(dbName, tableName, getPartitionNames(dbName, tableName));
+    }
+    if (partNames.isEmpty()) {
+      return null;
+    }
+    return client.drop_partitions_req(new DropPartitionsRequest(dbName,
+        tableName, RequestPartsSpec.names(partNames)));
+  }
+
+  public List<Partition> getPartitionsByNames(@NotNull String dbName, @NotNull String tableName,
+                                              @Nullable List<String> names) throws TException {
+    if (names == null) {
+      return client.get_partitions_by_names(dbName, tableName,
+          getPartitionNames(dbName, tableName));
+    }
+    return client.get_partitions_by_names(dbName, tableName, names);
+  }
+
+  public boolean alterTable(@NotNull String dbName, @NotNull String tableName, @NotNull Table newTable)
+      throws TException {
+    client.alter_table(dbName, tableName, newTable);
+    return true;
+  }
+
+  public void alterPartition(@NotNull String dbName, @NotNull String tableName,
+                             @NotNull Partition partition) throws TException {
+    client.alter_partition(dbName, tableName, partition);
+  }
+
+  public void alterPartitions(@NotNull String dbName, @NotNull String tableName,
+                              @NotNull List<Partition> partitions) throws TException {
+    client.alter_partitions(dbName, tableName, partitions);
+  }
+
+  public void appendPartition(@NotNull String dbName, @NotNull String tableName,
+                              @NotNull List<String> partitionValues) throws TException {
+    client.append_partition_with_environment_context(dbName, tableName, partitionValues, null);
+  }
+
+  private TTransport open(HiveConf conf, @NotNull URI uri) throws
+      TException, IOException, LoginException {
+    boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
+    boolean useFramedTransport = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
+    boolean useCompactProtocol = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
+    LOG.debug("Connecting to {}, framedTransport = {}", uri, useFramedTransport);
+
+    transport = new TSocket(uri.getHost(), uri.getPort(), (int) SOCKET_TIMEOUT_MS);
+
+    if (useSasl) {
+      LOG.debug("Using SASL authentication");
+      HadoopThriftAuthBridge.Client authBridge =

Review comment:
       I don't have deep knowledge of these details, but I assume you have tested this, so I'm only going to skim through.
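       For other reviewers' context, the non-Kerberos tail of an open(...) like this one typically ends up roughly as below (just a sketch, not the exact code in this hunk; the SASL branch via HadoopThriftAuthBridge is left out):

    private TTransport open(HiveConf conf, URI uri) throws TException {
      boolean useFramedTransport = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
      boolean useCompactProtocol = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);

      // Plain socket transport; a SASL-wrapped transport replaces this when useSasl is true.
      TTransport sock = new TSocket(uri.getHost(), uri.getPort(), (int) SOCKET_TIMEOUT_MS);
      if (useFramedTransport) {
        sock = new TFastFramedTransport(sock);
      }
      TProtocol protocol = useCompactProtocol
          ? new TCompactProtocol(sock)
          : new TBinaryProtocol(sock);
      sock.open();

      // The generated Thrift stub implements ThriftHiveMetastore.Iface.
      this.client = new ThriftHiveMetastore.Client(protocol);
      this.transport = sock;
      return sock;
    }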

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/thrift/HMSClient.java
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.thrift;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.LoginException;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ *  Wrapper for Thrift HMS interface.
+ */
+public final class HMSClient implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(HMSClient.class);
+  private static final String METASTORE_URI = "hive.metastore.uris";
+  private static final String CONFIG_DIR = "/etc/hive/conf";
+  private static final String HIVE_SITE = "hive-site.xml";
+  private static final String CORE_SITE = "core-site.xml";
+  private static final String PRINCIPAL_KEY = "hive.metastore.kerberos.principal";
+  private static final long SOCKET_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(600);
+
+  private final String confDir;
+  private ThriftHiveMetastore.Iface client;
+  private TTransport transport;
+  private URI serverURI;
+
+  public URI getServerURI() {
+    return serverURI;
+  }
+
+  @Override
+  public String toString() {
+    return serverURI.toString();
+  }
+
+  /**
+   * Connect to HMS and return a client instance.
+   *
+   * @param uri Hive Metastore URI.
+   */
+  public HMSClient(@Nullable URI uri)
+      throws TException, IOException, InterruptedException, LoginException, URISyntaxException {
+    this(uri, CONFIG_DIR);
+  }
+
+  /**
+   * Connect to HMS and return a client instance.
+   *
+   * @param uri     Hive Metastore URI.
+   * @param confDir Directory with Hive configuration files - normally /etc/hive/conf
+   */
+  public HMSClient(@Nullable URI uri, @Nullable String confDir)
+      throws TException, IOException, InterruptedException, LoginException, URISyntaxException {
+    this.confDir = (confDir == null ? CONFIG_DIR : confDir);
+    getClient(uri);
+  }
+
+  /**
+   * Return a new connected client using the same configuration as the original client.
+   *
+   * @return New connected client instance
+   * @throws InterruptedException
+   * @throws URISyntaxException
+   * @throws TException
+   * @throws LoginException
+   * @throws IOException
+   */
+  public HMSClient doClone()
+      throws InterruptedException, URISyntaxException, TException, LoginException, IOException {
+    return new HMSClient(serverURI, confDir);
+  }
+
+  private void addResource(Configuration conf, @NotNull String r) throws MalformedURLException {
+    File f = new File(confDir + "/" + r);
+    if (f.exists() && !f.isDirectory()) {
+      LOG.debug("Adding configuration resource {}", r);
+      conf.addResource(f.toURI().toURL());
+    } else {
+      LOG.debug("Configuration {} does not exist", r);
+    }
+  }
+
+  /**
+   * Create a client to Hive Metastore.
+   * If principal is specified, create kerberised client.
+   *
+   * @param uri server uri
+   * @throws MetaException        if fails to login using kerberos credentials
+   * @throws IOException          if fails connecting to metastore
+   * @throws InterruptedException if interrupted during kerberos setup
+   */
+  private void getClient(@Nullable URI uri)
+      throws TException, IOException, InterruptedException, URISyntaxException, LoginException {
+    HiveConf conf = new HiveConf();
+    addResource(conf, HIVE_SITE);
+    if (uri != null) {
+      conf.set(METASTORE_URI, uri.toString());
+    }
+
+    // Pick up the first URI from the list of available URIs
+    serverURI = uri != null
+        ? uri :
+        new URI(conf.get(METASTORE_URI).split(",")[0]);
+
+    String principal = conf.get(PRINCIPAL_KEY);
+    // TODO fixme
+    // String principal = null;

Review comment:
       Can we remove any residue from the draft work?
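       i.e. once the commented-out draft line and the TODO are gone, this block would simply read:

    String principal = conf.get(PRINCIPAL_KEY);
    if (principal == null) {
      open(conf, serverURI);
      return;
    }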

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/thrift/HMSClient.java
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.thrift;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.LoginException;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ *  Wrapper for Thrift HMS interface.
+ */
+public final class HMSClient implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(HMSClient.class);
+  private static final String METASTORE_URI = "hive.metastore.uris";
+  private static final String CONFIG_DIR = "/etc/hive/conf";
+  private static final String HIVE_SITE = "hive-site.xml";
+  private static final String CORE_SITE = "core-site.xml";
+  private static final String PRINCIPAL_KEY = "hive.metastore.kerberos.principal";
+  private static final long SOCKET_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(600);
+
+  private final String confDir;
+  private ThriftHiveMetastore.Iface client;
+  private TTransport transport;
+  private URI serverURI;
+
+  public URI getServerURI() {
+    return serverURI;
+  }
+
+  @Override
+  public String toString() {
+    return serverURI.toString();
+  }
+
+  /**
+   * Connect to HMS and return a client instance.
+   *
+   * @param uri Hive Metastore URI.
+   */
+  public HMSClient(@Nullable URI uri)
+      throws TException, IOException, InterruptedException, LoginException, URISyntaxException {
+    this(uri, CONFIG_DIR);
+  }
+
+  /**
+   * Connect to HMS and return a client instance.
+   *
+   * @param uri     Hive Metastore URI.
+   * @param confDir Directory with Hive configuration files - normally /etc/hive/conf
+   */
+  public HMSClient(@Nullable URI uri, @Nullable String confDir)
+      throws TException, IOException, InterruptedException, LoginException, URISyntaxException {
+    this.confDir = (confDir == null ? CONFIG_DIR : confDir);
+    getClient(uri);
+  }
+
+  /**
+   * Return a new connected client using the same configuration as the original client.
+   *
+   * @return New connected client instance
+   * @throws InterruptedException
+   * @throws URISyntaxException
+   * @throws TException
+   * @throws LoginException
+   * @throws IOException
+   */
+  public HMSClient doClone()
+      throws InterruptedException, URISyntaxException, TException, LoginException, IOException {
+    return new HMSClient(serverURI, confDir);
+  }
+
+  private void addResource(Configuration conf, @NotNull String r) throws MalformedURLException {
+    File f = new File(confDir + "/" + r);
+    if (f.exists() && !f.isDirectory()) {
+      LOG.debug("Adding configuration resource {}", r);
+      conf.addResource(f.toURI().toURL());
+    } else {
+      LOG.debug("Configuration {} does not exist", r);
+    }
+  }
+
+  /**
+   * Create a client to Hive Metastore.
+   * If principal is specified, create kerberised client.
+   *
+   * @param uri server uri
+   * @throws MetaException        if fails to login using kerberos credentials
+   * @throws IOException          if fails connecting to metastore
+   * @throws InterruptedException if interrupted during kerberos setup
+   */
+  private void getClient(@Nullable URI uri)

Review comment:
       Can we fix the method name? It has "get" in it, but the method does not return anything.
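       For example, something like this (connectClient is just one possible name):

    /**
     * Open the Thrift connection to the Hive Metastore. The connected client is
     * stored in the `client` field rather than returned, hence no "get" prefix.
     */
    private void connectClient(@Nullable URI uri)
        throws TException, IOException, InterruptedException, URISyntaxException, LoginException {
      // ... body unchanged from getClient(uri)
    }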




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org