You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/09/18 06:30:22 UTC

[flink] branch release-1.11 updated: [FLINK-19121][hive] Avoid accessing HDFS frequently in HiveBulkWriterFactory

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 7d86498  [FLINK-19121][hive] Avoid accessing HDFS frequently in HiveBulkWriterFactory
7d86498 is described below

commit 7d86498e088aac28e9d7c77499e2118370f5cc96
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Sep 4 15:40:52 2020 +0800

    [FLINK-19121][hive] Avoid accessing HDFS frequently in HiveBulkWriterFactory
    
    This closes #13301
---
 .../flink/connectors/hive/HiveTableSink.java       | 51 ++++++++++++++++++++--
 1 file changed, 47 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 2ffb792..2e1281f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -29,8 +29,10 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
 import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -45,7 +47,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.filesystem.FileSystemOutputFormat;
 import org.apache.flink.table.filesystem.FileSystemTableSink;
 import org.apache.flink.table.filesystem.FileSystemTableSink.TableBucketAssigner;
-import org.apache.flink.table.filesystem.FileSystemTableSink.TableRollingPolicy;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.OverwritableTableSink;
 import org.apache.flink.table.sinks.PartitionableTableSink;
@@ -73,6 +74,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -184,8 +186,7 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 						tableSchema.getFieldDataTypes(),
 						partitionColumns);
 				TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
-				TableRollingPolicy rollingPolicy = new TableRollingPolicy(
-						true,
+				HiveRollingPolicy rollingPolicy = new HiveRollingPolicy(
 						conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
 						conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
 
@@ -235,7 +236,7 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 			HiveWriterFactory recordWriterFactory,
 			StorageDescriptor sd,
 			TableBucketAssigner assigner,
-			TableRollingPolicy rollingPolicy,
+			HiveRollingPolicy rollingPolicy,
 			OutputFileConfig outputFileConfig) {
 		HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
 		return new HadoopPathBasedBulkFormatBuilder<>(
@@ -325,4 +326,46 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 	public void setOverwrite(boolean overwrite) {
 		this.overwrite = overwrite;
 	}
+
+	/**
+	 * Rolling policy that rolls on every checkpoint and never on individual elements, because
+	 * getting the size of a part file is expensive (see {@link HiveBulkWriterFactory#create}) and
+	 * checking it for every element would put great pressure on the DFS. The file size and the
+	 * rollover interval are only checked in {@link #shouldRollOnProcessingTime}.
+	 */
+	private static class HiveRollingPolicy extends CheckpointRollingPolicy<RowData, String> {
+
+		private final long rollingFileSize;
+		private final long rollingTimeInterval;
+
+		private HiveRollingPolicy(
+				long rollingFileSize,
+				long rollingTimeInterval) {
+			Preconditions.checkArgument(rollingFileSize > 0L);
+			Preconditions.checkArgument(rollingTimeInterval > 0L);
+			this.rollingFileSize = rollingFileSize;
+			this.rollingTimeInterval = rollingTimeInterval;
+		}
+
+		@Override
+		public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
+			return true;
+		}
+
+		@Override
+		public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) {
+			return false;
+		}
+
+		@Override
+		public boolean shouldRollOnProcessingTime(
+				PartFileInfo<String> partFileState, long currentTime) {
+			try {
+				return currentTime - partFileState.getCreationTime() >= rollingTimeInterval ||
+						partFileState.getSize() > rollingFileSize;
+			} catch (IOException e) {
+				throw new UncheckedIOException(e);
+			}
+		}
+	}
 }