Posted to commits@flink.apache.org by lz...@apache.org on 2020/08/17 07:51:29 UTC
[flink] branch release-1.11 updated: [FLINK-18942][hive] HiveTableSink shouldn't try to create BulkWriter factory when using MR writer
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new a1d8f6a [FLINK-18942][hive] HiveTableSink shouldn't try to create BulkWriter factory when using MR writer
a1d8f6a is described below
commit a1d8f6a04623a04eec04f71c47a21b0281565dc8
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Aug 17 15:50:32 2020 +0800
[FLINK-18942][hive] HiveTableSink shouldn't try to create BulkWriter factory when using MR writer
This closes #13146
---
.../flink/connectors/hive/HiveTableSink.java | 42 ++++++++++++++--------
.../flink/connectors/hive/HiveTableSinkITCase.java | 6 ++--
2 files changed, 32 insertions(+), 16 deletions(-)
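For context: before this change, HiveTableSink always called createBulkWriterFactory() and only then chose between the MR writer and the native writer, so the factory lookup ran even when the MR writer had been requested. The patch short-circuits on userMrWriter and probes for a BulkWriter factory only on the native path, falling back to the MR writer when none is available. Below is a minimal, self-contained Java illustration of that decision order; the class WriterChoice, the choose() method, and the Supplier standing in for createBulkWriterFactory() are hypothetical helpers for this sketch, not Flink APIs:

import java.util.Optional;
import java.util.function.Supplier;

public class WriterChoice {

    enum Writer { MR, NATIVE }

    // An explicit MR request short-circuits, so the bulk-factory lookup
    // only runs when the native writer is actually a candidate.
    static Writer choose(boolean userMrWriter, Supplier<Optional<String>> bulkFactoryLookup) {
        if (userMrWriter) {
            return Writer.MR;
        }
        return bulkFactoryLookup.get().isPresent() ? Writer.NATIVE : Writer.MR;
    }

    public static void main(String[] args) {
        // With userMrWriter=true the lookup is never invoked, which is the point
        // of the fix: no BulkWriter factory is created when the MR writer is forced.
        System.out.println(choose(true, () -> { throw new IllegalStateException("must not be called"); }));
        System.out.println(choose(false, Optional::empty));              // falls back to MR
        System.out.println(choose(false, () -> Optional.of("parquet"))); // NATIVE
    }
}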
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 03f21df..2ffb792 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -189,23 +189,24 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
- Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
- if (userMrWriter || !bulkFactory.isPresent()) {
- HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
- builder = new HadoopPathBasedBulkFormatBuilder<>(
- new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
- .withRollingPolicy(rollingPolicy)
- .withOutputFileConfig(outputFileConfig);
+ if (userMrWriter) {
+ builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
} else {
- builder = StreamingFileSink.forBulkFormat(
- new org.apache.flink.core.fs.Path(sd.getLocation()),
- new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
- .withBucketAssigner(assigner)
- .withRollingPolicy(rollingPolicy)
- .withOutputFileConfig(outputFileConfig);
- LOG.info("Hive streaming sink: Use native parquet&orc writer.");
+ Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
+ if (bulkFactory.isPresent()) {
+ builder = StreamingFileSink.forBulkFormat(
+ new org.apache.flink.core.fs.Path(sd.getLocation()),
+ new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
+ .withBucketAssigner(assigner)
+ .withRollingPolicy(rollingPolicy)
+ .withOutputFileConfig(outputFileConfig);
+ LOG.info("Hive streaming sink: Use native parquet&orc writer.");
+ } else {
+ builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
+ LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
+ }
}
return FileSystemTableSink.createStreamingSink(
conf,
@@ -230,6 +231,19 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
}
}
+ private BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilderForMRWriter(
+ HiveWriterFactory recordWriterFactory,
+ StorageDescriptor sd,
+ TableBucketAssigner assigner,
+ TableRollingPolicy rollingPolicy,
+ OutputFileConfig outputFileConfig) {
+ HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
+ return new HadoopPathBasedBulkFormatBuilder<>(
+ new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
+ .withRollingPolicy(rollingPolicy)
+ .withOutputFileConfig(outputFileConfig);
+ }
+
private Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(String[] partitionColumns,
StorageDescriptor sd) {
String serLib = sd.getSerdeInfo().getSerializationLib().toLowerCase();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index b9819ad..709ea1b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -276,7 +276,8 @@ public class HiveTableSinkITCase {
@Test(timeout = 120000)
public void testPartStreamingMrWrite() throws Exception {
testStreamingWrite(true, true, "parquet", this::checkSuccessFiles);
- if (!hiveCatalog.getHiveVersion().startsWith("2.")) {
+ // Hive 2.0 doesn't support writing ORC tables
+ if (!hiveCatalog.getHiveVersion().startsWith("2.0")) {
testStreamingWrite(true, true, "orc", this::checkSuccessFiles);
}
}
@@ -284,7 +285,8 @@ public class HiveTableSinkITCase {
@Test(timeout = 120000)
public void testNonPartStreamingMrWrite() throws Exception {
testStreamingWrite(false, true, "parquet", (p) -> {});
- if (!hiveCatalog.getHiveVersion().startsWith("2.")) {
+ // Hive 2.0 doesn't support writing ORC tables
+ if (!hiveCatalog.getHiveVersion().startsWith("2.0")) {
testStreamingWrite(false, true, "orc", (p) -> {});
}
}
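A note on the test change: the old guard startsWith("2.") skipped the ORC case for every Hive 2.x version, while per the comment in the diff only Hive 2.0 lacks the ORC write support exercised here; the narrower startsWith("2.0") re-enables the test on 2.1 and later. A tiny stand-alone sketch of the difference (VersionGuard is hypothetical, not part of the test class):

public class VersionGuard {
    public static void main(String[] args) {
        String[] versions = {"1.2.1", "2.0.0", "2.1.1", "2.3.4", "3.1.1"};
        for (String v : versions) {
            boolean oldGuard = !v.startsWith("2.");   // pre-patch: excludes all 2.x
            boolean newGuard = !v.startsWith("2.0");  // post-patch: excludes only 2.0.x
            System.out.printf("%s old=%b new=%b%n", v, oldGuard, newGuard);
        }
    }
}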