You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/01 18:30:16 UTC

[flink] branch master updated: [FLINK-13023][hive] Generate HiveTableSource from a Hive table

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b867750  [FLINK-13023][hive] Generate HiveTableSource from a Hive table
b867750 is described below

commit b867750b4ba8b58767aa600d732fbfa5fd7c64fe
Author: Xuefu Zhang <xu...@alibaba-inc.com>
AuthorDate: Fri Jun 28 09:50:42 2019 -0700

    [FLINK-13023][hive] Generate HiveTableSource from a Hive table
---
 .../batch/connectors/hive/HiveTableFactory.java    |  7 ++--
 .../connectors/hive/HiveTableInputFormat.java      | 31 ++++++++---------
 .../batch/connectors/hive/HiveTableSource.java     | 40 ++++++++++------------
 .../batch/connectors/hive/HiveInputFormatTest.java |  6 ++--
 .../connectors/hive/HiveTableFactoryTest.java      |  2 ++
 .../batch/connectors/hive/HiveTableSourceTest.java |  6 +++-
 6 files changed, 46 insertions(+), 46 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
index fd142da..a22014a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
@@ -88,8 +88,7 @@ public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFacto
 	 * Creates and configures a {@link org.apache.flink.table.sources.InputFormatTableSource} using the given {@link CatalogTable}.
 	 */
 	private InputFormatTableSource<Row> createInputFormatTableSource(ObjectPath tablePath, CatalogTable table) {
-		// TODO: create an InputFormatTableSource from a HiveCatalogTable instance.
-		return null;
+		return new HiveTableSource(new JobConf(hiveConf), tablePath, table);
 	}
 
 	@Override
@@ -100,7 +99,7 @@ public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFacto
 		boolean isGeneric = Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
 
 		if (!isGeneric) {
-			return createOutputFormatTableSink(tablePath, (CatalogTableImpl) table);
+			return createOutputFormatTableSink(tablePath, table);
 		} else {
 			return TableFactoryUtil.findAndCreateTableSink(table);
 		}
@@ -109,7 +108,7 @@ public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFacto
 	/**
 	 * Creates and configures a {@link org.apache.flink.table.sinks.OutputFormatTableSink} using the given {@link CatalogTable}.
 	 */
-	private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTableImpl table) {
+	private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) {
 		return new HiveTableSink(new JobConf(hiveConf), tablePath, table);
 	}
 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
index fb99ee4..be6f92a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.types.Row;
 
@@ -75,11 +77,10 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 	protected transient boolean fetched = false;
 	protected transient boolean hasNext;
 
-	private boolean isPartitioned;
 	private RowTypeInfo rowTypeInfo;
 
 	//Necessary info to init deserializer
-	private String[] partitionColNames;
+	private List<String> partitionColNames;
 	//For non-partition hive table, partitions only contains one partition which partitionValues is empty.
 	private List<HiveTablePartition> partitions;
 	private transient Deserializer deserializer;
@@ -92,16 +93,16 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 
 	public HiveTableInputFormat(
 			JobConf jobConf,
-			boolean isPartitioned,
-			String[] partitionColNames,
-			List<HiveTablePartition> partitions,
-			RowTypeInfo rowTypeInfo) {
+			CatalogTable catalogTable,
+			List<HiveTablePartition> partitions) {
 		super(jobConf.getCredentials());
-		this.rowTypeInfo = checkNotNull(rowTypeInfo, "rowTypeInfo can not be null.");
-		this.jobConf = new JobConf(jobConf);
-		this.isPartitioned = isPartitioned;
-		this.partitionColNames = partitionColNames;
+		checkNotNull(catalogTable, "catalogTable can not be null.");
 		this.partitions = checkNotNull(partitions, "partitions can not be null.");
+
+		this.jobConf = new JobConf(jobConf);
+		this.partitionColNames = catalogTable.getPartitionKeys();
+		TableSchema tableSchema = catalogTable.getSchema();
+		this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
 	}
 
 	@Override
@@ -221,10 +222,8 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 						structObjectInspector.getStructFieldData(hiveRowStruct, structField), structField.getFieldObjectInspector());
 				row.setField(index, object);
 			}
-			if (isPartitioned) {
-				for (String partition : partitionColNames){
-					row.setField(index++, hiveTablePartition.getPartitionSpec().get(partition));
-				}
+			for (String partition : partitionColNames){
+				row.setField(index++, hiveTablePartition.getPartitionSpec().get(partition));
 			}
 		} catch (Exception e){
 			logger.error("Error happens when converting hive data type to flink data type.");
@@ -246,7 +245,6 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 	private void writeObject(ObjectOutputStream out) throws IOException {
 		super.write(out);
 		jobConf.write(out);
-		out.writeObject(isPartitioned);
 		out.writeObject(rowTypeInfo);
 		out.writeObject(partitionColNames);
 		out.writeObject(partitions);
@@ -264,9 +262,8 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 		if (currentUserCreds != null) {
 			jobConf.getCredentials().addAll(currentUserCreds);
 		}
-		isPartitioned = (boolean) in.readObject();
 		rowTypeInfo = (RowTypeInfo) in.readObject();
-		partitionColNames = (String[]) in.readObject();
+		partitionColNames = (List<String>) in.readObject();
 		partitions = (List<HiveTablePartition>) in.readObject();
 	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 279fcb5..19e46de 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -19,9 +19,10 @@
 package org.apache.flink.batch.connectors.hive;
 
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
@@ -52,42 +53,33 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
 
 	private static Logger logger = LoggerFactory.getLogger(HiveTableSource.class);
 
-	private final TableSchema tableSchema;
 	private final JobConf jobConf;
-	private final String dbName;
-	private final String tableName;
-	private final Boolean isPartitionTable;
-	private final String[] partitionColNames;
+	private final ObjectPath tablePath;
+	private final CatalogTable catalogTable;
 	private List<HiveTablePartition> allPartitions;
 	private String hiveVersion;
 
-	public HiveTableSource(TableSchema tableSchema,
-						JobConf jobConf,
-						String dbName,
-						String tableName,
-						String[] partitionColNames) {
-		this.tableSchema = tableSchema;
+	public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable) {
 		this.jobConf = jobConf;
-		this.dbName = dbName;
-		this.tableName = tableName;
-		this.isPartitionTable = (null != partitionColNames && partitionColNames.length != 0);
-		this.partitionColNames = partitionColNames;
+		this.tablePath = tablePath;
+		this.catalogTable = catalogTable;
 		this.hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
 	}
 
 	@Override
 	public InputFormat getInputFormat() {
 		initAllPartitions();
-		return new HiveTableInputFormat(jobConf, isPartitionTable, partitionColNames, allPartitions, new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()));
+		return new HiveTableInputFormat(jobConf, catalogTable, allPartitions);
 	}
 
 	@Override
 	public TableSchema getTableSchema() {
-		return tableSchema;
+		return catalogTable.getSchema();
 	}
 
 	@Override
 	public DataType getProducedDataType() {
+		TableSchema tableSchema = catalogTable.getSchema();
 		DataTypes.Field[] fields = new DataTypes.Field[tableSchema.getFieldCount()];
 		for (int i = 0; i < fields.length; i++) {
 			fields[i] = DataTypes.FIELD(tableSchema.getFieldName(i).get(), tableSchema.getFieldDataType(i).get());
@@ -101,17 +93,21 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
 		// Ideally, we need to go thru Catalog API to get all info we need here, which requires some major
 		// refactoring. We will postpone this until we merge Blink to Flink.
 		try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
-			if (isPartitionTable) {
+			String dbName = tablePath.getDatabaseName();
+			String tableName = tablePath.getObjectName();
+			List<String> partitionColNames = catalogTable.getPartitionKeys();
+			if (partitionColNames != null && partitionColNames.size() > 0) {
 				List<Partition> partitions =
 						client.listPartitions(dbName, tableName, (short) -1);
 				for (Partition partition : partitions) {
 					StorageDescriptor sd = partition.getSd();
 					Map<String, Object> partitionColValues = new HashMap<>();
-					for (int i = 0; i < partitionColNames.length; i++) {
+					for (int i = 0; i < partitionColNames.size(); i++) {
+						String partitionColName = partitionColNames.get(i);
 						String partitionValue = partition.getValues().get(i);
-						DataType type = tableSchema.getFieldDataType(partitionColNames[i]).get();
+						DataType type = catalogTable.getSchema().getFieldDataType(partitionColName).get();
 						Object partitionObject = restorePartitionValueFromFromType(partitionValue, type);
-						partitionColValues.put(partitionColNames[i], partitionObject);
+						partitionColValues.put(partitionColName, partitionObject);
 					}
 					allPartitions.add(new HiveTablePartition(sd, partitionColValues));
 				}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
index 3f9445c..5560872 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -112,8 +114,8 @@ public class HiveInputFormatTest {
 		RowTypeInfo rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
 		List<HiveTablePartition> partitions = new ArrayList<>();
 		partitions.add(new HiveTablePartition(sd, new HashMap<>()));
-		HiveTableInputFormat hiveTableInputFormat = new HiveTableInputFormat(new JobConf(hiveConf), false, null,
-																			partitions, rowTypeInfo);
+		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(new ObjectPath(dbName, tblName));
+		HiveTableInputFormat hiveTableInputFormat = new HiveTableInputFormat(new JobConf(hiveConf), catalogTable, partitions);
 		DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
 		List<Row> rows = rowDataSet.collect();
 		Assert.assertEquals(4, rows.size());
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
index 4622322..237146d 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
@@ -112,6 +112,8 @@ public class HiveTableFactoryTest {
 		HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
 		TableSink tableSink = tableFactory.createTableSink(path, table);
 		assertTrue(tableSink instanceof HiveTableSink);
+		TableSource tableSource = tableFactory.createTableSource(path, table);
+		assertTrue(tableSource instanceof HiveTableSource);
 	}
 
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
index 1a8807f..d6cf124 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -110,7 +112,9 @@ public class HiveTableSourceTest {
 		client.createTable(tbl);
 		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
 		BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
-		HiveTableSource hiveTableSource = new HiveTableSource(tableSchema, new JobConf(hiveConf), dbName, tblName, null);
+		ObjectPath tablePath = new ObjectPath(dbName, tblName);
+		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+		HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
 		Table src = tEnv.fromTableSource(hiveTableSource);
 		DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(),
 																	tableSchema.getFieldNames()));