Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/11 02:14:06 UTC

[flink] branch master updated: [FLINK-17452][hive] Support creating Hive tables with constraints

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 25cbce1  [FLINK-17452][hive] Support creating Hive tables with constraints
25cbce1 is described below

commit 25cbce12d7201642b6031e38583489cd803fb1a5
Author: Rui Li <li...@apache.org>
AuthorDate: Mon May 11 10:13:21 2020 +0800

    [FLINK-17452][hive] Support creating Hive tables with constraints
    
    
    This closes #12017
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 30 +++++++-
 .../hive/client/HiveMetastoreClientWrapper.java    | 17 +++++
 .../flink/table/catalog/hive/client/HiveShim.java  |  6 ++
 .../table/catalog/hive/client/HiveShimV100.java    | 12 +++
 .../table/catalog/hive/client/HiveShimV210.java    | 70 +++++++++++++++++
 .../table/catalog/hive/client/HiveShimV310.java    | 87 +++++++++++++++++++++-
 .../connectors/hive/HiveTableFactoryTest.java      |  1 +
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 28 +++++++
 8 files changed, 246 insertions(+), 5 deletions(-)
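
For context, this change makes HiveCatalog write PRIMARY KEY and NOT NULL constraints to the Hive
metastore when a non-generic (Hive) table is created, instead of only creating the bare table. A
minimal caller-side sketch, mirroring the new test at the end of this patch (catalog, database and
table names are illustrative assumptions, not part of the commit):

    // assumes a configured HiveCatalog instance and an existing database "mydb"
    TableSchema.Builder builder = TableSchema.builder();
    builder.fields(
            new String[]{"x", "y", "z"},
            new DataType[]{
                    DataTypes.INT().notNull(),         // stored as a NOT NULL constraint
                    DataTypes.TIMESTAMP(9).notNull(),  // stored as a NOT NULL constraint
                    DataTypes.BIGINT()});              // stays nullable
    builder.primaryKey("pk_name", new String[]{"x"});  // stored as the PK constraint
    Map<String, String> options = new HashMap<>();
    options.put(CatalogConfig.IS_GENERIC, "false");    // constraints are only written for Hive tables
    hiveCatalog.createTable(
            new ObjectPath("mydb", "mytable"),
            new CatalogTableImpl(builder.build(), options, null),
            false);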

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 42a6e60..6d9dc57 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -105,6 +105,7 @@ import java.io.File;
 import java.net.MalformedURLException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -374,8 +375,35 @@ public class HiveCatalog extends AbstractCatalog {
 
 		Table hiveTable = instantiateHiveTable(tablePath, table);
 
+		UniqueConstraint pkConstraint = null;
+		List<String> notNullCols = new ArrayList<>();
+		boolean isGeneric = isGenericForCreate(table.getOptions());
+		if (!isGeneric) {
+			pkConstraint = table.getSchema().getPrimaryKey().orElse(null);
+			for (int i = 0; i < table.getSchema().getFieldDataTypes().length; i++) {
+				if (!table.getSchema().getFieldDataTypes()[i].getLogicalType().isNullable()) {
+					notNullCols.add(table.getSchema().getFieldNames()[i]);
+				}
+			}
+		}
+
 		try {
-			client.createTable(hiveTable);
+			if (pkConstraint != null || !notNullCols.isEmpty()) {
+				// for now we just create constraints that are DISABLE, NOVALIDATE, RELY
+				Byte[] pkTraits = new Byte[pkConstraint == null ? 0 : pkConstraint.getColumns().size()];
+				Arrays.fill(pkTraits, HiveTableUtil.relyConstraint((byte) 0));
+				Byte[] nnTraits = new Byte[notNullCols.size()];
+				Arrays.fill(nnTraits, HiveTableUtil.relyConstraint((byte) 0));
+				client.createTableWithConstraints(
+						hiveTable,
+						hiveConf,
+						pkConstraint,
+						Arrays.asList(pkTraits),
+						notNullCols,
+						Arrays.asList(nnTraits));
+			} else {
+				client.createTable(hiveTable);
+			}
 		} catch (AlreadyExistsException e) {
 			if (!ignoreIfExists) {
 				throw new TableAlreadyExistException(getName(), tablePath, e);
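
The trait bytes built here encode the ENABLE/VALIDATE/RELY properties of each constraint; this patch
always creates constraints as DISABLE, NOVALIDATE, RELY, so only the rely flag is set via
HiveTableUtil.relyConstraint((byte) 0) and read back with the require*Constraint checks in the shims.
A minimal sketch of the bit-flag idea, using hypothetical bit positions (the real constants are
defined in HiveTableUtil), is:

    // illustrative bit positions only; not the actual HiveTableUtil constants
    final byte ENABLE   = 1 << 0;
    final byte VALIDATE = 1 << 1;
    final byte RELY     = 1 << 2;

    byte trait = (byte) (0 | RELY);              // DISABLE, NOVALIDATE, RELY
    boolean rely     = (trait & RELY) != 0;      // what requireRelyConstraint(trait) would report
    boolean enable   = (trait & ENABLE) != 0;    // false -> DISABLE
    boolean validate = (trait & VALIDATE) != 0;  // false -> NOVALIDATE
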
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
index 77a8058..63a557b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
@@ -266,4 +266,21 @@ public class HiveMetastoreClientWrapper implements AutoCloseable {
 		hiveShim.alterPartition(client, databaseName, tableName, partition);
 	}
 
+	public void createTableWithConstraints(
+			Table table,
+			Configuration conf,
+			UniqueConstraint pk,
+			List<Byte> pkTraits,
+			List<String> notNullCols,
+			List<Byte> nnTraits) {
+		hiveShim.createTableWithConstraints(
+				client,
+				table,
+				conf,
+				pk,
+				pkTraits,
+				notNullCols,
+				nnTraits);
+	}
+
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 258c52c..32e2a72 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -199,4 +199,10 @@ public interface HiveShim extends Serializable {
 	 * Converts a Hive primitive java object to corresponding Writable object.
 	 */
 	@Nullable Writable hivePrimitiveToWritable(@Nullable Object value);
+
+	/**
+	 * Creates a table with PK and NOT NULL constraints.
+	 */
+	void createTableWithConstraints(IMetaStoreClient client, Table table, Configuration conf,
+			UniqueConstraint pk, List<Byte> pkTraits, List<String> notNullCols, List<Byte> nnTraits);
 }
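
The shim interface is the version dispatch point: HiveShimV100 rejects constraint creation outright,
HiveShimV210 accepts primary keys only, and HiveShimV310 accepts primary keys plus NOT NULL. A rough
caller-side sketch, assuming the existing HiveShimLoader factory in this package and variables
(client, hiveTable, hiveConf, ...) already in scope; the version string is an assumption:

    // sketch only: pick the shim for the configured Hive version, then delegate
    HiveShim shim = HiveShimLoader.loadHiveShim("2.1.1");
    shim.createTableWithConstraints(
            client, hiveTable, hiveConf, pkConstraint, pkTraits, notNullCols, nnTraits);
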
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index dae39d1..085b3e1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -296,6 +296,18 @@ public class HiveShimV100 implements HiveShim {
 		return optional.orElseThrow(() -> new FlinkHiveException("Unsupported primitive java value of class " + value.getClass().getName()));
 	}
 
+	@Override
+	public void createTableWithConstraints(
+			IMetaStoreClient client,
+			Table table,
+			Configuration conf,
+			UniqueConstraint pk,
+			List<Byte> pkTraits,
+			List<String> notNullCols,
+			List<Byte> nnTraits) {
+		throw new UnsupportedOperationException("Table constraints not supported until 2.1.0");
+	}
+
 	Optional<Writable> javaToWritable(@Nonnull Object value) {
 		Writable writable = null;
 		// in case value is already a Writable
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
index 5d1bef4..4c0fe7c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
@@ -22,18 +22,23 @@ import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.util.Preconditions;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TApplicationException;
 import org.apache.thrift.TException;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
@@ -109,4 +114,69 @@ public class HiveShimV210 extends HiveShimV201 {
 		}
 	}
 
+	@Override
+	public void createTableWithConstraints(
+			IMetaStoreClient client,
+			Table table,
+			Configuration conf,
+			UniqueConstraint pk,
+			List<Byte> pkTraits,
+			List<String> notNullCols,
+			List<Byte> nnTraits) {
+		if (!notNullCols.isEmpty()) {
+			throw new UnsupportedOperationException("NOT NULL constraints not supported until 3.0.0");
+		}
+		try {
+			List<Object> hivePKs = createHivePKs(table, pk, pkTraits);
+			// createTableWithConstraints takes PK and FK lists
+			HiveReflectionUtils.invokeMethod(
+					client.getClass(),
+					client,
+					"createTableWithConstraints",
+					new Class[]{Table.class, List.class, List.class},
+					new Object[]{table, hivePKs, Collections.emptyList()});
+		} catch (Exception e) {
+			throw new CatalogException("Failed to create Hive table with constraints", e);
+		}
+	}
+
+	List<Object> createHivePKs(Table table, UniqueConstraint pk, List<Byte> traits)
+			throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+			NoSuchMethodException, InvocationTargetException {
+		List<Object> res = new ArrayList<>();
+		if (pk != null) {
+			Class pkClz = Class.forName("org.apache.hadoop.hive.metastore.api.SQLPrimaryKey");
+			// PK constructor takes dbName, tableName, colName, keySeq, pkName, enable, validate, rely
+			Constructor constructor = pkClz.getConstructor(
+					String.class,
+					String.class,
+					String.class,
+					int.class,
+					String.class,
+					boolean.class,
+					boolean.class,
+					boolean.class);
+			int seq = 1;
+			Preconditions.checkArgument(pk.getColumns().size() == traits.size(),
+					"Number of PK columns and traits mismatch");
+			for (int i = 0; i < pk.getColumns().size(); i++) {
+				String col = pk.getColumns().get(i);
+				byte trait = traits.get(i);
+				boolean enable = HiveTableUtil.requireEnableConstraint(trait);
+				boolean validate = HiveTableUtil.requireValidateConstraint(trait);
+				boolean rely = HiveTableUtil.requireRelyConstraint(trait);
+				Object hivePK = constructor.newInstance(
+						table.getDbName(),
+						table.getTableName(),
+						col,
+						seq++,
+						pk.getName(),
+						enable,
+						validate,
+						rely);
+				res.add(hivePK);
+			}
+		}
+		return res;
+	}
 }
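
Because SQLPrimaryKey and the three-argument createTableWithConstraints only exist on the classpath
for Hive 2.1+, the shim goes through HiveReflectionUtils instead of referencing them directly.
Compiled straight against a Hive 2.1+ client, the call it performs is roughly equivalent to this
sketch (the column name, key name and single-column PK are assumptions):

    // non-reflective equivalent of the reflective call above, for readability only
    SQLPrimaryKey hivePK = new SQLPrimaryKey(
            table.getDbName(), table.getTableName(),
            "x",              // constrained column
            1,                // key sequence number
            "pk_name",        // constraint name
            false,            // enable   -> DISABLE
            false,            // validate -> NOVALIDATE
            true);            // rely     -> RELY
    client.createTableWithConstraints(
            table, Collections.singletonList(hivePK), Collections.emptyList());
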
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
index 435a8ca..fde469c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
@@ -19,14 +19,17 @@
 package org.apache.flink.table.catalog.hive.client;
 
 import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.io.Writable;
 
 import java.lang.reflect.Constructor;
@@ -37,6 +40,8 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -134,10 +139,7 @@ public class HiveShimV310 extends HiveShimV235 {
 	@Override
 	public Set<String> getNotNullColumns(IMetaStoreClient client, Configuration conf, String dbName, String tableName) {
 		try {
-			// HMS catalog (https://issues.apache.org/jira/browse/HIVE-18685) is an on-going feature and we currently
-			// just get the default catalog.
-			String hiveDefaultCatalog = (String) HiveReflectionUtils.invokeMethod(getMetaStoreUtilsClass(), null,
-					"getDefaultCatalog", new Class[]{Configuration.class}, new Object[]{conf});
+			String hiveDefaultCatalog = getHMSDefaultCatalog(conf);
 			Class requestClz = Class.forName("org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest");
 			Object request = requestClz.getDeclaredConstructor(String.class, String.class, String.class)
 					.newInstance(hiveDefaultCatalog, dbName, tableName);
@@ -239,4 +241,81 @@ public class HiveShimV310 extends HiveShimV235 {
 		}
 		throw new FlinkHiveException("Unsupported primitive java value of class " + value.getClass().getName());
 	}
+
+	@Override
+	public void createTableWithConstraints(
+			IMetaStoreClient client,
+			Table table,
+			Configuration conf,
+			UniqueConstraint pk,
+			List<Byte> pkTraits,
+			List<String> notNullCols,
+			List<Byte> nnTraits) {
+		try {
+			List<Object> hivePKs = createHivePKs(table, pk, pkTraits);
+			List<Object> hiveNNs = createHiveNNs(table, conf, notNullCols, nnTraits);
+			// createTableWithConstraints takes PK, FK, UNIQUE, NN, DEFAULT, CHECK lists
+			HiveReflectionUtils.invokeMethod(
+					client.getClass(),
+					client,
+					"createTableWithConstraints",
+					new Class[]{Table.class, List.class, List.class, List.class, List.class, List.class, List.class},
+					new Object[]{table, hivePKs, Collections.emptyList(), Collections.emptyList(), hiveNNs,
+							Collections.emptyList(), Collections.emptyList()});
+		} catch (Exception e) {
+			throw new CatalogException("Failed to create Hive table with constraints", e);
+		}
+	}
+
+	List<Object> createHiveNNs(
+			Table table,
+			Configuration conf,
+			List<String> nnCols,
+			List<Byte> traits)
+			throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
+			IllegalAccessException, InstantiationException {
+		List<Object> res = new ArrayList<>();
+		if (!nnCols.isEmpty()) {
+			Preconditions.checkArgument(nnCols.size() == traits.size(), "Number of NN columns and traits mismatch");
+			Class nnClz = Class.forName("org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint");
+			// NN constructor takes catName, dbName, tableName, colName, nnName, enable, validate, rely
+			Constructor constructor = nnClz.getConstructor(
+					String.class,
+					String.class,
+					String.class,
+					String.class,
+					String.class,
+					boolean.class,
+					boolean.class,
+					boolean.class);
+			String catName = getHMSDefaultCatalog(conf);
+			for (int i = 0; i < nnCols.size(); i++) {
+				String col = nnCols.get(i);
+				byte trait = traits.get(i);
+				boolean enable = HiveTableUtil.requireEnableConstraint(trait);
+				boolean validate = HiveTableUtil.requireValidateConstraint(trait);
+				boolean rely = HiveTableUtil.requireRelyConstraint(trait);
+				// just set nnName to null and HMS will automatically generate one for us
+				Object hiveNN = constructor.newInstance(
+						catName,
+						table.getDbName(),
+						table.getTableName(),
+						col,
+						null,
+						enable,
+						validate,
+						rely);
+				res.add(hiveNN);
+			}
+		}
+		return res;
+	}
+
+	String getHMSDefaultCatalog(Configuration conf)
+			throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+		// HMS catalog (https://issues.apache.org/jira/browse/HIVE-18685) is an on-going feature and we currently
+		// just get the default catalog.
+		return (String) HiveReflectionUtils.invokeMethod(getMetaStoreUtilsClass(), null,
+				"getDefaultCatalog", new Class[]{Configuration.class}, new Object[]{conf});
+	}
 }
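
On Hive 3.1.0+ the metastore client exposes the wider seven-argument createTableWithConstraints, and
SQLNotNullConstraint additionally carries the HMS catalog name, so this shim can persist NOT NULL
constraints as well. Written directly against the Hive 3.1 API, the reflective call above corresponds
roughly to the following sketch (the constrained column and surrounding variables are assumptions):

    // non-reflective equivalent of the reflective call above, for readability only
    SQLNotNullConstraint hiveNN = new SQLNotNullConstraint(
            catName,                        // default HMS catalog (see getHMSDefaultCatalog)
            table.getDbName(), table.getTableName(),
            "y",                            // constrained column
            null,                           // constraint name: let HMS generate one
            false, false, true);            // DISABLE, NOVALIDATE, RELY
    client.createTableWithConstraints(
            table,
            hivePKs,                        // List<SQLPrimaryKey> from createHivePKs
            Collections.emptyList(),        // foreign keys
            Collections.emptyList(),        // unique constraints
            Collections.singletonList(hiveNN),
            Collections.emptyList(),        // default constraints
            Collections.emptyList());       // check constraints
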
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
index 34f25fe..80bf25f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
@@ -101,6 +101,7 @@ public class HiveTableFactoryTest {
 			.build();
 
 		Map<String, String> properties = new HashMap<>();
+		properties.put(CatalogConfig.IS_GENERIC, String.valueOf(false));
 
 		catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
 		ObjectPath path = new ObjectPath("mydb", "mytable");
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 58e41e9..015995f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -19,8 +19,10 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.AlterHiveDatabaseOp;
+import org.apache.flink.table.HiveVersionTestUtil;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -37,13 +39,16 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.catalog.stats.Date;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -161,6 +166,29 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
 		checkStatistics(1000, 1000);
 	}
 
+	@Test
+	public void testCreateTableWithConstraints() throws Exception {
+		Assume.assumeTrue(HiveVersionTestUtil.HIVE_310_OR_LATER);
+		HiveCatalog hiveCatalog = (HiveCatalog) catalog;
+		hiveCatalog.createDatabase(db1, createDb(), false);
+		TableSchema.Builder builder = TableSchema.builder();
+		builder.fields(
+				new String[]{"x", "y", "z"},
+				new DataType[]{DataTypes.INT().notNull(), DataTypes.TIMESTAMP(9).notNull(), DataTypes.BIGINT()});
+		builder.primaryKey("pk_name", new String[]{"x"});
+		hiveCatalog.createTable(path1, new CatalogTableImpl(builder.build(), getBatchTableProperties(), null), false);
+		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(path1);
+		assertTrue("PK not present", catalogTable.getSchema().getPrimaryKey().isPresent());
+		UniqueConstraint pk = catalogTable.getSchema().getPrimaryKey().get();
+		assertEquals("pk_name", pk.getName());
+		assertEquals(Collections.singletonList("x"), pk.getColumns());
+		assertFalse(catalogTable.getSchema().getFieldDataTypes()[0].getLogicalType().isNullable());
+		assertFalse(catalogTable.getSchema().getFieldDataTypes()[1].getLogicalType().isNullable());
+		assertTrue(catalogTable.getSchema().getFieldDataTypes()[2].getLogicalType().isNullable());
+
+		hiveCatalog.dropDatabase(db1, false, true);
+	}
+
 	private void checkStatistics(int inputStat, int expectStat) throws Exception {
 		catalog.dropTable(path1, true);