Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/09 16:34:07 UTC

[flink] branch master updated: [FLINK-13068][hive] HiveTableSink should implement PartitionableTableSink

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d5f8078  [FLINK-13068][hive] HiveTableSink should implement PartitionableTableSink
d5f8078 is described below

commit d5f8078a358ed3e1dfa79042873c060e59d4732e
Author: Rui Li <li...@apache.org>
AuthorDate: Wed Jul 3 17:25:34 2019 +0800

    [FLINK-13068][hive] HiveTableSink should implement PartitionableTableSink
    
    This PR makes HiveTableSink implement PartitionableTableSink, so that it supports writing into static partitions.
    
    This closes #8965.
---
 .../flink/batch/connectors/hive/HiveTableSink.java | 64 ++++++++++++++++++----
 .../connectors/hive/HiveTableOutputFormatTest.java | 33 -----------
 .../batch/connectors/hive/HiveTableSinkTest.java   | 32 +++++++++++
 3 files changed, 84 insertions(+), 45 deletions(-)
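
[Editor's note] For readers skimming the diff: the change lets a HiveTableSink be told up front which partition values are fixed before the write runs. The snippet below is a usage sketch only, not part of the commit; it mirrors the new test added in HiveTableSinkTest further down. The HiveCatalog/HiveConf instances and the registered "src" table are assumed to be set up elsewhere, as in the existing test utilities.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.batch.connectors.hive.HiveTableSink;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    import java.util.HashMap;
    import java.util.Map;

    public class StaticPartitionSketch {

        // Writes everything from the (already registered) "src" table into the
        // fixed partition s='a' of default.dest.
        static void writeIntoStaticPartition(HiveCatalog hiveCatalog, HiveConf hiveConf) throws Exception {
            ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
            BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);

            ObjectPath tablePath = new ObjectPath("default", "dest");
            CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);

            HiveTableSink sink = new HiveTableSink(new JobConf(hiveConf), tablePath, table);

            // Pin partition column 's' to the value 'a'; partition columns left
            // out of the spec are treated as dynamic partitions.
            Map<String, String> partSpec = new HashMap<>();
            partSpec.put("s", "a");
            sink.setStaticPartition(partSpec);

            tableEnv.registerTableSink("destSink", sink);
            tableEnv.sqlQuery("select * from src").insertInto("destSink");
            execEnv.execute();
        }
    }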

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
index ab077d5..ab84068 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.sinks.OutputFormatTableSink;
+import org.apache.flink.table.sinks.PartitionableTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -49,13 +50,16 @@ import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Table sink to write to Hive tables.
  */
-public class HiveTableSink extends OutputFormatTableSink<Row> {
+public class HiveTableSink extends OutputFormatTableSink<Row> implements PartitionableTableSink {
 
 	private final JobConf jobConf;
 	private final CatalogTable catalogTable;
@@ -63,6 +67,8 @@ public class HiveTableSink extends OutputFormatTableSink<Row> {
 	private final RowTypeInfo rowTypeInfo;
 	private final String hiveVersion;
 
+	private Map<String, String> staticPartitionSpec = Collections.emptyMap();
+
 	// TODO: need OverwritableTableSink to configure this
 	private boolean overwrite = false;
 
@@ -77,10 +83,9 @@ public class HiveTableSink extends OutputFormatTableSink<Row> {
 
 	@Override
 	public OutputFormat<Row> getOutputFormat() {
-		List<String> partitionColumns = catalogTable.getPartitionKeys();
+		List<String> partitionColumns = getPartitionFieldNames();
 		boolean isPartitioned = partitionColumns != null && !partitionColumns.isEmpty();
-		// TODO: need PartitionableTableSink to decide whether it's dynamic partitioning
-		boolean isDynamicPartition = isPartitioned;
+		boolean isDynamicPartition = isPartitioned && partitionColumns.size() > staticPartitionSpec.size();
 		String dbName = tablePath.getDatabaseName();
 		String tableName = tablePath.getObjectName();
 		try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
@@ -90,25 +95,23 @@ public class HiveTableSink extends OutputFormatTableSink<Row> {
 			String sdLocation = sd.getLocation();
 			HiveTablePartition hiveTablePartition;
 			if (isPartitioned) {
-				// TODO: validate partition spec
-				// TODO: strip quotes in partition values
-				LinkedHashMap<String, String> strippedPartSpec = new LinkedHashMap<>();
+				validatePartitionSpec();
 				if (isDynamicPartition) {
 					List<String> path = new ArrayList<>(2);
 					path.add(sd.getLocation());
-					if (!strippedPartSpec.isEmpty()) {
-						path.add(Warehouse.makePartName(strippedPartSpec, false));
+					if (!staticPartitionSpec.isEmpty()) {
+						path.add(Warehouse.makePartName(staticPartitionSpec, false));
 					}
 					sdLocation = String.join(Path.SEPARATOR, path);
 				} else {
 					List<Partition> partitions = client.listPartitions(dbName, tableName,
-							new ArrayList<>(strippedPartSpec.values()), (short) 1);
+							new ArrayList<>(staticPartitionSpec.values()), (short) 1);
 					sdLocation = !partitions.isEmpty() ? partitions.get(0).getSd().getLocation() :
-							sd.getLocation() + Path.SEPARATOR + Warehouse.makePartName(strippedPartSpec, true);
+							sd.getLocation() + Path.SEPARATOR + Warehouse.makePartName(staticPartitionSpec, true);
 				}
 
 				sd.setLocation(toStagingDir(sdLocation, jobConf));
-				hiveTablePartition = new HiveTablePartition(sd, new LinkedHashMap<>(strippedPartSpec));
+				hiveTablePartition = new HiveTablePartition(sd, new LinkedHashMap<>(staticPartitionSpec));
 			} else {
 				sd.setLocation(toStagingDir(sdLocation, jobConf));
 				hiveTablePartition = new HiveTablePartition(sd, null);
@@ -161,4 +164,41 @@ public class HiveTableSink extends OutputFormatTableSink<Row> {
 		fs.deleteOnExit(path);
 		return res;
 	}
+
+	@Override
+	public List<String> getPartitionFieldNames() {
+		return catalogTable.getPartitionKeys();
+	}
+
+	@Override
+	public void setStaticPartition(Map<String, String> partitionSpec) {
+		// make it a LinkedHashMap to maintain partition column order
+		staticPartitionSpec = new LinkedHashMap<>();
+		for (String partitionCol : getPartitionFieldNames()) {
+			if (partitionSpec.containsKey(partitionCol)) {
+				staticPartitionSpec.put(partitionCol, partitionSpec.get(partitionCol));
+			}
+		}
+	}
+
+	private void validatePartitionSpec() {
+		List<String> partitionCols = getPartitionFieldNames();
+		List<String> unknownPartCols = staticPartitionSpec.keySet().stream().filter(k -> !partitionCols.contains(k)).collect(Collectors.toList());
+		Preconditions.checkArgument(
+				unknownPartCols.isEmpty(),
+				"Static partition spec contains unknown partition column: " + unknownPartCols.toString());
+		int numStaticPart = staticPartitionSpec.size();
+		if (numStaticPart < partitionCols.size()) {
+			for (String partitionCol : partitionCols) {
+				if (!staticPartitionSpec.containsKey(partitionCol)) {
+					// this is a dynamic partition, make sure we have seen all static ones
+					Preconditions.checkArgument(numStaticPart == 0,
+							"Dynamic partition cannot appear before static partition");
+					return;
+				} else {
+					numStaticPart--;
+				}
+			}
+		}
+	}
 }
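
[Editor's note] A reading of the new validation above, under a hypothetical table partitioned by columns (p1, p2): setStaticPartition keeps only known partition columns, in partition-key order, and validatePartitionSpec (invoked from getOutputFormat) then requires the static columns to form a prefix of the partition keys. Roughly:

    // Illustration only; the sink and the column names p1/p2 are hypothetical.
    sink.setStaticPartition(java.util.Collections.singletonMap("p1", "2019"));
    // -> fine: p2 is filled dynamically from the data at write time.

    sink.setStaticPartition(java.util.Collections.singletonMap("p2", "07"));
    // -> getOutputFormat() later fails with
    //    "Dynamic partition cannot appear before static partition".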
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index ead2fda..1d9a662 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -22,13 +22,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogConfig;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.types.Row;
 
@@ -51,7 +48,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
@@ -109,35 +105,6 @@ public class HiveTableOutputFormatTest {
 		hiveCatalog.dropTable(tablePath, false);
 	}
 
-	@Test
-	public void testInsertIntoStaticPartition() throws Exception {
-		String dbName = "default";
-		String tblName = "dest";
-		createDestTable(dbName, tblName, 1);
-		ObjectPath tablePath = new ObjectPath(dbName, tblName);
-		Table hiveTable = hiveCatalog.getHiveTable(tablePath);
-		CatalogBaseTable table = hiveCatalog.getTable(tablePath);
-
-		Map<String, Object> partSpec = new HashMap<>();
-		partSpec.put("s", "a");
-		HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, (CatalogTableImpl) table, hiveTable, partSpec, false);
-		outputFormat.open(0, 1);
-		List<Row> toWrite = generateRecords(1);
-		writeRecords(toWrite, outputFormat);
-		outputFormat.close();
-		outputFormat.finalizeGlobal(1);
-
-		// make sure new partition is created
-		assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
-		CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(
-				partSpec.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))));
-
-		String partitionLocation = catalogPartition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION);
-		verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1);
-
-		hiveCatalog.dropTable(tablePath, false);
-	}
-
 	private void createDestTable(String dbName, String tblName, int numPartCols) throws Exception {
 		ObjectPath tablePath = new ObjectPath(dbName, tblName);
 		TableSchema tableSchema = new TableSchema(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index 44124d9..e31f92f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -203,6 +203,38 @@ public class HiveTableSinkTest {
 		hiveCatalog.dropTable(tablePath, false);
 	}
 
+	@Test
+	public void testInsertIntoStaticPartition() throws Exception {
+		String dbName = "default";
+		String tblName = "dest";
+		RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
+		ObjectPath tablePath = new ObjectPath(dbName, tblName);
+
+		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+		List<Row> toWrite = generateRecords(1);
+		tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+
+		Map<String, String> partSpec = new HashMap<>();
+		partSpec.put("s", "a");
+
+		CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
+		HiveTableSink hiveTableSink = new HiveTableSink(new JobConf(hiveConf), tablePath, table);
+		hiveTableSink.setStaticPartition(partSpec);
+		tableEnv.registerTableSink("destSink", hiveTableSink);
+		tableEnv.sqlQuery("select * from src").insertInto("destSink");
+		execEnv.execute();
+
+		// make sure new partition is created
+		assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
+		CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(partSpec));
+
+		String partitionLocation = catalogPartition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION);
+		verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1);
+
+		hiveCatalog.dropTable(tablePath, false);
+	}
+
 	private RowTypeInfo createDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
 		CatalogTable catalogTable = createCatalogTable(tableSchema, numPartCols);
 		hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);