Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/11 02:14:06 UTC
[flink] branch master updated: [FLINK-17452][hive] Support creating Hive tables with constraints
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 25cbce1 [FLINK-17452][hive] Support creating Hive tables with constraints
25cbce1 is described below
commit 25cbce12d7201642b6031e38583489cd803fb1a5
Author: Rui Li <li...@apache.org>
AuthorDate: Mon May 11 10:13:21 2020 +0800
[FLINK-17452][hive] Support creating Hive tables with constraints
This closes #12017
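
For context, here is a minimal usage sketch of the feature this commit adds, mirroring the new test at the bottom of this diff. The catalog name, conf directory, and table path are illustrative assumptions, not part of the commit; the point is that declaring fields notNull() and a primaryKey() on a non-generic table now persists the corresponding Hive constraints.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.types.DataType;

import java.util.HashMap;
import java.util.Map;

public class CreateConstrainedHiveTable {
    public static void main(String[] args) throws Exception {
        // Illustrative setup: a real HiveCatalog needs a valid hive-site.xml directory.
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
        hiveCatalog.open();

        TableSchema schema = TableSchema.builder()
            .fields(
                new String[]{"x", "y", "z"},
                new DataType[]{
                    DataTypes.INT().notNull(),        // persisted as a Hive NOT NULL constraint
                    DataTypes.TIMESTAMP(9).notNull(), // persisted as a Hive NOT NULL constraint
                    DataTypes.BIGINT()})              // stays nullable
            .primaryKey("pk_name", new String[]{"x"}) // persisted as the Hive PK
            .build();

        // Constraints are only written for non-generic (i.e. Hive) tables.
        Map<String, String> options = new HashMap<>();
        options.put(CatalogConfig.IS_GENERIC, String.valueOf(false));

        hiveCatalog.createTable(
            new ObjectPath("default", "constrained_tbl"),
            new CatalogTableImpl(schema, options, null),
            false);
        hiveCatalog.close();
    }
}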
---
.../flink/table/catalog/hive/HiveCatalog.java | 30 +++++++-
.../hive/client/HiveMetastoreClientWrapper.java | 17 +++++
.../flink/table/catalog/hive/client/HiveShim.java | 6 ++
.../table/catalog/hive/client/HiveShimV100.java | 12 +++
.../table/catalog/hive/client/HiveShimV210.java | 70 +++++++++++++++++
.../table/catalog/hive/client/HiveShimV310.java | 87 +++++++++++++++++++++-
.../connectors/hive/HiveTableFactoryTest.java | 1 +
.../catalog/hive/HiveCatalogHiveMetadataTest.java | 28 +++++++
8 files changed, 246 insertions(+), 5 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 42a6e60..6d9dc57 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -105,6 +105,7 @@ import java.io.File;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -374,8 +375,35 @@ public class HiveCatalog extends AbstractCatalog {
Table hiveTable = instantiateHiveTable(tablePath, table);
+ UniqueConstraint pkConstraint = null;
+ List<String> notNullCols = new ArrayList<>();
+ boolean isGeneric = isGenericForCreate(table.getOptions());
+ if (!isGeneric) {
+ pkConstraint = table.getSchema().getPrimaryKey().orElse(null);
+ for (int i = 0; i < table.getSchema().getFieldDataTypes().length; i++) {
+ if (!table.getSchema().getFieldDataTypes()[i].getLogicalType().isNullable()) {
+ notNullCols.add(table.getSchema().getFieldNames()[i]);
+ }
+ }
+ }
+
try {
- client.createTable(hiveTable);
+ if (pkConstraint != null || !notNullCols.isEmpty()) {
+ // for now we just create constraints that are DISABLE, NOVALIDATE, RELY
+ Byte[] pkTraits = new Byte[pkConstraint == null ? 0 : pkConstraint.getColumns().size()];
+ Arrays.fill(pkTraits, HiveTableUtil.relyConstraint((byte) 0));
+ Byte[] nnTraits = new Byte[notNullCols.size()];
+ Arrays.fill(nnTraits, HiveTableUtil.relyConstraint((byte) 0));
+ client.createTableWithConstraints(
+ hiveTable,
+ hiveConf,
+ pkConstraint,
+ Arrays.asList(pkTraits),
+ notNullCols,
+ Arrays.asList(nnTraits));
+ } else {
+ client.createTable(hiveTable);
+ }
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(getName(), tablePath, e);
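
The pkTraits/nnTraits bytes built above encode the three Hive constraint traits (ENABLE, VALIDATE, RELY) as a bit set, which the shims below unpack with HiveTableUtil.require*Constraint. The concrete flag values are not part of this diff, so the following is only a hedged reconstruction of the encoding, with hypothetical constants:

// Hypothetical bit values; only the bit-set scheme is implied by the diff.
final class ConstraintTraitsSketch {
    static final byte HIVE_CONSTRAINT_ENABLE = 1 << 0;
    static final byte HIVE_CONSTRAINT_VALIDATE = 1 << 1;
    static final byte HIVE_CONSTRAINT_RELY = 1 << 2;

    // Counterpart of HiveTableUtil.relyConstraint: set the RELY bit on a trait byte.
    static byte relyConstraint(byte trait) {
        return (byte) (trait | HIVE_CONSTRAINT_RELY);
    }

    // Counterpart of HiveTableUtil.requireRelyConstraint: test the RELY bit.
    static boolean requireRelyConstraint(byte trait) {
        return (trait & HIVE_CONSTRAINT_RELY) != 0;
    }
}

Under this encoding, relyConstraint((byte) 0) as used above yields a trait meaning DISABLE, NOVALIDATE, RELY, matching the comment in the hunk.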
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
index 77a8058..63a557b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
@@ -266,4 +266,21 @@ public class HiveMetastoreClientWrapper implements AutoCloseable {
hiveShim.alterPartition(client, databaseName, tableName, partition);
}
+ public void createTableWithConstraints(
+ Table table,
+ Configuration conf,
+ UniqueConstraint pk,
+ List<Byte> pkTraits,
+ List<String> notNullCols,
+ List<Byte> nnTraits) {
+ hiveShim.createTableWithConstraints(
+ client,
+ table,
+ conf,
+ pk,
+ pkTraits,
+ notNullCols,
+ nnTraits);
+ }
+
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 258c52c..32e2a72 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -199,4 +199,10 @@ public interface HiveShim extends Serializable {
* Converts a Hive primitive java object to corresponding Writable object.
*/
@Nullable Writable hivePrimitiveToWritable(@Nullable Object value);
+
+ /**
+ * Creates a table with PK and NOT NULL constraints.
+ */
+ void createTableWithConstraints(IMetaStoreClient client, Table table, Configuration conf,
+ UniqueConstraint pk, List<Byte> pkTraits, List<String> notNullCols, List<Byte> nnTraits);
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index dae39d1..085b3e1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -296,6 +296,18 @@ public class HiveShimV100 implements HiveShim {
return optional.orElseThrow(() -> new FlinkHiveException("Unsupported primitive java value of class " + value.getClass().getName()));
}
+ @Override
+ public void createTableWithConstraints(
+ IMetaStoreClient client,
+ Table table,
+ Configuration conf,
+ UniqueConstraint pk,
+ List<Byte> pkTraits,
+ List<String> notNullCols,
+ List<Byte> nnTraits) {
+ throw new UnsupportedOperationException("Table constraints not supported until 2.1.0");
+ }
+
Optional<Writable> javaToWritable(@Nonnull Object value) {
Writable writable = null;
// in case value is already a Writable
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
index 5d1bef4..4c0fe7c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
@@ -22,18 +22,23 @@ import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
+import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -109,4 +114,69 @@ public class HiveShimV210 extends HiveShimV201 {
}
}
+ @Override
+ public void createTableWithConstraints(
+ IMetaStoreClient client,
+ Table table,
+ Configuration conf,
+ UniqueConstraint pk,
+ List<Byte> pkTraits,
+ List<String> notNullCols,
+ List<Byte> nnTraits) {
+ if (!notNullCols.isEmpty()) {
+ throw new UnsupportedOperationException("NOT NULL constraints not supported until 3.0.0");
+ }
+ try {
+ List<Object> hivePKs = createHivePKs(table, pk, pkTraits);
+ // createTableWithConstraints takes PK and FK lists
+ HiveReflectionUtils.invokeMethod(
+ client.getClass(),
+ client,
+ "createTableWithConstraints",
+ new Class[]{Table.class, List.class, List.class},
+ new Object[]{table, hivePKs, Collections.emptyList()});
+ } catch (Exception e) {
+ throw new CatalogException("Failed to create Hive table with constraints", e);
+ }
+ }
+
+ List<Object> createHivePKs(Table table, UniqueConstraint pk, List<Byte> traits)
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+ NoSuchMethodException, InvocationTargetException {
+ List<Object> res = new ArrayList<>();
+ if (pk != null) {
+ Class pkClz = Class.forName("org.apache.hadoop.hive.metastore.api.SQLPrimaryKey");
+ // PK constructor takes dbName, tableName, colName, keySeq, pkName, enable, validate, rely
+ Constructor constructor = pkClz.getConstructor(
+ String.class,
+ String.class,
+ String.class,
+ int.class,
+ String.class,
+ boolean.class,
+ boolean.class,
+ boolean.class);
+ int seq = 1;
+ Preconditions.checkArgument(pk.getColumns().size() == traits.size(),
+ "Number of PK columns and traits mismatch");
+ for (int i = 0; i < pk.getColumns().size(); i++) {
+ String col = pk.getColumns().get(i);
+ byte trait = traits.get(i);
+ boolean enable = HiveTableUtil.requireEnableConstraint(trait);
+ boolean validate = HiveTableUtil.requireValidateConstraint(trait);
+ boolean rely = HiveTableUtil.requireRelyConstraint(trait);
+ Object hivePK = constructor.newInstance(
+ table.getDbName(),
+ table.getTableName(),
+ col,
+ seq++,
+ pk.getName(),
+ enable,
+ validate,
+ rely);
+ res.add(hivePK);
+ }
+ }
+ return res;
+ }
}
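
SQLPrimaryKey only exists on the compile classpath from Hive 2.1.0 onwards, which is why the shim builds it reflectively. Compiled directly against a 2.1.0+ metastore API, the loop body above is roughly equivalent to:

import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;

SQLPrimaryKey hivePK = new SQLPrimaryKey(
    table.getDbName(),    // dbName
    table.getTableName(), // tableName
    col,                  // colName
    seq++,                // keySeq: 1-based position of the column within the PK
    pk.getName(),         // pkName
    enable,               // ENABLE trait
    validate,             // VALIDATE trait
    rely);                // RELY trait
res.add(hivePK);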
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
index 435a8ca..fde469c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
@@ -19,14 +19,17 @@
package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.io.Writable;
import java.lang.reflect.Constructor;
@@ -37,6 +40,8 @@ import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -134,10 +139,7 @@ public class HiveShimV310 extends HiveShimV235 {
@Override
public Set<String> getNotNullColumns(IMetaStoreClient client, Configuration conf, String dbName, String tableName) {
try {
- // HMS catalog (https://issues.apache.org/jira/browse/HIVE-18685) is an on-going feature and we currently
- // just get the default catalog.
- String hiveDefaultCatalog = (String) HiveReflectionUtils.invokeMethod(getMetaStoreUtilsClass(), null,
- "getDefaultCatalog", new Class[]{Configuration.class}, new Object[]{conf});
+ String hiveDefaultCatalog = getHMSDefaultCatalog(conf);
Class requestClz = Class.forName("org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest");
Object request = requestClz.getDeclaredConstructor(String.class, String.class, String.class)
.newInstance(hiveDefaultCatalog, dbName, tableName);
@@ -239,4 +241,81 @@ public class HiveShimV310 extends HiveShimV235 {
}
throw new FlinkHiveException("Unsupported primitive java value of class " + value.getClass().getName());
}
+
+ @Override
+ public void createTableWithConstraints(
+ IMetaStoreClient client,
+ Table table,
+ Configuration conf,
+ UniqueConstraint pk,
+ List<Byte> pkTraits,
+ List<String> notNullCols,
+ List<Byte> nnTraits) {
+ try {
+ List<Object> hivePKs = createHivePKs(table, pk, pkTraits);
+ List<Object> hiveNNs = createHiveNNs(table, conf, notNullCols, nnTraits);
+ // createTableWithConstraints takes PK, FK, UNIQUE, NN, DEFAULT, CHECK lists
+ HiveReflectionUtils.invokeMethod(
+ client.getClass(),
+ client,
+ "createTableWithConstraints",
+ new Class[]{Table.class, List.class, List.class, List.class, List.class, List.class, List.class},
+ new Object[]{table, hivePKs, Collections.emptyList(), Collections.emptyList(), hiveNNs,
+ Collections.emptyList(), Collections.emptyList()});
+ } catch (Exception e) {
+ throw new CatalogException("Failed to create Hive table with constraints", e);
+ }
+ }
+
+ List<Object> createHiveNNs(
+ Table table,
+ Configuration conf,
+ List<String> nnCols,
+ List<Byte> traits)
+ throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
+ IllegalAccessException, InstantiationException {
+ List<Object> res = new ArrayList<>();
+ if (!nnCols.isEmpty()) {
+ Preconditions.checkArgument(nnCols.size() == traits.size(), "Number of NN columns and traits mismatch");
+ Class nnClz = Class.forName("org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint");
+ // NN constructor takes catName, dbName, tableName, colName, nnName, enable, validate, rely
+ Constructor constructor = nnClz.getConstructor(
+ String.class,
+ String.class,
+ String.class,
+ String.class,
+ String.class,
+ boolean.class,
+ boolean.class,
+ boolean.class);
+ String catName = getHMSDefaultCatalog(conf);
+ for (int i = 0; i < nnCols.size(); i++) {
+ String col = nnCols.get(i);
+ byte trait = traits.get(i);
+ boolean enable = HiveTableUtil.requireEnableConstraint(trait);
+ boolean validate = HiveTableUtil.requireValidateConstraint(trait);
+ boolean rely = HiveTableUtil.requireRelyConstraint(trait);
+ // just set nnName to null and HMS will automatically generate one for us
+ Object hiveNN = constructor.newInstance(
+ catName,
+ table.getDbName(),
+ table.getTableName(),
+ col,
+ null,
+ enable,
+ validate,
+ rely);
+ res.add(hiveNN);
+ }
+ }
+ return res;
+ }
+
+ String getHMSDefaultCatalog(Configuration conf)
+ throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ // HMS catalog (https://issues.apache.org/jira/browse/HIVE-18685) is an on-going feature and we currently
+ // just get the default catalog.
+ return (String) HiveReflectionUtils.invokeMethod(getMetaStoreUtilsClass(), null,
+ "getDefaultCatalog", new Class[]{Configuration.class}, new Object[]{conf});
+ }
}
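
Similarly, SQLNotNullConstraint (and its leading catalog-name argument) only appears in Hive 3.x, hence the reflection here as well. Against a 3.1.0 metastore API the loop body above corresponds roughly to:

import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;

SQLNotNullConstraint hiveNN = new SQLNotNullConstraint(
    catName,              // catName: the HMS default catalog
    table.getDbName(),    // dbName
    table.getTableName(), // tableName
    col,                  // colName
    null,                 // nnName: null lets HMS generate a name
    enable,               // ENABLE trait
    validate,             // VALIDATE trait
    rely);                // RELY trait
res.add(hiveNN);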
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
index 34f25fe..80bf25f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
@@ -101,6 +101,7 @@ public class HiveTableFactoryTest {
.build();
Map<String, String> properties = new HashMap<>();
+ properties.put(CatalogConfig.IS_GENERIC, String.valueOf(false));
catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
ObjectPath path = new ObjectPath("mydb", "mytable");
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 58e41e9..015995f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -19,8 +19,10 @@
package org.apache.flink.table.catalog.hive;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.AlterHiveDatabaseOp;
+import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
@@ -37,13 +39,16 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.catalog.stats.Date;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -161,6 +166,29 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
checkStatistics(1000, 1000);
}
+ @Test
+ public void testCreateTableWithConstraints() throws Exception {
+ Assume.assumeTrue(HiveVersionTestUtil.HIVE_310_OR_LATER);
+ HiveCatalog hiveCatalog = (HiveCatalog) catalog;
+ hiveCatalog.createDatabase(db1, createDb(), false);
+ TableSchema.Builder builder = TableSchema.builder();
+ builder.fields(
+ new String[]{"x", "y", "z"},
+ new DataType[]{DataTypes.INT().notNull(), DataTypes.TIMESTAMP(9).notNull(), DataTypes.BIGINT()});
+ builder.primaryKey("pk_name", new String[]{"x"});
+ hiveCatalog.createTable(path1, new CatalogTableImpl(builder.build(), getBatchTableProperties(), null), false);
+ CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(path1);
+ assertTrue("PK not present", catalogTable.getSchema().getPrimaryKey().isPresent());
+ UniqueConstraint pk = catalogTable.getSchema().getPrimaryKey().get();
+ assertEquals("pk_name", pk.getName());
+ assertEquals(Collections.singletonList("x"), pk.getColumns());
+ assertFalse(catalogTable.getSchema().getFieldDataTypes()[0].getLogicalType().isNullable());
+ assertFalse(catalogTable.getSchema().getFieldDataTypes()[1].getLogicalType().isNullable());
+ assertTrue(catalogTable.getSchema().getFieldDataTypes()[2].getLogicalType().isNullable());
+
+ hiveCatalog.dropDatabase(db1, false, true);
+ }
+
private void checkStatistics(int inputStat, int expectStat) throws Exception {
catalog.dropTable(path1, true);