Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/01 18:30:16 UTC
[flink] branch master updated: [FLINK-13023][hive] Generate HiveTableSource from a Hive table
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b867750 [FLINK-13023][hive] Generate HiveTableSource from a Hive table
b867750 is described below
commit b867750b4ba8b58767aa600d732fbfa5fd7c64fe
Author: Xuefu Zhang <xu...@alibaba-inc.com>
AuthorDate: Fri Jun 28 09:50:42 2019 -0700
[FLINK-13023][hive] Generate HiveTableSource from a Hive table
---
.../batch/connectors/hive/HiveTableFactory.java | 7 ++--
.../connectors/hive/HiveTableInputFormat.java | 31 ++++++++---------
.../batch/connectors/hive/HiveTableSource.java | 40 ++++++++++------------
.../batch/connectors/hive/HiveInputFormatTest.java | 6 ++--
.../connectors/hive/HiveTableFactoryTest.java | 2 ++
.../batch/connectors/hive/HiveTableSourceTest.java | 6 +++-
6 files changed, 46 insertions(+), 46 deletions(-)
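
Before diving into the per-file diffs: the heart of the change is a constructor swap. Where HiveTableSource previously had to be fed each piece of metadata by hand, it now takes the ObjectPath and CatalogTable that the catalog layer already produces. A minimal before/after sketch (variable names are illustrative; checked exceptions from the catalog lookup omitted):

    // Old constructor, removed by this commit: schema, db/table names and
    // partition columns all passed by hand (tableSchema, hiveConf and
    // partitionColNames assumed to be in scope).
    HiveTableSource before = new HiveTableSource(
            tableSchema, new JobConf(hiveConf), "db1", "tbl1", partitionColNames);

    // New constructor: the ObjectPath carries the db/table names, the
    // CatalogTable carries the schema plus partition keys.
    ObjectPath tablePath = new ObjectPath("db1", "tbl1");
    CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
    HiveTableSource after = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
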
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
index fd142da..a22014a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
@@ -88,8 +88,7 @@ public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFacto
* Creates and configures a {@link org.apache.flink.table.sources.InputFormatTableSource} using the given {@link CatalogTable}.
*/
private InputFormatTableSource<Row> createInputFormatTableSource(ObjectPath tablePath, CatalogTable table) {
- // TODO: create an InputFormatTableSource from a HiveCatalogTable instance.
- return null;
+ return new HiveTableSource(new JobConf(hiveConf), tablePath, table);
}
@Override
@@ -100,7 +99,7 @@ public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFacto
boolean isGeneric = Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
if (!isGeneric) {
- return createOutputFormatTableSink(tablePath, (CatalogTableImpl) table);
+ return createOutputFormatTableSink(tablePath, table);
} else {
return TableFactoryUtil.findAndCreateTableSink(table);
}
@@ -109,7 +108,7 @@ public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFacto
/**
* Creates and configures a {@link org.apache.flink.table.sinks.OutputFormatTableSink} using the given {@link CatalogTable}.
*/
- private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTableImpl table) {
+ private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) {
return new HiveTableSink(new JobConf(hiveConf), tablePath, table);
}
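
With this hunk the factory can finally produce a source (the old TODO returned null), and the sink path is loosened from CatalogTableImpl to the CatalogTable interface, so both sides are built from the same object. A hedged sketch of how a caller might exercise both paths, mirroring HiveTableFactoryTest further down (hiveCatalog, tablePath and catalogTable assumed in scope; error handling elided):

    // Resolve the factory from the catalog and build both sides from the
    // same CatalogTable.
    HiveTableFactory factory = (HiveTableFactory) hiveCatalog.getTableFactory().get();
    TableSource<Row> source = factory.createTableSource(tablePath, catalogTable); // now a HiveTableSource
    TableSink<Row> sink = factory.createTableSink(tablePath, catalogTable);       // a HiveTableSink, as before
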
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
index fb99ee4..be6f92a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.types.Row;
@@ -75,11 +77,10 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
protected transient boolean fetched = false;
protected transient boolean hasNext;
- private boolean isPartitioned;
private RowTypeInfo rowTypeInfo;
//Necessary info to init deserializer
- private String[] partitionColNames;
+ private List<String> partitionColNames;
//For non-partition hive table, partitions only contains one partition which partitionValues is empty.
private List<HiveTablePartition> partitions;
private transient Deserializer deserializer;
@@ -92,16 +93,16 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
public HiveTableInputFormat(
JobConf jobConf,
- boolean isPartitioned,
- String[] partitionColNames,
- List<HiveTablePartition> partitions,
- RowTypeInfo rowTypeInfo) {
+ CatalogTable catalogTable,
+ List<HiveTablePartition> partitions) {
super(jobConf.getCredentials());
- this.rowTypeInfo = checkNotNull(rowTypeInfo, "rowTypeInfo can not be null.");
- this.jobConf = new JobConf(jobConf);
- this.isPartitioned = isPartitioned;
- this.partitionColNames = partitionColNames;
+ checkNotNull(catalogTable, "catalogTable can not be null.");
this.partitions = checkNotNull(partitions, "partitions can not be null.");
+
+ this.jobConf = new JobConf(jobConf);
+ this.partitionColNames = catalogTable.getPartitionKeys();
+ TableSchema tableSchema = catalogTable.getSchema();
+ this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
}
@Override
@@ -221,10 +222,8 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
structObjectInspector.getStructFieldData(hiveRowStruct, structField), structField.getFieldObjectInspector());
row.setField(index, object);
}
- if (isPartitioned) {
- for (String partition : partitionColNames){
- row.setField(index++, hiveTablePartition.getPartitionSpec().get(partition));
- }
+ for (String partition : partitionColNames){
+ row.setField(index++, hiveTablePartition.getPartitionSpec().get(partition));
}
} catch (Exception e){
logger.error("Error happens when converting hive data type to flink data type.");
@@ -246,7 +245,6 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
private void writeObject(ObjectOutputStream out) throws IOException {
super.write(out);
jobConf.write(out);
- out.writeObject(isPartitioned);
out.writeObject(rowTypeInfo);
out.writeObject(partitionColNames);
out.writeObject(partitions);
@@ -264,9 +262,8 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
if (currentUserCreds != null) {
jobConf.getCredentials().addAll(currentUserCreds);
}
- isPartitioned = (boolean) in.readObject();
rowTypeInfo = (RowTypeInfo) in.readObject();
- partitionColNames = (String[]) in.readObject();
+ partitionColNames = (List<String>) in.readObject();
partitions = (List<HiveTablePartition>) in.readObject();
}
}
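
A consequence worth spelling out: dropping the isPartitioned flag works because an empty partition-key list simply makes the appending loop a no-op, and the RowTypeInfo is now derived from the CatalogTable's schema instead of being passed in. Illustrative sketch of how partition values end up at the tail of each emitted Row (hiveTablePartition, rowTypeInfo and partitionColNames as in the diff; data-field population elided):

    // The schema, and hence the row arity, includes the partition columns.
    Row row = new Row(rowTypeInfo.getArity());
    // ... fields 0 .. arity - partitionCount - 1 are filled from the Hive
    // deserializer ...
    int index = rowTypeInfo.getArity() - partitionColNames.size();
    for (String partition : partitionColNames) {
        // Partition values come from the partition spec, not from file data.
        row.setField(index++, hiveTablePartition.getPartitionSpec().get(partition));
    }
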
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 279fcb5..19e46de 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -19,9 +19,10 @@
package org.apache.flink.batch.connectors.hive;
import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
@@ -52,42 +53,33 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
private static Logger logger = LoggerFactory.getLogger(HiveTableSource.class);
- private final TableSchema tableSchema;
private final JobConf jobConf;
- private final String dbName;
- private final String tableName;
- private final Boolean isPartitionTable;
- private final String[] partitionColNames;
+ private final ObjectPath tablePath;
+ private final CatalogTable catalogTable;
private List<HiveTablePartition> allPartitions;
private String hiveVersion;
- public HiveTableSource(TableSchema tableSchema,
- JobConf jobConf,
- String dbName,
- String tableName,
- String[] partitionColNames) {
- this.tableSchema = tableSchema;
+ public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable) {
this.jobConf = jobConf;
- this.dbName = dbName;
- this.tableName = tableName;
- this.isPartitionTable = (null != partitionColNames && partitionColNames.length != 0);
- this.partitionColNames = partitionColNames;
+ this.tablePath = tablePath;
+ this.catalogTable = catalogTable;
this.hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
}
@Override
public InputFormat getInputFormat() {
initAllPartitions();
- return new HiveTableInputFormat(jobConf, isPartitionTable, partitionColNames, allPartitions, new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()));
+ return new HiveTableInputFormat(jobConf, catalogTable, allPartitions);
}
@Override
public TableSchema getTableSchema() {
- return tableSchema;
+ return catalogTable.getSchema();
}
@Override
public DataType getProducedDataType() {
+ TableSchema tableSchema = catalogTable.getSchema();
DataTypes.Field[] fields = new DataTypes.Field[tableSchema.getFieldCount()];
for (int i = 0; i < fields.length; i++) {
fields[i] = DataTypes.FIELD(tableSchema.getFieldName(i).get(), tableSchema.getFieldDataType(i).get());
@@ -101,17 +93,21 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
// Ideally, we need to go thru Catalog API to get all info we need here, which requires some major
// refactoring. We will postpone this until we merge Blink to Flink.
try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
- if (isPartitionTable) {
+ String dbName = tablePath.getDatabaseName();
+ String tableName = tablePath.getObjectName();
+ List<String> partitionColNames = catalogTable.getPartitionKeys();
+ if (partitionColNames != null && partitionColNames.size() > 0) {
List<Partition> partitions =
client.listPartitions(dbName, tableName, (short) -1);
for (Partition partition : partitions) {
StorageDescriptor sd = partition.getSd();
Map<String, Object> partitionColValues = new HashMap<>();
- for (int i = 0; i < partitionColNames.length; i++) {
+ for (int i = 0; i < partitionColNames.size(); i++) {
+ String partitionColName = partitionColNames.get(i);
String partitionValue = partition.getValues().get(i);
- DataType type = tableSchema.getFieldDataType(partitionColNames[i]).get();
+ DataType type = catalogTable.getSchema().getFieldDataType(partitionColName).get();
Object partitionObject = restorePartitionValueFromFromType(partitionValue, type);
- partitionColValues.put(partitionColNames[i], partitionObject);
+ partitionColValues.put(partitionColName, partitionObject);
}
allPartitions.add(new HiveTablePartition(sd, partitionColValues));
}
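
The getProducedDataType hunk converts the catalog schema field by field; the method's closing lines fall outside the diff context, but presumably wrap the fields into a row type. A hedged sketch of the full method body under that assumption:

    TableSchema tableSchema = catalogTable.getSchema();
    DataTypes.Field[] fields = new DataTypes.Field[tableSchema.getFieldCount()];
    for (int i = 0; i < fields.length; i++) {
        fields[i] = DataTypes.FIELD(
                tableSchema.getFieldName(i).get(),
                tableSchema.getFieldDataType(i).get());
    }
    return DataTypes.ROW(fields); // assumed continuation beyond the diff context
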
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
index 3f9445c..5560872 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -112,8 +114,8 @@ public class HiveInputFormatTest {
RowTypeInfo rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
List<HiveTablePartition> partitions = new ArrayList<>();
partitions.add(new HiveTablePartition(sd, new HashMap<>()));
- HiveTableInputFormat hiveTableInputFormat = new HiveTableInputFormat(new JobConf(hiveConf), false, null,
- partitions, rowTypeInfo);
+ CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(new ObjectPath(dbName, tblName));
+ HiveTableInputFormat hiveTableInputFormat = new HiveTableInputFormat(new JobConf(hiveConf), catalogTable, partitions);
DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
List<Row> rows = rowDataSet.collect();
Assert.assertEquals(4, rows.size());
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
index 4622322..237146d 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
@@ -112,6 +112,8 @@ public class HiveTableFactoryTest {
HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
TableSink tableSink = tableFactory.createTableSink(path, table);
assertTrue(tableSink instanceof HiveTableSink);
+ TableSource tableSource = tableFactory.createTableSource(path, table);
+ assertTrue(tableSource instanceof HiveTableSource);
}
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
index 1a8807f..d6cf124 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -110,7 +112,9 @@ public class HiveTableSourceTest {
client.createTable(tbl);
ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
- HiveTableSource hiveTableSource = new HiveTableSource(tableSchema, new JobConf(hiveConf), dbName, tblName, null);
+ ObjectPath tablePath = new ObjectPath(dbName, tblName);
+ CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+ HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
Table src = tEnv.fromTableSource(hiveTableSource);
DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(),
tableSchema.getFieldNames()));
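
Putting the pieces together, the updated test exercises the whole new path: look the table up in the HiveCatalog, hand the ObjectPath and CatalogTable to HiveTableSource, and read it as a DataSet. A condensed, hedged version (database/table names illustrative; catalog setup and checked exceptions omitted):

    ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);

    ObjectPath tablePath = new ObjectPath("default", "src"); // illustrative names
    CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
    HiveTableSource hiveTableSource =
            new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);

    Table src = tEnv.fromTableSource(hiveTableSource);
    List<Row> rows = tEnv.toDataSet(src, Row.class).collect();
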