You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/09 16:34:07 UTC
[flink] branch master updated: [FLINK-13068][hive] HiveTableSink should implement PartitionableTableSink
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d5f8078 [FLINK-13068][hive] HiveTableSink should implement PartitionableTableSink
d5f8078 is described below
commit d5f8078a358ed3e1dfa79042873c060e59d4732e
Author: Rui Li <li...@apache.org>
AuthorDate: Wed Jul 3 17:25:34 2019 +0800
[FLINK-13068][hive] HiveTableSink should implement PartitionableTableSink
This PR makes HiveTableSink implement PartitionableTableSink, so that HiveTableSink supports static partitioning.
This closes #8965.
---
.../flink/batch/connectors/hive/HiveTableSink.java | 64 ++++++++++++++++++----
.../connectors/hive/HiveTableOutputFormatTest.java | 33 -----------
.../batch/connectors/hive/HiveTableSinkTest.java | 32 +++++++++++
3 files changed, 84 insertions(+), 45 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
index ab077d5..ab84068 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.sinks.OutputFormatTableSink;
+import org.apache.flink.table.sinks.PartitionableTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
@@ -49,13 +50,16 @@ import org.apache.thrift.TException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
/**
* Table sink to write to Hive tables.
*/
-public class HiveTableSink extends OutputFormatTableSink<Row> {
+public class HiveTableSink extends OutputFormatTableSink<Row> implements PartitionableTableSink {
private final JobConf jobConf;
private final CatalogTable catalogTable;
@@ -63,6 +67,8 @@ public class HiveTableSink extends OutputFormatTableSink<Row> {
private final RowTypeInfo rowTypeInfo;
private final String hiveVersion;
+ private Map<String, String> staticPartitionSpec = Collections.emptyMap();
+
// TODO: need OverwritableTableSink to configure this
private boolean overwrite = false;
@@ -77,10 +83,9 @@ public class HiveTableSink extends OutputFormatTableSink<Row> {
@Override
public OutputFormat<Row> getOutputFormat() {
- List<String> partitionColumns = catalogTable.getPartitionKeys();
+ List<String> partitionColumns = getPartitionFieldNames();
boolean isPartitioned = partitionColumns != null && !partitionColumns.isEmpty();
- // TODO: need PartitionableTableSink to decide whether it's dynamic partitioning
- boolean isDynamicPartition = isPartitioned;
+ boolean isDynamicPartition = isPartitioned && partitionColumns.size() > staticPartitionSpec.size();
String dbName = tablePath.getDatabaseName();
String tableName = tablePath.getObjectName();
try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
@@ -90,25 +95,23 @@ public class HiveTableSink extends OutputFormatTableSink<Row> {
String sdLocation = sd.getLocation();
HiveTablePartition hiveTablePartition;
if (isPartitioned) {
- // TODO: validate partition spec
- // TODO: strip quotes in partition values
- LinkedHashMap<String, String> strippedPartSpec = new LinkedHashMap<>();
+ validatePartitionSpec();
if (isDynamicPartition) {
List<String> path = new ArrayList<>(2);
path.add(sd.getLocation());
- if (!strippedPartSpec.isEmpty()) {
- path.add(Warehouse.makePartName(strippedPartSpec, false));
+ if (!staticPartitionSpec.isEmpty()) {
+ path.add(Warehouse.makePartName(staticPartitionSpec, false));
}
sdLocation = String.join(Path.SEPARATOR, path);
} else {
List<Partition> partitions = client.listPartitions(dbName, tableName,
- new ArrayList<>(strippedPartSpec.values()), (short) 1);
+ new ArrayList<>(staticPartitionSpec.values()), (short) 1);
sdLocation = !partitions.isEmpty() ? partitions.get(0).getSd().getLocation() :
- sd.getLocation() + Path.SEPARATOR + Warehouse.makePartName(strippedPartSpec, true);
+ sd.getLocation() + Path.SEPARATOR + Warehouse.makePartName(staticPartitionSpec, true);
}
sd.setLocation(toStagingDir(sdLocation, jobConf));
- hiveTablePartition = new HiveTablePartition(sd, new LinkedHashMap<>(strippedPartSpec));
+ hiveTablePartition = new HiveTablePartition(sd, new LinkedHashMap<>(staticPartitionSpec));
} else {
sd.setLocation(toStagingDir(sdLocation, jobConf));
hiveTablePartition = new HiveTablePartition(sd, null);
@@ -161,4 +164,41 @@ public class HiveTableSink extends OutputFormatTableSink<Row> {
fs.deleteOnExit(path);
return res;
}
+
+ @Override
+ public List<String> getPartitionFieldNames() {
+ return catalogTable.getPartitionKeys();
+ }
+
+ @Override
+ public void setStaticPartition(Map<String, String> partitionSpec) {
+ // make it a LinkedHashMap to maintain partition column order
+ staticPartitionSpec = new LinkedHashMap<>();
+ for (String partitionCol : getPartitionFieldNames()) {
+ if (partitionSpec.containsKey(partitionCol)) {
+ staticPartitionSpec.put(partitionCol, partitionSpec.get(partitionCol));
+ }
+ }
+ }
+
+ private void validatePartitionSpec() {
+ List<String> partitionCols = getPartitionFieldNames();
+ List<String> unknownPartCols = staticPartitionSpec.keySet().stream().filter(k -> !partitionCols.contains(k)).collect(Collectors.toList());
+ Preconditions.checkArgument(
+ unknownPartCols.isEmpty(),
+ "Static partition spec contains unknown partition column: " + unknownPartCols.toString());
+ int numStaticPart = staticPartitionSpec.size();
+ if (numStaticPart < partitionCols.size()) {
+ for (String partitionCol : partitionCols) {
+ if (!staticPartitionSpec.containsKey(partitionCol)) {
+ // this is a dynamic partition, make sure we have seen all static ones
+ Preconditions.checkArgument(numStaticPart == 0,
+ "Dynamic partition cannot appear before static partition");
+ return;
+ } else {
+ numStaticPart--;
+ }
+ }
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index ead2fda..1d9a662 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -22,13 +22,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogConfig;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.types.Row;
@@ -51,7 +48,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
@@ -109,35 +105,6 @@ public class HiveTableOutputFormatTest {
hiveCatalog.dropTable(tablePath, false);
}
- @Test
- public void testInsertIntoStaticPartition() throws Exception {
- String dbName = "default";
- String tblName = "dest";
- createDestTable(dbName, tblName, 1);
- ObjectPath tablePath = new ObjectPath(dbName, tblName);
- Table hiveTable = hiveCatalog.getHiveTable(tablePath);
- CatalogBaseTable table = hiveCatalog.getTable(tablePath);
-
- Map<String, Object> partSpec = new HashMap<>();
- partSpec.put("s", "a");
- HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, (CatalogTableImpl) table, hiveTable, partSpec, false);
- outputFormat.open(0, 1);
- List<Row> toWrite = generateRecords(1);
- writeRecords(toWrite, outputFormat);
- outputFormat.close();
- outputFormat.finalizeGlobal(1);
-
- // make sure new partition is created
- assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
- CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(
- partSpec.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))));
-
- String partitionLocation = catalogPartition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION);
- verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1);
-
- hiveCatalog.dropTable(tablePath, false);
- }
-
private void createDestTable(String dbName, String tblName, int numPartCols) throws Exception {
ObjectPath tablePath = new ObjectPath(dbName, tblName);
TableSchema tableSchema = new TableSchema(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index 44124d9..e31f92f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -203,6 +203,38 @@ public class HiveTableSinkTest {
hiveCatalog.dropTable(tablePath, false);
}
+ @Test
+ public void testInsertIntoStaticPartition() throws Exception {
+ String dbName = "default";
+ String tblName = "dest";
+ RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
+ ObjectPath tablePath = new ObjectPath(dbName, tblName);
+
+ ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+ BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+ List<Row> toWrite = generateRecords(1);
+ tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+
+ Map<String, String> partSpec = new HashMap<>();
+ partSpec.put("s", "a");
+
+ CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
+ HiveTableSink hiveTableSink = new HiveTableSink(new JobConf(hiveConf), tablePath, table);
+ hiveTableSink.setStaticPartition(partSpec);
+ tableEnv.registerTableSink("destSink", hiveTableSink);
+ tableEnv.sqlQuery("select * from src").insertInto("destSink");
+ execEnv.execute();
+
+ // make sure new partition is created
+ assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
+ CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(partSpec));
+
+ String partitionLocation = catalogPartition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION);
+ verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1);
+
+ hiveCatalog.dropTable(tablePath, false);
+ }
+
private RowTypeInfo createDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
CatalogTable catalogTable = createCatalogTable(tableSchema, numPartCols);
hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);