Posted to commits@flink.apache.org by lz...@apache.org on 2020/09/18 06:30:22 UTC
[flink] branch release-1.11 updated: [FLINK-19121][hive] Avoid accessing HDFS frequently in HiveBulkWriterFactory
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 7d86498 [FLINK-19121][hive] Avoid accessing HDFS frequently in HiveBulkWriterFactory
7d86498 is described below
commit 7d86498e088aac28e9d7c77499e2118370f5cc96
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Sep 4 15:40:52 2020 +0800
[FLINK-19121][hive] Avoid accessing HDFS frequently in HiveBulkWriterFactory
This closes #13301
---
.../flink/connectors/hive/HiveTableSink.java | 51 ++++++++++++++++++++--
1 file changed, 47 insertions(+), 4 deletions(-)
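For context: the heart of the change is the rolling decision. The previous TableRollingPolicy consulted the part file's size on every incoming element, and obtaining that size goes through the filesystem (see the javadoc's pointer to HiveBulkWriterFactory#create in the diff below), so a per-record check hammers HDFS. The new HiveRollingPolicy rolls on every checkpoint, never rolls on individual events, and defers the size check to the processing-time callback. Below is a minimal, self-contained sketch of that decision logic, not the patched code itself; the PartFile interface is a hypothetical stand-in for Flink's PartFileInfo, whose getSize() models the expensive DFS call.

import java.io.IOException;

/**
 * Sketch of the rolling decision introduced by this patch, outside of Flink.
 * PartFile is hypothetical; it stands in for Flink's PartFileInfo.
 */
public class RollingPolicySketch {

	interface PartFile {
		long getCreationTime();
		long getSize() throws IOException; // potentially a DFS metadata round trip
	}

	private final long rollingFileSize;     // e.g. 128 MB
	private final long rollingTimeInterval; // e.g. 30 minutes, in milliseconds

	RollingPolicySketch(long rollingFileSize, long rollingTimeInterval) {
		this.rollingFileSize = rollingFileSize;
		this.rollingTimeInterval = rollingTimeInterval;
	}

	/** Per record: never touches the file size, so no DFS access on the hot path. */
	boolean shouldRollOnEvent(PartFile f) {
		return false;
	}

	/** Per checkpoint: always roll, finalizing the in-progress file. */
	boolean shouldRollOnCheckpoint(PartFile f) {
		return true;
	}

	/** Per processing-time timer: the only place the (expensive) size check runs. */
	boolean shouldRollOnProcessingTime(PartFile f, long now) throws IOException {
		return now - f.getCreationTime() >= rollingTimeInterval
				|| f.getSize() > rollingFileSize;
	}
}

A consequence of rolling on every checkpoint is that files are finalized at each checkpoint anyway, so a part file can only overshoot rollingFileSize by whatever arrives between two processing-time checks within one checkpoint interval.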
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 2ffb792..2e1281f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -29,8 +29,10 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -45,7 +47,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.filesystem.FileSystemTableSink;
import org.apache.flink.table.filesystem.FileSystemTableSink.TableBucketAssigner;
-import org.apache.flink.table.filesystem.FileSystemTableSink.TableRollingPolicy;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
@@ -73,6 +74,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -184,8 +186,7 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 					tableSchema.getFieldDataTypes(),
 					partitionColumns);
 			TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
-			TableRollingPolicy rollingPolicy = new TableRollingPolicy(
-					true,
+			HiveRollingPolicy rollingPolicy = new HiveRollingPolicy(
 					conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
 					conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
@@ -235,7 +236,7 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 			HiveWriterFactory recordWriterFactory,
 			StorageDescriptor sd,
 			TableBucketAssigner assigner,
-			TableRollingPolicy rollingPolicy,
+			HiveRollingPolicy rollingPolicy,
 			OutputFileConfig outputFileConfig) {
 		HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
 		return new HadoopPathBasedBulkFormatBuilder<>(
@@ -325,4 +326,46 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 	public void setOverwrite(boolean overwrite) {
 		this.overwrite = overwrite;
 	}
+
+	/**
+	 * Getting the size of a part file is too expensive (see {@link HiveBulkWriterFactory#create})
+	 * to check for every element; doing so would put great pressure on the DFS. Therefore, this
+	 * implementation only checks the file size in {@link #shouldRollOnProcessingTime}, which
+	 * effectively avoids that pressure.
+	 */
+	private static class HiveRollingPolicy extends CheckpointRollingPolicy<RowData, String> {
+
+		private final long rollingFileSize;
+		private final long rollingTimeInterval;
+
+		private HiveRollingPolicy(
+				long rollingFileSize,
+				long rollingTimeInterval) {
+			Preconditions.checkArgument(rollingFileSize > 0L);
+			Preconditions.checkArgument(rollingTimeInterval > 0L);
+			this.rollingFileSize = rollingFileSize;
+			this.rollingTimeInterval = rollingTimeInterval;
+		}
+
+		@Override
+		public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
+			return true;
+		}
+
+		@Override
+		public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) {
+			return false;
+		}
+
+		@Override
+		public boolean shouldRollOnProcessingTime(
+				PartFileInfo<String> partFileState, long currentTime) {
+			try {
+				return currentTime - partFileState.getCreationTime() >= rollingTimeInterval ||
+						partFileState.getSize() > rollingFileSize;
+			} catch (IOException e) {
+				throw new UncheckedIOException(e);
+			}
+		}
+	}
}
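A usage note: HiveTableSink wires this policy in through HadoopPathBasedBulkFormatBuilder (second hunk above), with the two thresholds taken from the filesystem connector options SINK_ROLLING_POLICY_FILE_SIZE and SINK_ROLLING_POLICY_ROLLOVER_INTERVAL. The same CheckpointRollingPolicy hook also exists on the plain bulk-format StreamingFileSink. The sketch below shows that wiring as I understand the Flink 1.11 API; the path, writer factory, and policy are caller-supplied placeholders, not anything from this patch.

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.data.RowData;

class RollingPolicyUsageSketch {

	// Attaches a bulk-format file sink that rolls according to the given
	// checkpoint-based policy. All three parameters are placeholders to be
	// supplied by the caller; the HDFS path below is likewise illustrative.
	static void attachSink(
			DataStream<RowData> rows,
			BulkWriter.Factory<RowData> writerFactory,
			CheckpointRollingPolicy<RowData, String> rollingPolicy) {
		StreamingFileSink<RowData> sink = StreamingFileSink
				.forBulkFormat(new Path("hdfs:///warehouse/db/tbl"), writerFactory)
				.withRollingPolicy(rollingPolicy)
				.build();
		rows.addSink(sink);
	}
}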