You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/07 07:55:57 UTC
[flink-table-store] branch master updated: [FLINK-28384][hive] Add CatalogLock support for HiveCatalog in table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 7c6ddd6e [FLINK-28384][hive] Add CatalogLock support for HiveCatalog in table store
7c6ddd6e is described below
commit 7c6ddd6ec19a1a232450a2f604fd6df8d1144f82
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jul 7 15:55:53 2022 +0800
[FLINK-28384][hive] Add CatalogLock support for HiveCatalog in table store
This closes #201
---
.../generated/catalog_configuration.html | 18 +++
.../flink/table/store/connector/FlinkCatalog.java | 8 +-
.../table/store/connector/FlinkCatalogFactory.java | 10 +-
.../connector/TableStoreConnectorFactory.java | 21 +++
.../store/connector/sink/FlinkSinkBuilder.java | 1 +
.../table/store/connector/sink/StoreCommitter.java | 1 +
.../table/store/connector/sink/StoreSink.java | 1 +
.../table/store/connector/sink/TableStoreSink.java | 1 +
.../apache/flink/table/store/CatalogOptions.java | 22 +++
.../flink/table/store/file/catalog/Catalog.java | 7 +
.../table/store/file/catalog/CatalogFactory.java | 6 +-
.../table/store/file/catalog}/CatalogLock.java | 2 +-
.../store/file/catalog/FileSystemCatalog.java | 6 +
.../file/catalog/FileSystemCatalogFactory.java | 4 +-
.../apache/flink/table/store/hive/HiveCatalog.java | 49 ++++---
.../flink/table/store/hive/HiveCatalogFactory.java | 13 +-
.../flink/table/store/hive/HiveCatalogLock.java | 150 +++++++++++++++++++++
.../table/store/hive/SerializableHiveConf.java | 101 ++++++++++++++
.../flink/table/store/hive/HiveCatalogITCase.java | 71 +++++++++-
19 files changed, 453 insertions(+), 39 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index ba90de08..b3056129 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -8,6 +8,24 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>lock-acquire-timeout</h5></td>
+ <td style="word-wrap: break-word;">8 min</td>
+ <td>Duration</td>
+ <td>The maximum time to wait for acquiring the lock.</td>
+ </tr>
+ <tr>
+ <td><h5>lock-check-max-sleep</h5></td>
+ <td style="word-wrap: break-word;">8 s</td>
+ <td>Duration</td>
+ <td>The maximum sleep time when retrying to check the lock.</td>
+ </tr>
+ <tr>
+ <td><h5>lock.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Enable Catalog Lock.</td>
+ </tr>
<tr>
<td><h5>metastore</h5></td>
<td style="word-wrap: break-word;">"filesystem"</td>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 11d79dd4..a3e4bfdd 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -39,9 +39,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.Factory;
-import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -53,7 +51,6 @@ import java.util.Optional;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.table.store.CoreOptions.PATH;
-import static org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
/** Catalog for table store. */
public class FlinkCatalog extends AbstractCatalog {
@@ -70,14 +67,13 @@ public class FlinkCatalog extends AbstractCatalog {
}
@VisibleForTesting
- Catalog catalog() {
+ public Catalog catalog() {
return catalog;
}
@Override
public Optional<Factory> getFactory() {
- return Optional.of(
- FactoryUtil.discoverFactory(classLoader(), DynamicTableFactory.class, IDENTIFIER));
+ return Optional.of(new TableStoreConnectorFactory(catalog.lockFactory().orElse(null)));
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
index 8ff157da..1811b3e8 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
@@ -20,8 +20,7 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
import java.util.Collections;
@@ -52,13 +51,10 @@ public class FlinkCatalogFactory implements org.apache.flink.table.factories.Cat
@Override
public FlinkCatalog createCatalog(Context context) {
- FactoryUtil.CatalogFactoryHelper helper =
- FactoryUtil.createCatalogFactoryHelper(this, context);
- ReadableConfig options = helper.getOptions();
- return createCatalog(context.getName(), options);
+ return createCatalog(context.getName(), Configuration.fromMap(context.getOptions()));
}
- public static FlinkCatalog createCatalog(String catalogName, ReadableConfig options) {
+ public static FlinkCatalog createCatalog(String catalogName, Configuration options) {
return new FlinkCatalog(
CatalogFactory.createCatalog(options), catalogName, options.get(DEFAULT_DATABASE));
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
index e5a5a4ef..145cff7a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
@@ -19,14 +19,35 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.sink.TableStoreSink;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
+
+import javax.annotation.Nullable;
import static org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
/** A table store {@link DynamicTableFactory} to create source and sink. */
public class TableStoreConnectorFactory extends AbstractTableStoreFactory {
+ @Nullable private final CatalogLock.Factory lockFactory;
+
+ public TableStoreConnectorFactory() {
+ this(null);
+ }
+
+ public TableStoreConnectorFactory(@Nullable CatalogLock.Factory lockFactory) {
+ this.lockFactory = lockFactory;
+ }
+
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
+
+ @Override
+ public TableStoreSink createDynamicTableSink(Context context) {
+ TableStoreSink sink = super.createDynamicTableSink(context);
+ sink.setLockFactory(lockFactory);
+ return sink;
+ }
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index b0272d95..bac731dc 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
index 0d35d4ad..d06e09fd 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.connector.sink;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableCommit;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 0f80e62a..24542bd6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.table.FileStoreTable;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index f2fcb843..b0062a35 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
index 26a63fad..fa8b3b01 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java
@@ -22,6 +22,10 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory;
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
/** Catalog options for table store. */
public class CatalogOptions {
private CatalogOptions() {}
@@ -44,4 +48,22 @@ public class CatalogOptions {
.stringType()
.noDefaultValue()
.withDescription("Uri of metastore server.");
+
+ public static final ConfigOption<Boolean> LOCK_ENABLED =
+ ConfigOptions.key("lock.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Enable Catalog Lock.");
+
+ public static final ConfigOption<Duration> LOCK_CHECK_MAX_SLEEP =
+ key("lock-check-max-sleep")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(8))
+ .withDescription("The maximum sleep time when retrying to check the lock.");
+
+ public static final ConfigOption<Duration> LOCK_ACQUIRE_TIMEOUT =
+ key("lock-acquire-timeout")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(8))
+ .withDescription("The maximum time to wait for acquiring the lock.");
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index a7db7694..1642d4ff 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import java.util.List;
+import java.util.Optional;
/**
* This interface is responsible for reading and writing metadata such as database/table from a
@@ -33,6 +34,12 @@ import java.util.List;
*/
public interface Catalog extends AutoCloseable {
+ /**
+ * Get lock factory from catalog. Lock is used to support multiple concurrent writes on the
+ * object store.
+ */
+ Optional<CatalogLock.Factory> lockFactory();
+
/**
* Get the names of all databases in this catalog.
*
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
index 608cd266..c140587b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.catalog;
-import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
@@ -34,9 +34,9 @@ public interface CatalogFactory {
String identifier();
- Catalog create(String warehouse, ReadableConfig options);
+ Catalog create(String warehouse, Configuration options);
- static Catalog createCatalog(ReadableConfig options) {
+ static Catalog createCatalog(Configuration options) {
// manual validation
// because different catalog types may have different options
// we can't list them all in the optionalOptions() method
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogLock.java
similarity index 96%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogLock.java
index 08b5b520..57543607 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogLock.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.connector.sink;
+package org.apache.flink.table.store.file.catalog;
import org.apache.flink.annotation.Internal;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index 6c16366a..23c55131 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.schema.UpdateSchema;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.Callable;
import static org.apache.flink.table.store.file.utils.FileUtils.safelyListFileStatus;
@@ -43,6 +44,11 @@ public class FileSystemCatalog extends AbstractCatalog {
this.fs = uncheck(warehouse::getFileSystem);
}
+ @Override
+ public Optional<CatalogLock.Factory> lockFactory() {
+ return Optional.empty();
+ }
+
@Override
public List<String> listDatabases() {
List<String> databases = new ArrayList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
index c705872e..98acb1ae 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.store.file.catalog;
-import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
/** Factory to create {@link FileSystemCatalog}. */
@@ -32,7 +32,7 @@ public class FileSystemCatalogFactory implements CatalogFactory {
}
@Override
- public Catalog create(String warehouse, ReadableConfig options) {
+ public Catalog create(String warehouse, Configuration options) {
return new FileSystemCatalog(new Path(warehouse));
}
}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 1ad4327d..f6a49057 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.hive;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.catalog.AbstractCatalog;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -47,8 +49,11 @@ import org.apache.thrift.TException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.flink.table.store.CatalogOptions.LOCK_ENABLED;
+
/** A catalog implementation for Hive. */
public class HiveCatalog extends AbstractCatalog {
@@ -66,22 +71,19 @@ public class HiveCatalog extends AbstractCatalog {
private final HiveConf hiveConf;
private final IMetaStoreClient client;
- public HiveCatalog(String thriftUri, String warehousePath) {
- Configuration conf = new Configuration();
- conf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
- conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousePath);
- this.hiveConf = new HiveConf(conf, HiveConf.class);
- try {
- IMetaStoreClient client =
- RetryingMetaStoreClient.getProxy(
- hiveConf, tbl -> null, HiveMetaStoreClient.class.getName());
- this.client =
- StringUtils.isNullOrWhitespaceOnly(thriftUri)
- ? client
- : HiveMetaStoreClient.newSynchronizedClient(client);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ public HiveCatalog(Configuration hadoopConfig) {
+ this.hiveConf = new HiveConf(hadoopConfig, HiveConf.class);
+ this.client = createClient(hiveConf);
+ }
+
+ @Override
+ public Optional<CatalogLock.Factory> lockFactory() {
+ boolean lockEnabled =
+ Boolean.parseBoolean(
+ hiveConf.get(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString()));
+ return lockEnabled
+ ? Optional.of(HiveCatalogLock.createFactory(hiveConf))
+ : Optional.empty();
}
@Override
@@ -353,4 +355,19 @@ public class HiveCatalog extends AbstractCatalog {
e);
}
}
+
+ static IMetaStoreClient createClient(HiveConf hiveConf) {
+ IMetaStoreClient client;
+ try {
+ client =
+ RetryingMetaStoreClient.getProxy(
+ hiveConf, tbl -> null, HiveMetaStoreClient.class.getName());
+ } catch (MetaException e) {
+ throw new RuntimeException(e);
+ }
+ return StringUtils.isNullOrWhitespaceOnly(
+ hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))
+ ? client
+ : HiveMetaStoreClient.newSynchronizedClient(client);
+ }
}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
index ec0dc2d1..2488c130 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
@@ -18,11 +18,13 @@
package org.apache.flink.table.store.hive;
-import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+
import static org.apache.flink.table.store.CatalogOptions.URI;
/** Factory to create {@link HiveCatalog}. */
@@ -36,11 +38,16 @@ public class HiveCatalogFactory implements CatalogFactory {
}
@Override
- public Catalog create(String warehouse, ReadableConfig options) {
+ public Catalog create(String warehouse, Configuration options) {
String uri =
Preconditions.checkNotNull(
options.get(URI),
URI.key() + " must be set for table store " + IDENTIFIER + " catalog");
- return new HiveCatalog(uri, warehouse);
+ org.apache.hadoop.conf.Configuration hadoopConfig =
+ new org.apache.hadoop.conf.Configuration();
+ options.toMap().forEach(hadoopConfig::set);
+ hadoopConfig.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
+ hadoopConfig.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouse);
+ return new HiveCatalog(hadoopConfig);
}
}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
new file mode 100644
index 00000000..7dd8232c
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.util.TimeUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.thrift.TException;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.store.CatalogOptions.LOCK_ACQUIRE_TIMEOUT;
+import static org.apache.flink.table.store.CatalogOptions.LOCK_CHECK_MAX_SLEEP;
+
+/** Hive {@link CatalogLock}. */
+public class HiveCatalogLock implements CatalogLock {
+
+ private final IMetaStoreClient client;
+ private final long checkMaxSleep;
+ private final long acquireTimeout;
+
+ public HiveCatalogLock(IMetaStoreClient client, long checkMaxSleep, long acquireTimeout) {
+ this.client = client;
+ this.checkMaxSleep = checkMaxSleep;
+ this.acquireTimeout = acquireTimeout;
+ }
+
+ @Override
+ public <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception {
+ long lockId = lock(database, table);
+ try {
+ return callable.call();
+ } finally {
+ unlock(lockId);
+ }
+ }
+
+ private long lock(String database, String table)
+ throws UnknownHostException, TException, InterruptedException {
+ final LockComponent lockComponent =
+ new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
+ lockComponent.setTablename(table);
+ lockComponent.unsetOperationType();
+ final LockRequest lockRequest =
+ new LockRequest(
+ Collections.singletonList(lockComponent),
+ System.getProperty("user.name"),
+ InetAddress.getLocalHost().getHostName());
+ LockResponse lockResponse = this.client.lock(lockRequest);
+
+ long nextSleep = 50;
+ long startRetry = System.currentTimeMillis();
+ while (lockResponse.getState() == LockState.WAITING) {
+ nextSleep *= 2;
+ if (nextSleep > checkMaxSleep) {
+ nextSleep = checkMaxSleep;
+ }
+ Thread.sleep(nextSleep);
+
+ lockResponse = client.checkLock(lockResponse.getLockid());
+ if (System.currentTimeMillis() - startRetry > acquireTimeout) {
+ break;
+ }
+ }
+ long retryDuration = System.currentTimeMillis() - startRetry;
+
+ if (lockResponse.getState() != LockState.ACQUIRED) {
+ if (lockResponse.getState() == LockState.WAITING) {
+ client.unlock(lockResponse.getLockid());
+ }
+ throw new RuntimeException(
+ "Acquire lock failed with time: " + Duration.ofMillis(retryDuration));
+ }
+ return lockResponse.getLockid();
+ }
+
+ private void unlock(long lockId) throws TException {
+ client.unlock(lockId);
+ }
+
+ @Override
+ public void close() {
+ this.client.close();
+ }
+
+ /** Create a hive lock factory. */
+ public static CatalogLock.Factory createFactory(HiveConf hiveConf) {
+ return new HiveCatalogLockFactory(hiveConf);
+ }
+
+ private static class HiveCatalogLockFactory implements CatalogLock.Factory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final SerializableHiveConf hiveConf;
+
+ public HiveCatalogLockFactory(HiveConf hiveConf) {
+ this.hiveConf = new SerializableHiveConf(hiveConf);
+ }
+
+ @Override
+ public CatalogLock create() {
+ HiveConf conf = hiveConf.conf();
+ long checkMaxSleep =
+ TimeUtils.parseDuration(
+ conf.get(
+ LOCK_CHECK_MAX_SLEEP.key(),
+ TimeUtils.getStringInMillis(
+ LOCK_CHECK_MAX_SLEEP.defaultValue())))
+ .toMillis();
+ long acquireTimeout =
+ TimeUtils.parseDuration(
+ conf.get(
+ LOCK_ACQUIRE_TIMEOUT.key(),
+ TimeUtils.getStringInMillis(
+ LOCK_ACQUIRE_TIMEOUT.defaultValue())))
+ .toMillis();
+ return new HiveCatalogLock(
+ HiveCatalog.createClient(conf), checkMaxSleep, acquireTimeout);
+ }
+ }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java
new file mode 100644
index 00000000..056598f1
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/** Wrap {@link HiveConf} to a serializable class, holding it internally as a {@link JobConf}. */
+public class SerializableHiveConf implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient JobConf jobConf;
+
+ public SerializableHiveConf(HiveConf conf) {
+ this.jobConf = createJobConfWithCredentials(conf);
+ }
+
+ public HiveConf conf() {
+ HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
+ // to make sure the Hive configuration properties in conf are not overridden
+ hiveConf.addResource(jobConf);
+ return hiveConf;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+
+ // we write the jobConf through a separate serializer to avoid cryptic exceptions when it
+ // corrupts the serialization stream
+ final DataOutputSerializer ser = new DataOutputSerializer(256);
+ jobConf.write(ser);
+ out.writeInt(ser.length());
+ out.write(ser.getSharedBuffer(), 0, ser.length());
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
+ final byte[] data = new byte[in.readInt()];
+ in.readFully(data);
+ final DataInputDeserializer deser = new DataInputDeserializer(data);
+ this.jobConf = new JobConf();
+ try {
+ jobConf.readFields(deser);
+ } catch (IOException e) {
+ throw new IOException(
+ "Could not deserialize JobConf, the serialized and de-serialized don't match.",
+ e);
+ }
+ Credentials currentUserCreds = UserGroupInformation.getCurrentUser().getCredentials();
+ if (currentUserCreds != null) {
+ jobConf.getCredentials().addAll(currentUserCreds);
+ }
+ }
+
+ private static void addCredentialsIntoJobConf(JobConf jobConf) {
+ UserGroupInformation currentUser;
+ try {
+ currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to determine current user", e);
+ }
+ Credentials currentUserCreds = currentUser.getCredentials();
+ if (currentUserCreds != null) {
+ jobConf.getCredentials().mergeAll(currentUserCreds);
+ }
+ }
+
+ private static JobConf createJobConfWithCredentials(Configuration configuration) {
+ JobConf jobConf = new JobConf(configuration);
+ addCredentialsIntoJobConf(jobConf);
+ return jobConf;
+ }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
index 1b5654b7..e449ddcb 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
@@ -23,12 +23,17 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.store.connector.FlinkCatalog;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveRunnerSetup;
import com.klarna.hiverunner.annotations.HiveSQL;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -40,6 +45,13 @@ import org.junit.runner.RunWith;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_IN_TEST;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TXN_MANAGER;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
/** IT cases for {@link HiveCatalog}. */
@RunWith(FlinkEmbeddedHiveRunner.class)
@@ -53,6 +65,20 @@ public class HiveCatalogITCase {
@HiveSQL(files = {})
private static HiveShell hiveShell;
+ @HiveRunnerSetup
+ private static final HiveRunnerConfig CONFIG = // NOTE(review): presumably picked up by the runner via @HiveRunnerSetup - confirm
+ new HiveRunnerConfig() {
+ { // instance initializer: registers the Hive conf overrides when the config object is created
+ // The catalog lock needs a transaction manager.
+ // Hive 3.x also requires a proper transaction manager to create ACID tables.
+ getHiveConfSystemOverride()
+ .put(HIVE_TXN_MANAGER.varname, DbTxnManager.class.getName());
+ getHiveConfSystemOverride().put(HIVE_SUPPORT_CONCURRENCY.varname, "true");
+ // Tell TxnHandler to prepare the transaction DB.
+ getHiveConfSystemOverride().put(HIVE_IN_TEST.varname, "true");
+ }
+ };
+
@Before
public void before() throws Exception {
hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
@@ -70,7 +96,8 @@ public class HiveCatalogITCase {
" 'type' = 'table-store',",
" 'metastore' = 'hive',",
" 'uri' = '',",
- " 'warehouse' = '" + path + "'",
+ " 'warehouse' = '" + path + "',",
+ " 'lock.enabled' = 'true'",
")"))
.await();
tEnv.executeSql("USE CATALOG my_hive").await();
@@ -223,6 +250,48 @@ public class HiveCatalogITCase {
}
}
+ /**
+  * Checks that locks created by the catalog's lock factory are mutually exclusive: ten threads
+  * each run a deliberately non-atomic increment ten times under the lock, so the counter only
+  * reaches 100 if no two increments overlap.
+  *
+  * <p>Failures inside worker threads are collected and rethrown from the test thread, so the
+  * test reports the real cause instead of only a counter mismatch.
+  */
+ @Test
+ public void testHiveLock() throws InterruptedException {
+     tEnv.executeSql("CREATE TABLE T (a INT)");
+     CatalogLock.Factory lockFactory =
+             ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get())
+                     .catalog()
+                     .lockFactory()
+                     .get();
+
+     AtomicInteger count = new AtomicInteger(0);
+     // Failures raised by worker threads; every access is guarded by the list's own monitor.
+     List<Throwable> failures = new ArrayList<>();
+     List<Thread> threads = new ArrayList<>();
+     // Read, sleep, write: intentionally racy so that overlapping executions lose updates.
+     Callable<Void> unsafeIncrement =
+             () -> {
+                 int nextCount = count.get() + 1;
+                 Thread.sleep(1);
+                 count.set(nextCount);
+                 return null;
+             };
+     for (int i = 0; i < 10; i++) {
+         Thread thread =
+                 new Thread(
+                         () -> {
+                             try {
+                                 CatalogLock lock = lockFactory.create();
+                                 for (int j = 0; j < 10; j++) {
+                                     lock.runWithLock("test_db", "T", unsafeIncrement);
+                                 }
+                             } catch (Throwable t) {
+                                 // Not swallowed: rethrown from the test thread after join().
+                                 synchronized (failures) {
+                                     failures.add(t);
+                                 }
+                             }
+                         });
+         thread.start();
+         threads.add(thread);
+     }
+
+     for (Thread thread : threads) {
+         thread.join();
+     }
+
+     synchronized (failures) {
+         if (!failures.isEmpty()) {
+             throw new AssertionError(
+                     "Worker thread failed while running under the catalog lock",
+                     failures.get(0));
+         }
+     }
+
+     assertThat(count.get()).isEqualTo(100);
+ }
+
private List<Row> collect(String sql) throws Exception {
List<Row> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {