Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/07 07:55:57 UTC

[flink-table-store] branch master updated: [FLINK-28384][hive] Add CatalogLock support for HiveCatalog in table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c6ddd6e [FLINK-28384][hive] Add CatalogLock support for HiveCatalog in table store
7c6ddd6e is described below

commit 7c6ddd6ec19a1a232450a2f604fd6df8d1144f82
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jul 7 15:55:53 2022 +0800

    [FLINK-28384][hive] Add CatalogLock support for HiveCatalog in table store
    
    This closes #201
---
 .../generated/catalog_configuration.html           |  18 +++
 .../flink/table/store/connector/FlinkCatalog.java  |   8 +-
 .../table/store/connector/FlinkCatalogFactory.java |  10 +-
 .../connector/TableStoreConnectorFactory.java      |  21 +++
 .../store/connector/sink/FlinkSinkBuilder.java     |   1 +
 .../table/store/connector/sink/StoreCommitter.java |   1 +
 .../table/store/connector/sink/StoreSink.java      |   1 +
 .../table/store/connector/sink/TableStoreSink.java |   1 +
 .../apache/flink/table/store/CatalogOptions.java   |  22 +++
 .../flink/table/store/file/catalog/Catalog.java    |   7 +
 .../table/store/file/catalog/CatalogFactory.java   |   6 +-
 .../table/store/file/catalog}/CatalogLock.java     |   2 +-
 .../store/file/catalog/FileSystemCatalog.java      |   6 +
 .../file/catalog/FileSystemCatalogFactory.java     |   4 +-
 .../apache/flink/table/store/hive/HiveCatalog.java |  49 ++++---
 .../flink/table/store/hive/HiveCatalogFactory.java |  13 +-
 .../flink/table/store/hive/HiveCatalogLock.java    | 150 +++++++++++++++++++++
 .../table/store/hive/SerializableHiveConf.java     | 101 ++++++++++++++
 .../flink/table/store/hive/HiveCatalogITCase.java  |  71 +++++++++-
 19 files changed, 453 insertions(+), 39 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index ba90de08..b3056129 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -8,6 +8,24 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>lock-acquire-timeout</h5></td>
+            <td style="word-wrap: break-word;">8 min</td>
+            <td>Duration</td>
+            <td>The maximum time to wait for acquiring the lock.</td>
+        </tr>
+        <tr>
+            <td><h5>lock-check-max-sleep</h5></td>
+            <td style="word-wrap: break-word;">8 s</td>
+            <td>Duration</td>
+            <td>The maximum sleep time when retrying to check the lock.</td>
+        </tr>
+        <tr>
+            <td><h5>lock.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enable Catalog Lock.</td>
+        </tr>
         <tr>
             <td><h5>metastore</h5></td>
             <td style="word-wrap: break-word;">"filesystem"</td>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 11d79dd4..a3e4bfdd 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -39,9 +39,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.Factory;
-import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -53,7 +51,6 @@ import java.util.Optional;
 
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.apache.flink.table.store.CoreOptions.PATH;
-import static org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
 
 /** Catalog for table store. */
 public class FlinkCatalog extends AbstractCatalog {
@@ -70,14 +67,13 @@ public class FlinkCatalog extends AbstractCatalog {
     }
 
     @VisibleForTesting
-    Catalog catalog() {
+    public Catalog catalog() {
         return catalog;
     }
 
     @Override
     public Optional<Factory> getFactory() {
-        return Optional.of(
-                FactoryUtil.discoverFactory(classLoader(), DynamicTableFactory.class, IDENTIFIER));
+        return Optional.of(new TableStoreConnectorFactory(catalog.lockFactory().orElse(null)));
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
index 8ff157da..1811b3e8 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
@@ -20,8 +20,7 @@ package org.apache.flink.table.store.connector;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
 
 import java.util.Collections;
@@ -52,13 +51,10 @@ public class FlinkCatalogFactory implements org.apache.flink.table.factories.Cat
 
     @Override
     public FlinkCatalog createCatalog(Context context) {
-        FactoryUtil.CatalogFactoryHelper helper =
-                FactoryUtil.createCatalogFactoryHelper(this, context);
-        ReadableConfig options = helper.getOptions();
-        return createCatalog(context.getName(), options);
+        return createCatalog(context.getName(), Configuration.fromMap(context.getOptions()));
     }
 
-    public static FlinkCatalog createCatalog(String catalogName, ReadableConfig options) {
+    public static FlinkCatalog createCatalog(String catalogName, Configuration options) {
         return new FlinkCatalog(
                 CatalogFactory.createCatalog(options), catalogName, options.get(DEFAULT_DATABASE));
     }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
index e5a5a4ef..145cff7a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
@@ -19,14 +19,35 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.sink.TableStoreSink;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
+
+import javax.annotation.Nullable;
 
 import static org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
 
 /** A table store {@link DynamicTableFactory} to create source and sink. */
 public class TableStoreConnectorFactory extends AbstractTableStoreFactory {
 
+    @Nullable private final CatalogLock.Factory lockFactory;
+
+    public TableStoreConnectorFactory() {
+        this(null);
+    }
+
+    public TableStoreConnectorFactory(@Nullable CatalogLock.Factory lockFactory) {
+        this.lockFactory = lockFactory;
+    }
+
     @Override
     public String factoryIdentifier() {
         return IDENTIFIER;
     }
+
+    @Override
+    public TableStoreSink createDynamicTableSink(Context context) {
+        TableStoreSink sink = super.createDynamicTableSink(context);
+        sink.setLockFactory(lockFactory);
+        return sink;
+    }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index b0272d95..bac731dc 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.LogSinkFunction;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
index 0d35d4ad..d06e09fd 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.connector.sink;
 
+import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.TableCommit;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 0f80e62a..24542bd6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.table.FileStoreTable;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index f2fcb843..b0062a35 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
index 26a63fad..fa8b3b01 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
@@ -22,6 +22,10 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory;
 
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
 /** Catalog options for table store. */
 public class CatalogOptions {
     private CatalogOptions() {}
@@ -44,4 +48,22 @@ public class CatalogOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("Uri of metastore server.");
+
+    public static final ConfigOption<Boolean> LOCK_ENABLED =
+            ConfigOptions.key("lock.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Enable Catalog Lock.");
+
+    public static final ConfigOption<Duration> LOCK_CHECK_MAX_SLEEP =
+            key("lock-check-max-sleep")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(8))
+                    .withDescription("The maximum sleep time when retrying to check the lock.");
+
+    public static final ConfigOption<Duration> LOCK_ACQUIRE_TIMEOUT =
+            key("lock-acquire-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(8))
+                    .withDescription("The maximum time to wait for acquiring the lock.");
 }
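
The new entries are ordinary Flink ConfigOptions, so they can be set and read
through a Configuration like any other option; a minimal sketch:

    // Assumes org.apache.flink.configuration.Configuration and java.time.Duration.
    Configuration conf = new Configuration();
    conf.set(CatalogOptions.LOCK_ENABLED, true);
    Duration maxSleep = conf.get(CatalogOptions.LOCK_CHECK_MAX_SLEEP); // 8 s default
    Duration timeout = conf.get(CatalogOptions.LOCK_ACQUIRE_TIMEOUT);  // 8 min default
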
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index a7db7694..1642d4ff 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 
 import java.util.List;
+import java.util.Optional;
 
 /**
  * This interface is responsible for reading and writing metadata such as database/table from a
@@ -33,6 +34,12 @@ import java.util.List;
  */
 public interface Catalog extends AutoCloseable {
 
+    /**
+     * Get the lock factory from this catalog. The lock is used to support multiple concurrent
+     * writes on the object store.
+     */
+    Optional<CatalogLock.Factory> lockFactory();
+
     /**
      * Get the names of all databases in this catalog.
      *
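
A minimal sketch of driving the new method, assuming a Catalog instance in
scope; the database/table names and the callable body are illustrative:

    static void commitUnderLock(Catalog catalog) throws Exception {
        Optional<CatalogLock.Factory> lockFactory = catalog.lockFactory();
        if (!lockFactory.isPresent()) {
            return; // e.g. FileSystemCatalog below: no lock available
        }
        CatalogLock lock = lockFactory.get().create();
        try {
            lock.runWithLock(
                    "my_db",
                    "my_table",
                    () -> {
                        // critical section, e.g. a commit that must not race
                        return null;
                    });
        } finally {
            lock.close();
        }
    }
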
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
index 608cd266..c140587b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.catalog;
 
-import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -34,9 +34,9 @@ public interface CatalogFactory {
 
     String identifier();
 
-    Catalog create(String warehouse, ReadableConfig options);
+    Catalog create(String warehouse, Configuration options);
 
-    static Catalog createCatalog(ReadableConfig options) {
+    static Catalog createCatalog(Configuration options) {
         // manual validation
         // because different catalog types may have different options
         // we can't list them all in the optionalOptions() method
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogLock.java
similarity index 96%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogLock.java
index 08b5b520..57543607 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogLock.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.connector.sink;
+package org.apache.flink.table.store.file.catalog;
 
 import org.apache.flink.annotation.Internal;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index 6c16366a..23c55131 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.schema.UpdateSchema;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 
 import static org.apache.flink.table.store.file.utils.FileUtils.safelyListFileStatus;
@@ -43,6 +44,11 @@ public class FileSystemCatalog extends AbstractCatalog {
         this.fs = uncheck(warehouse::getFileSystem);
     }
 
+    @Override
+    public Optional<CatalogLock.Factory> lockFactory() {
+        return Optional.empty();
+    }
+
     @Override
     public List<String> listDatabases() {
         List<String> databases = new ArrayList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
index c705872e..98acb1ae 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.file.catalog;
 
-import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 
 /** Factory to create {@link FileSystemCatalog}. */
@@ -32,7 +32,7 @@ public class FileSystemCatalogFactory implements CatalogFactory {
     }
 
     @Override
-    public Catalog create(String warehouse, ReadableConfig options) {
+    public Catalog create(String warehouse, Configuration options) {
         return new FileSystemCatalog(new Path(warehouse));
     }
 }
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 1ad4327d..f6a49057 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.hive;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.catalog.AbstractCatalog;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.file.schema.DataField;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -47,8 +49,11 @@ import org.apache.thrift.TException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.CatalogOptions.LOCK_ENABLED;
+
 /** A catalog implementation for Hive. */
 public class HiveCatalog extends AbstractCatalog {
 
@@ -66,22 +71,19 @@ public class HiveCatalog extends AbstractCatalog {
     private final HiveConf hiveConf;
     private final IMetaStoreClient client;
 
-    public HiveCatalog(String thriftUri, String warehousePath) {
-        Configuration conf = new Configuration();
-        conf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
-        conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousePath);
-        this.hiveConf = new HiveConf(conf, HiveConf.class);
-        try {
-            IMetaStoreClient client =
-                    RetryingMetaStoreClient.getProxy(
-                            hiveConf, tbl -> null, HiveMetaStoreClient.class.getName());
-            this.client =
-                    StringUtils.isNullOrWhitespaceOnly(thriftUri)
-                            ? client
-                            : HiveMetaStoreClient.newSynchronizedClient(client);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+    public HiveCatalog(Configuration hadoopConfig) {
+        this.hiveConf = new HiveConf(hadoopConfig, HiveConf.class);
+        this.client = createClient(hiveConf);
+    }
+
+    @Override
+    public Optional<CatalogLock.Factory> lockFactory() {
+        boolean lockEnabled =
+                Boolean.parseBoolean(
+                        hiveConf.get(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString()));
+        return lockEnabled
+                ? Optional.of(HiveCatalogLock.createFactory(hiveConf))
+                : Optional.empty();
     }
 
     @Override
@@ -353,4 +355,19 @@ public class HiveCatalog extends AbstractCatalog {
                     e);
         }
     }
+
+    static IMetaStoreClient createClient(HiveConf hiveConf) {
+        IMetaStoreClient client;
+        try {
+            client =
+                    RetryingMetaStoreClient.getProxy(
+                            hiveConf, tbl -> null, HiveMetaStoreClient.class.getName());
+        } catch (MetaException e) {
+            throw new RuntimeException(e);
+        }
+        return StringUtils.isNullOrWhitespaceOnly(
+                        hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))
+                ? client
+                : HiveMetaStoreClient.newSynchronizedClient(client);
+    }
 }
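
Since HiveCatalogFactory (next diff) copies every catalog option into the
Hadoop Configuration, lock.enabled can also be switched on programmatically;
an illustrative sketch (constructing the catalog still attempts to create a
metastore client, so this needs a reachable or embedded metastore):

    org.apache.hadoop.conf.Configuration hadoopConf =
            new org.apache.hadoop.conf.Configuration();
    hadoopConf.set("lock.enabled", "true");
    HiveCatalog catalog = new HiveCatalog(hadoopConf);
    // now backed by HiveCatalogLock instead of Optional.empty()
    boolean lockAvailable = catalog.lockFactory().isPresent();
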
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
index ec0dc2d1..2488c130 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.table.store.hive;
 
-import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+
 import static org.apache.flink.table.store.CatalogOptions.URI;
 
 /** Factory to create {@link HiveCatalog}. */
@@ -36,11 +38,16 @@ public class HiveCatalogFactory implements CatalogFactory {
     }
 
     @Override
-    public Catalog create(String warehouse, ReadableConfig options) {
+    public Catalog create(String warehouse, Configuration options) {
         String uri =
                 Preconditions.checkNotNull(
                         options.get(URI),
                         URI.key() + " must be set for table store " + IDENTIFIER + " catalog");
-        return new HiveCatalog(uri, warehouse);
+        org.apache.hadoop.conf.Configuration hadoopConfig =
+                new org.apache.hadoop.conf.Configuration();
+        options.toMap().forEach(hadoopConfig::set);
+        hadoopConfig.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
+        hadoopConfig.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouse);
+        return new HiveCatalog(hadoopConfig);
     }
 }
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
new file mode 100644
index 00000000..7dd8232c
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.util.TimeUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.thrift.TException;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.store.CatalogOptions.LOCK_ACQUIRE_TIMEOUT;
+import static org.apache.flink.table.store.CatalogOptions.LOCK_CHECK_MAX_SLEEP;
+
+/** Hive {@link CatalogLock}. */
+public class HiveCatalogLock implements CatalogLock {
+
+    private final IMetaStoreClient client;
+    private final long checkMaxSleep;
+    private final long acquireTimeout;
+
+    public HiveCatalogLock(IMetaStoreClient client, long checkMaxSleep, long acquireTimeout) {
+        this.client = client;
+        this.checkMaxSleep = checkMaxSleep;
+        this.acquireTimeout = acquireTimeout;
+    }
+
+    @Override
+    public <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception {
+        long lockId = lock(database, table);
+        try {
+            return callable.call();
+        } finally {
+            unlock(lockId);
+        }
+    }
+
+    private long lock(String database, String table)
+            throws UnknownHostException, TException, InterruptedException {
+        final LockComponent lockComponent =
+                new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
+        lockComponent.setTablename(table);
+        lockComponent.unsetOperationType();
+        final LockRequest lockRequest =
+                new LockRequest(
+                        Collections.singletonList(lockComponent),
+                        System.getProperty("user.name"),
+                        InetAddress.getLocalHost().getHostName());
+        LockResponse lockResponse = this.client.lock(lockRequest);
+
+        long nextSleep = 50;
+        long startRetry = System.currentTimeMillis();
+        while (lockResponse.getState() == LockState.WAITING) {
+            nextSleep *= 2;
+            if (nextSleep > checkMaxSleep) {
+                nextSleep = checkMaxSleep;
+            }
+            Thread.sleep(nextSleep);
+
+            lockResponse = client.checkLock(lockResponse.getLockid());
+            if (System.currentTimeMillis() - startRetry > acquireTimeout) {
+                break;
+            }
+        }
+        long retryDuration = System.currentTimeMillis() - startRetry;
+
+        if (lockResponse.getState() != LockState.ACQUIRED) {
+            if (lockResponse.getState() == LockState.WAITING) {
+                client.unlock(lockResponse.getLockid());
+            }
+            throw new RuntimeException(
+                    "Acquire lock failed with time: " + Duration.ofMillis(retryDuration));
+        }
+        return lockResponse.getLockid();
+    }
+
+    private void unlock(long lockId) throws TException {
+        client.unlock(lockId);
+    }
+
+    @Override
+    public void close() {
+        this.client.close();
+    }
+
+    /** Create a hive lock factory. */
+    public static CatalogLock.Factory createFactory(HiveConf hiveConf) {
+        return new HiveCatalogLockFactory(hiveConf);
+    }
+
+    private static class HiveCatalogLockFactory implements CatalogLock.Factory {
+
+        private static final long serialVersionUID = 1L;
+
+        private final SerializableHiveConf hiveConf;
+
+        public HiveCatalogLockFactory(HiveConf hiveConf) {
+            this.hiveConf = new SerializableHiveConf(hiveConf);
+        }
+
+        @Override
+        public CatalogLock create() {
+            HiveConf conf = hiveConf.conf();
+            long checkMaxSleep =
+                    TimeUtils.parseDuration(
+                                    conf.get(
+                                            LOCK_CHECK_MAX_SLEEP.key(),
+                                            TimeUtils.getStringInMillis(
+                                                    LOCK_CHECK_MAX_SLEEP.defaultValue())))
+                            .toMillis();
+            long acquireTimeout =
+                    TimeUtils.parseDuration(
+                                    conf.get(
+                                            LOCK_ACQUIRE_TIMEOUT.key(),
+                                            TimeUtils.getStringInMillis(
+                                                    LOCK_ACQUIRE_TIMEOUT.defaultValue())))
+                            .toMillis();
+            return new HiveCatalogLock(
+                    HiveCatalog.createClient(conf), checkMaxSleep, acquireTimeout);
+        }
+    }
+}
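
With the default options, the retry loop above doubles the sleep from 50 ms
up to the 8 s cap and gives up after roughly 8 minutes; a standalone, purely
arithmetic sketch of that schedule:

    long nextSleep = 50;                 // doubled before the first sleep
    final long checkMaxSleep = 8_000;    // lock-check-max-sleep = 8 s
    final long acquireTimeout = 480_000; // lock-acquire-timeout = 8 min
    long elapsed = 0;
    while (elapsed < acquireTimeout) {
        nextSleep = Math.min(nextSleep * 2, checkMaxSleep);
        elapsed += nextSleep; // sleeps of 100, 200, 400, ... 6400, then 8000 ms
    }
    // after seven doublings the loop settles at 8 s checks until the timeout
    // budget is spent; lock() then unlocks a still-WAITING lock and throws
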
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java
new file mode 100644
index 00000000..056598f1
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/** Wrap {@link JobConf} to a serializable class. */
+public class SerializableHiveConf implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient JobConf jobConf;
+
+    public SerializableHiveConf(HiveConf conf) {
+        this.jobConf = createJobConfWithCredentials(conf);
+    }
+
+    public HiveConf conf() {
+        HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
+        // to make sure Hive configuration properties in conf are not overridden
+        hiveConf.addResource(jobConf);
+        return hiveConf;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+
+        // we write the jobConf through a separate serializer to avoid cryptic exceptions when it
+        // corrupts the serialization stream
+        final DataOutputSerializer ser = new DataOutputSerializer(256);
+        jobConf.write(ser);
+        out.writeInt(ser.length());
+        out.write(ser.getSharedBuffer(), 0, ser.length());
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+
+        final byte[] data = new byte[in.readInt()];
+        in.readFully(data);
+        final DataInputDeserializer deser = new DataInputDeserializer(data);
+        this.jobConf = new JobConf();
+        try {
+            jobConf.readFields(deser);
+        } catch (IOException e) {
+            throw new IOException(
+                    "Could not deserialize JobConf, the serialized and de-serialized don't match.",
+                    e);
+        }
+        Credentials currentUserCreds = UserGroupInformation.getCurrentUser().getCredentials();
+        if (currentUserCreds != null) {
+            jobConf.getCredentials().addAll(currentUserCreds);
+        }
+    }
+
+    private static void addCredentialsIntoJobConf(JobConf jobConf) {
+        UserGroupInformation currentUser;
+        try {
+            currentUser = UserGroupInformation.getCurrentUser();
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to determine current user", e);
+        }
+        Credentials currentUserCreds = currentUser.getCredentials();
+        if (currentUserCreds != null) {
+            jobConf.getCredentials().mergeAll(currentUserCreds);
+        }
+    }
+
+    private static JobConf createJobConfWithCredentials(Configuration configuration) {
+        JobConf jobConf = new JobConf(configuration);
+        addCredentialsIntoJobConf(jobConf);
+        return jobConf;
+    }
+}
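
A hypothetical round trip through Java serialization, assuming an existing
HiveConf named hiveConf and eliding exception handling (only java.io and the
classes above are needed):

    SerializableHiveConf wrapper = new SerializableHiveConf(hiveConf);

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(wrapper); // delegates to writeObject(...) above
    }

    try (ObjectInputStream ois =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
        SerializableHiveConf copy = (SerializableHiveConf) ois.readObject();
        HiveConf restored = copy.conf(); // usable on the receiving side
    }
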
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
index 1b5654b7..e449ddcb 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
@@ -23,12 +23,17 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.store.connector.FlinkCatalog;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 
 import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveRunnerSetup;
 import com.klarna.hiverunner.annotations.HiveSQL;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -40,6 +45,13 @@ import org.junit.runner.RunWith;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_IN_TEST;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TXN_MANAGER;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 /** IT cases for {@link HiveCatalog}. */
 @RunWith(FlinkEmbeddedHiveRunner.class)
@@ -53,6 +65,20 @@ public class HiveCatalogITCase {
     @HiveSQL(files = {})
     private static HiveShell hiveShell;
 
+    @HiveRunnerSetup
+    private static final HiveRunnerConfig CONFIG =
+            new HiveRunnerConfig() {
+                {
+                    // catalog lock needs txn manager
+                    // hive-3.x requires a proper txn manager to create ACID table
+                    getHiveConfSystemOverride()
+                            .put(HIVE_TXN_MANAGER.varname, DbTxnManager.class.getName());
+                    getHiveConfSystemOverride().put(HIVE_SUPPORT_CONCURRENCY.varname, "true");
+                    // tell TxnHandler to prepare txn DB
+                    getHiveConfSystemOverride().put(HIVE_IN_TEST.varname, "true");
+                }
+            };
+
     @Before
     public void before() throws Exception {
         hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
@@ -70,7 +96,8 @@ public class HiveCatalogITCase {
                                 "  'type' = 'table-store',",
                                 "  'metastore' = 'hive',",
                                 "  'uri' = '',",
-                                "  'warehouse' = '" + path + "'",
+                                "  'warehouse' = '" + path + "',",
+                                "  'lock.enabled' = 'true'",
                                 ")"))
                 .await();
         tEnv.executeSql("USE CATALOG my_hive").await();
@@ -223,6 +250,48 @@ public class HiveCatalogITCase {
         }
     }
 
+    @Test
+    public void testHiveLock() throws InterruptedException {
+        tEnv.executeSql("CREATE TABLE T (a INT)");
+        CatalogLock.Factory lockFactory =
+                ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get())
+                        .catalog()
+                        .lockFactory()
+                        .get();
+
+        AtomicInteger count = new AtomicInteger(0);
+        List<Thread> threads = new ArrayList<>();
+        Callable<Void> unsafeIncrement =
+                () -> {
+                    int nextCount = count.get() + 1;
+                    Thread.sleep(1);
+                    count.set(nextCount);
+                    return null;
+                };
+        for (int i = 0; i < 10; i++) {
+            Thread thread =
+                    new Thread(
+                            () -> {
+                                CatalogLock lock = lockFactory.create();
+                                for (int j = 0; j < 10; j++) {
+                                    try {
+                                        lock.runWithLock("test_db", "T", unsafeIncrement);
+                                    } catch (Exception e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                }
+                            });
+            thread.start();
+            threads.add(thread);
+        }
+
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        assertThat(count.get()).isEqualTo(100);
+    }
+
     private List<Row> collect(String sql) throws Exception {
         List<Row> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {