Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/27 03:05:50 UTC

[flink] 03/03: [FLINK-16647][table-runtime-blink][hive] Missing file extension when inserting into Hive table with compression

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6758ee33ef7fd87d53e97459d293fa38700c9085
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Mar 23 11:55:59 2020 +0800

    [FLINK-16647][table-runtime-blink][hive] Missing file extension when inserting into Hive table with compression
---
 .../connectors/hive/HiveOutputFormatFactory.java   | 17 ++++++----
 .../flink/connectors/hive/HiveTableSink.java       | 20 ++++++++++--
 .../flink/table/catalog/hive/client/HiveShim.java  |  8 ++++-
 .../table/catalog/hive/client/HiveShimV100.java    | 21 +++++++++----
 .../table/catalog/hive/client/HiveShimV110.java    | 23 ++++++++++----
 .../hive/HiveOutputFormatFactoryTest.java          |  5 +--
 .../connectors/hive/TableEnvHiveConnectorTest.java | 36 ++++++++++++++++++++++
 .../sink/filesystem/OutputFileConfig.java          |  4 +--
 .../table/filesystem/FileSystemOutputFormat.java   | 18 +++++++++--
 .../table/filesystem/PartitionTempFileManager.java | 18 +++++++++--
 10 files changed, 139 insertions(+), 31 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOutputFormatFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOutputFormatFactory.java
index 9629566..378d9ef 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOutputFormatFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOutputFormatFactory.java
@@ -61,7 +61,7 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
 
 	private static final long serialVersionUID = 1L;
 
-	private final String outputFormat;
+	private final Class hiveOutputFormatClz;
 
 	private final SerDeInfo serDeInfo;
 
@@ -77,6 +77,8 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
 
 	private final HiveShim hiveShim;
 
+	private final boolean isCompressed;
+
 	// number of non-partitioning columns
 	private transient int numNonPartitionColumns;
 
@@ -93,20 +95,24 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
 
 	public HiveOutputFormatFactory(
 			JobConf jobConf,
-			String outputFormat,
+			Class hiveOutputFormatClz,
 			SerDeInfo serDeInfo,
 			TableSchema schema,
 			String[] partitionColumns,
 			Properties tableProperties,
-			HiveShim hiveShim) {
+			HiveShim hiveShim,
+			boolean isCompressed) {
+		Preconditions.checkArgument(org.apache.hadoop.hive.ql.io.HiveOutputFormat.class.isAssignableFrom(hiveOutputFormatClz),
+				"The output format should be an instance of HiveOutputFormat");
 		this.confWrapper = new JobConfWrapper(jobConf);
-		this.outputFormat = outputFormat;
+		this.hiveOutputFormatClz = hiveOutputFormatClz;
 		this.serDeInfo = serDeInfo;
 		this.allColumns = schema.getFieldNames();
 		this.allTypes = schema.getFieldDataTypes();
 		this.partitionColumns = partitionColumns;
 		this.tableProperties = tableProperties;
 		this.hiveShim = hiveShim;
+		this.isCompressed = isCompressed;
 	}
 
 	private void init() throws Exception {
@@ -145,7 +151,6 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
 
 			JobConf conf = new JobConf(confWrapper.conf());
 
-			final boolean isCompressed = conf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
 			if (isCompressed) {
 				String codecStr = conf.get(HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC.varname);
 				if (!StringUtils.isNullOrWhitespaceOnly(codecStr)) {
@@ -164,7 +169,7 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
 
 			RecordWriter recordWriter = hiveShim.getHiveRecordWriter(
 					conf,
-					outputFormat,
+					hiveOutputFormatClz,
 					recordSerDe.getSerializedClass(),
 					isCompressed,
 					tableProperties,
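
A quick usage sketch of the reworked constructor (not part of the patch; HiveIgnoreKeyTextOutputFormat is Hive's plain-text HiveOutputFormat implementation, and the serde/schema values simply mirror the factory test further down). Whatever class is passed in must implement HiveOutputFormat, otherwise the new precondition fails fast:

    TableSchema schema = TableSchema.builder().field("x", DataTypes.STRING()).build();
    HiveOutputFormatFactory factory = new HiveOutputFormatFactory(
            new JobConf(),
            HiveIgnoreKeyTextOutputFormat.class,   // a HiveOutputFormat; e.g. plain mapred TextOutputFormat would be rejected
            new SerDeInfo("serde", LazySimpleSerDe.class.getName(), Collections.emptyMap()),
            schema,
            new String[0],                         // no partition columns
            new Properties(),
            HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()),
            true);                                 // isCompressed, now decided up front by HiveTableSink

Since the compression flag is fixed when the sink is planned rather than re-read from the JobConf in each task, the record writer and the file extension chosen in HiveTableSink are driven by the same decision.
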
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index c4f4124..283525a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -47,6 +48,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.thrift.TException;
 
@@ -104,26 +107,39 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
 			builder.setDynamicGrouped(dynamicGrouping);
 			builder.setPartitionColumns(partitionColumns);
 			builder.setFileSystemFactory(new HadoopFileSystemFactory(jobConf));
+
+			boolean isCompressed = jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
+			Class hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
 			builder.setFormatFactory(new HiveOutputFormatFactory(
 					jobConf,
-					sd.getOutputFormat(),
+					hiveOutputFormatClz,
 					sd.getSerdeInfo(),
 					tableSchema,
 					partitionColumns,
 					HiveReflectionUtils.getTableMetadata(hiveShim, table),
-					hiveShim));
+					hiveShim,
+					isCompressed));
 			builder.setMetaStoreFactory(
 					new HiveTableMetaStoreFactory(jobConf, hiveVersion, dbName, tableName));
 			builder.setOverwrite(overwrite);
 			builder.setStaticPartitions(staticPartitionSpec);
 			builder.setTempPath(new org.apache.flink.core.fs.Path(
 					toStagingDir(sd.getLocation(), jobConf)));
+			String extension = Utilities.getFileExtension(jobConf, isCompressed,
+					(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
+			extension = extension == null ? "" : extension;
+			OutputFileConfig outputFileConfig = new OutputFileConfig("", extension);
+			builder.setOutputFileConfig(outputFileConfig);
 
 			return builder.build();
 		} catch (TException e) {
 			throw new CatalogException("Failed to query Hive metaStore", e);
 		} catch (IOException e) {
 			throw new FlinkRuntimeException("Failed to create staging dir", e);
+		} catch (ClassNotFoundException e) {
+			throw new FlinkHiveException("Failed to get output format class", e);
+		} catch (IllegalAccessException | InstantiationException e) {
+			throw new FlinkHiveException("Failed to instantiate output format instance", e);
 		}
 	}
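
Condensed, this is how the sink now picks the suffix; roughly what happens for the compressed text table in the new test below (the concrete ".deflate" value assumes Hadoop's default DeflateCodec, and the actual extension depends on the configured codec and output format):

    JobConf conf = new JobConf();
    conf.setBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, true);   // hive.exec.compress.output
    String extension = Utilities.getFileExtension(conf, true, new HiveIgnoreKeyTextOutputFormat());
    // typically ".deflate" here; may be null/empty when compression is off, hence the
    // normalization to "" before it reaches the sink as new OutputFileConfig("", extension)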
 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 346a8e1..258c52c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Writable;
@@ -140,10 +141,15 @@ public interface HiveShim extends Serializable {
 	/**
 	 * Get Hive's FileSinkOperator.RecordWriter.
 	 */
-	FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, String outputFormatClzName,
+	FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Class outputFormatClz,
 			Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath);
 
 	/**
+	 * For a given OutputFormat class, get the corresponding {@link HiveOutputFormat} class.
+	 */
+	Class getHiveOutputFormatClass(Class outputFormatClz);
+
+	/**
 	 * Get Hive table schema from deserializer.
 	 */
 	List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table table, boolean skipConfError);
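
The new shim method is needed because the underlying Hive utility changed its signature across versions, as the two implementations below show: HiveFileFormatUtils.getOutputFormatSubstitute takes (Class, boolean) in Hive 1.0.x but only (Class) from 1.1.0 onwards. A calling sketch (the class name is just an example of what a table's storage descriptor might report):

    HiveShim shim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
    // Maps the OutputFormat recorded in the metastore to a usable HiveOutputFormat class;
    // for a plain text table this is typically HiveIgnoreKeyTextOutputFormat itself.
    Class hiveOutputFormatClz = shim.getHiveOutputFormatClass(
            Class.forName("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"));
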
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index a6fcd78..dae39d1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -188,16 +188,12 @@ public class HiveShimV100 implements HiveShim {
 	}
 
 	@Override
-	public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, String outputFormatClzName,
+	public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Class outputFormatClz,
 			Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath) {
 		try {
-			Class outputFormatClz = Class.forName(outputFormatClzName);
 			Class utilClass = HiveFileFormatUtils.class;
-			Method utilMethod = utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class, boolean.class);
-			outputFormatClz = (Class) utilMethod.invoke(null, outputFormatClz, false);
-			Preconditions.checkState(outputFormatClz != null, "No Hive substitute output format for " + outputFormatClzName);
 			HiveOutputFormat outputFormat = (HiveOutputFormat) outputFormatClz.newInstance();
-			utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, HiveOutputFormat.class,
+			Method utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, HiveOutputFormat.class,
 					Class.class, boolean.class, Properties.class, Path.class, Reporter.class);
 			return (FileSinkOperator.RecordWriter) utilMethod.invoke(null,
 					jobConf, outputFormat, outValClz, isCompressed, tableProps, outPath, Reporter.NULL);
@@ -207,6 +203,19 @@ public class HiveShimV100 implements HiveShim {
 	}
 
 	@Override
+	public Class getHiveOutputFormatClass(Class outputFormatClz) {
+		try {
+			Class utilClass = HiveFileFormatUtils.class;
+			Method utilMethod = utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class, boolean.class);
+			Class res = (Class) utilMethod.invoke(null, outputFormatClz, false);
+			Preconditions.checkState(res != null, "No Hive substitute output format for " + outputFormatClz);
+			return res;
+		} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+			throw new FlinkHiveException("Failed to get HiveOutputFormat for " + outputFormatClz, e);
+		}
+	}
+
+	@Override
 	public List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table table, boolean skipConfError) {
 		try {
 			Method utilMethod = getHiveMetaStoreUtilsClass().getMethod("getDeserializer", Configuration.class, Table.class);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
index 691c14a..8d1885d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog.hive.client;
 
+import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.util.Preconditions;
 
@@ -33,6 +34,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.Reporter;
 
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Properties;
@@ -43,16 +45,12 @@ import java.util.Properties;
 public class HiveShimV110 extends HiveShimV101 {
 
 	@Override
-	public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, String outputFormatClzName,
+	public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Class outputFormatClz,
 			Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath) {
 		try {
-			Class outputFormatClz = Class.forName(outputFormatClzName);
 			Class utilClass = HiveFileFormatUtils.class;
-			Method utilMethod = utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class);
-			outputFormatClz = (Class) utilMethod.invoke(null, outputFormatClz);
-			Preconditions.checkState(outputFormatClz != null, "No Hive substitute output format for " + outputFormatClzName);
 			OutputFormat outputFormat = (OutputFormat) outputFormatClz.newInstance();
-			utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, OutputFormat.class,
+			Method utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, OutputFormat.class,
 					Class.class, boolean.class, Properties.class, Path.class, Reporter.class);
 			return (FileSinkOperator.RecordWriter) utilMethod.invoke(null,
 					jobConf, outputFormat, outValClz, isCompressed, tableProps, outPath, Reporter.NULL);
@@ -62,6 +60,19 @@ public class HiveShimV110 extends HiveShimV101 {
 	}
 
 	@Override
+	public Class getHiveOutputFormatClass(Class outputFormatClz) {
+		try {
+			Class utilClass = HiveFileFormatUtils.class;
+			Method utilMethod = utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class);
+			Class res = (Class) utilMethod.invoke(null, outputFormatClz);
+			Preconditions.checkState(res != null, "No Hive substitute output format for " + outputFormatClz);
+			return res;
+		} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+			throw new FlinkHiveException("Failed to get HiveOutputFormat for " + outputFormatClz, e);
+		}
+	}
+
+	@Override
 	public List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table table, boolean skipConfError) {
 		try {
 			Method utilMethod = getHiveMetaStoreUtilsClass().getMethod("getDeserializer",
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveOutputFormatFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveOutputFormatFactoryTest.java
index 70152dc..fe70b98 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveOutputFormatFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveOutputFormatFactoryTest.java
@@ -54,11 +54,12 @@ public class HiveOutputFormatFactoryTest {
 		SerDeInfo serDeInfo = new SerDeInfo("name", LazySimpleSerDe.class.getName(), Collections.emptyMap());
 		HiveOutputFormatFactory factory = new HiveOutputFormatFactory(
 				new JobConf(),
-				VerifyURIOutputFormat.class.getName(),
+				VerifyURIOutputFormat.class,
 				serDeInfo, schema,
 				new String[0],
 				new Properties(),
-				HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()));
+				HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()),
+				false);
 		org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(TEST_URI_SCHEME, TEST_URI_AUTHORITY, "/foo/path");
 		factory.createOutputFormat(path);
 	}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 1da85f8..dc5360e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -56,6 +56,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -511,6 +512,28 @@ public class TableEnvHiveConnectorTest {
 		}
 	}
 
+	@Test
+	public void testCompressTextTable() throws Exception {
+		hiveShell.execute("create database db1");
+		try {
+			hiveShell.execute("create table db1.src (x string,y string)");
+			hiveShell.execute("create table db1.dest like db1.src");
+			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
+					.addRow(new Object[]{"a", "b"})
+					.addRow(new Object[]{"c", "d"})
+					.commit();
+			hiveCatalog.getHiveConf().setBoolVar(HiveConf.ConfVars.COMPRESSRESULT, true);
+			TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.sqlUpdate("insert overwrite db1.dest select * from db1.src");
+			tableEnv.execute("insert dest");
+			List<String> expected = Arrays.asList("a\tb", "c\td");
+			verifyHiveQueryResult("select * from db1.dest", expected);
+			verifyFlinkQueryResult(tableEnv.sqlQuery("select * from db1.dest"), expected);
+		} finally {
+			hiveShell.execute("drop database db1 cascade");
+		}
+	}
+
 	private TableEnvironment getTableEnvWithHiveCatalog() {
 		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
 		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
@@ -523,4 +546,17 @@ public class TableEnvHiveConnectorTest {
 		assertEquals(expected.size(), results.size());
 		assertEquals(new HashSet<>(expected), new HashSet<>(results));
 	}
+
+	private void verifyFlinkQueryResult(org.apache.flink.table.api.Table table, List<String> expected) throws Exception {
+		List<Row> rows = TableUtils.collectToList(table);
+		List<String> results = rows.stream().map(row ->
+				IntStream.range(0, row.getArity())
+						.mapToObj(row::getField)
+						.map(o -> o instanceof LocalDateTime ?
+								Timestamp.valueOf((LocalDateTime) o) : o)
+						.map(Object::toString)
+						.collect(Collectors.joining("\t"))).collect(Collectors.toList());
+		assertEquals(expected.size(), results.size());
+		assertEquals(new HashSet<>(expected), new HashSet<>(results));
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java
index 98b84d1..e139b17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java
@@ -47,14 +47,14 @@ public class OutputFileConfig implements Serializable {
 	/**
 	 * The prefix for the part name.
 	 */
-	String getPartPrefix() {
+	public String getPartPrefix() {
 		return partPrefix;
 	}
 
 	/**
 	 * The suffix for the part name.
 	 */
-	String getPartSuffix() {
+	public String getPartSuffix() {
 		return partSuffix;
 	}
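
The two getters are widened to public because the blink table runtime (FileSystemOutputFormat and PartitionTempFileManager below) lives in a different package and now reads them. A minimal sketch of the contract, with an illustrative suffix:

    // Part-file names are assembled as prefix + core name + suffix; the Hive sink keeps the
    // prefix empty and passes the compression extension as the suffix.
    OutputFileConfig fileConfig = new OutputFileConfig("", ".deflate");
    fileConfig.getPartPrefix();   // ""
    fileConfig.getPartSuffix();   // ".deflate"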
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
index 1854967..56011ab 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.table.api.TableException;
 
 import java.io.IOException;
@@ -53,6 +54,7 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
 	private final LinkedHashMap<String, String> staticPartitions;
 	private final PartitionComputer<T> computer;
 	private final OutputFormatFactory<T> formatFactory;
+	private final OutputFileConfig outputFileConfig;
 
 	private transient PartitionWriter<T> writer;
 	private transient Configuration parameters;
@@ -66,7 +68,8 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
 			boolean dynamicGrouped,
 			LinkedHashMap<String, String> staticPartitions,
 			OutputFormatFactory<T> formatFactory,
-			PartitionComputer<T> computer) {
+			PartitionComputer<T> computer,
+			OutputFileConfig outputFileConfig) {
 		this.fsFactory = fsFactory;
 		this.msFactory = msFactory;
 		this.overwrite = overwrite;
@@ -76,6 +79,7 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
 		this.staticPartitions = staticPartitions;
 		this.formatFactory = formatFactory;
 		this.computer = computer;
+		this.outputFileConfig = outputFileConfig;
 	}
 
 	@Override
@@ -102,7 +106,7 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
 	public void open(int taskNumber, int numTasks) throws IOException {
 		try {
 			PartitionTempFileManager fileManager = new PartitionTempFileManager(
-					fsFactory, tmpPath, taskNumber, CHECKPOINT_ID);
+					fsFactory, tmpPath, taskNumber, CHECKPOINT_ID, outputFileConfig);
 			PartitionWriter.Context<T> context = new PartitionWriter.Context<>(
 					parameters, formatFactory);
 			writer = PartitionWriterFactory.<T>get(
@@ -149,6 +153,8 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
 
 		private PartitionComputer<T> computer;
 
+		private OutputFileConfig outputFileConfig = new OutputFileConfig("", "");
+
 		public Builder<T> setPartitionColumns(String[] partitionColumns) {
 			this.partitionColumns = partitionColumns;
 			return this;
@@ -194,6 +200,11 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
 			return this;
 		}
 
+		public Builder<T> setOutputFileConfig(OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return this;
+		}
+
 		public FileSystemOutputFormat<T> build() {
 			checkNotNull(partitionColumns, "partitionColumns should not be null");
 			checkNotNull(formatFactory, "formatFactory should not be null");
@@ -210,7 +221,8 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
 					dynamicGrouped,
 					staticPartitions,
 					formatFactory,
-					computer);
+					computer,
+					outputFileConfig);
 		}
 	}
 }
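
For builder users the change is additive: without an explicit setOutputFileConfig call, the default OutputFileConfig("", "") keeps the old file names. A hedged fragment, where builder stands for an otherwise fully configured FileSystemOutputFormat.Builder as used in HiveTableSink above:

    // Optional: decorate every temp/part file written by this sink; ".gz" is only an example.
    builder.setOutputFileConfig(new OutputFileConfig("", ".gz"));
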
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
index 77534e9..26e9c81 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -54,6 +55,7 @@ public class PartitionTempFileManager {
 	private final int taskNumber;
 	private final long checkpointId;
 	private final Path taskTmpDir;
+	private final OutputFileConfig outputFileConfig;
 
 	private transient int nameCounter = 0;
 
@@ -62,9 +64,19 @@ public class PartitionTempFileManager {
 			Path tmpPath,
 			int taskNumber,
 			long checkpointId) throws IOException {
+		this(factory, tmpPath, taskNumber, checkpointId, new OutputFileConfig("", ""));
+	}
+
+	PartitionTempFileManager(
+			FileSystemFactory factory,
+			Path tmpPath,
+			int taskNumber,
+			long checkpointId,
+			OutputFileConfig outputFileConfig) throws IOException {
 		checkArgument(checkpointId != -1, "checkpoint id start with 0.");
 		this.taskNumber = taskNumber;
 		this.checkpointId = checkpointId;
+		this.outputFileConfig = outputFileConfig;
 
 		// generate and clean task temp dir.
 		this.taskTmpDir = new Path(
@@ -85,9 +97,9 @@ public class PartitionTempFileManager {
 	}
 
 	private String newFileName() {
-		return String.format(
-				checkpointName(checkpointId) + "-" + taskName(taskNumber) + "-file-%d",
-				nameCounter++);
+		return String.format("%s%s-%s-file-%d%s",
+				outputFileConfig.getPartPrefix(), checkpointName(checkpointId),
+				taskName(taskNumber), nameCounter++, outputFileConfig.getPartSuffix());
 	}
 
 	private static boolean isTaskDir(String fileName) {
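
Net effect on the file names, assuming checkpointName()/taskName() render as "cp-<id>" and "task-<n>" (their implementations are outside this diff) and a deflate-compressed text table:

    // With the new format string: prefix "", checkpoint 0, task 0, counter 0, suffix ".deflate"
    String fileName = String.format("%s%s-%s-file-%d%s", "", "cp-0", "task-0", 0, ".deflate");
    // fileName == "cp-0-task-0-file-0.deflate"; before the fix the suffix was simply never appended,
    // so readers could not infer the codec from the name (FLINK-16647).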