You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/08/17 07:51:29 UTC

[flink] branch release-1.11 updated: [FLINK-18942][hive] HiveTableSink shouldn't try to create BulkWriter factory when using MR writer

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new a1d8f6a  [FLINK-18942][hive] HiveTableSink shouldn't try to create BulkWriter factory when using MR writer
a1d8f6a is described below

commit a1d8f6a04623a04eec04f71c47a21b0281565dc8
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Aug 17 15:50:32 2020 +0800

    [FLINK-18942][hive] HiveTableSink shouldn't try to create BulkWriter factory when using MR writer
    
    This closes #13146
---
 .../flink/connectors/hive/HiveTableSink.java       | 42 ++++++++++++++--------
 .../flink/connectors/hive/HiveTableSinkITCase.java |  6 ++--
 2 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 03f21df..2ffb792 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -189,23 +189,24 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 						conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
 						conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
 
-				Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
 				BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
-				if (userMrWriter || !bulkFactory.isPresent()) {
-					HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
-					builder = new HadoopPathBasedBulkFormatBuilder<>(
-							new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
-							.withRollingPolicy(rollingPolicy)
-							.withOutputFileConfig(outputFileConfig);
+				if (userMrWriter) {
+					builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
 					LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
 				} else {
-					builder = StreamingFileSink.forBulkFormat(
-							new org.apache.flink.core.fs.Path(sd.getLocation()),
-							new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
-							.withBucketAssigner(assigner)
-							.withRollingPolicy(rollingPolicy)
-							.withOutputFileConfig(outputFileConfig);
-					LOG.info("Hive streaming sink: Use native parquet&orc writer.");
+					Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
+					if (bulkFactory.isPresent()) {
+						builder = StreamingFileSink.forBulkFormat(
+								new org.apache.flink.core.fs.Path(sd.getLocation()),
+								new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
+								.withBucketAssigner(assigner)
+								.withRollingPolicy(rollingPolicy)
+								.withOutputFileConfig(outputFileConfig);
+						LOG.info("Hive streaming sink: Use native parquet&orc writer.");
+					} else {
+						builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
+						LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
+					}
 				}
 				return FileSystemTableSink.createStreamingSink(
 						conf,
@@ -230,6 +231,19 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 		}
 	}
 
+	private BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilderForMRWriter(
+			HiveWriterFactory recordWriterFactory,
+			StorageDescriptor sd,
+			TableBucketAssigner assigner,
+			TableRollingPolicy rollingPolicy,
+			OutputFileConfig outputFileConfig) {
+		HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
+		return new HadoopPathBasedBulkFormatBuilder<>(
+				new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
+				.withRollingPolicy(rollingPolicy)
+				.withOutputFileConfig(outputFileConfig);
+	}
+
 	private Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(String[] partitionColumns,
 			StorageDescriptor sd) {
 		String serLib = sd.getSerdeInfo().getSerializationLib().toLowerCase();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index b9819ad..709ea1b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -276,7 +276,8 @@ public class HiveTableSinkITCase {
 	@Test(timeout = 120000)
 	public void testPartStreamingMrWrite() throws Exception {
 		testStreamingWrite(true, true, "parquet", this::checkSuccessFiles);
-		if (!hiveCatalog.getHiveVersion().startsWith("2.")) {
+		// doesn't support writer 2.0 orc table
+		if (!hiveCatalog.getHiveVersion().startsWith("2.0")) {
 			testStreamingWrite(true, true, "orc", this::checkSuccessFiles);
 		}
 	}
@@ -284,7 +285,8 @@ public class HiveTableSinkITCase {
 	@Test(timeout = 120000)
 	public void testNonPartStreamingMrWrite() throws Exception {
 		testStreamingWrite(false, true, "parquet", (p) -> {});
-		if (!hiveCatalog.getHiveVersion().startsWith("2.")) {
+		// doesn't support writer 2.0 orc table
+		if (!hiveCatalog.getHiveVersion().startsWith("2.0")) {
 			testStreamingWrite(false, true, "orc", (p) -> {});
 		}
 	}