Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/27 03:05:50 UTC
[flink] 03/03: [FLINK-16647][table-runtime-blink][hive] Miss file extension when inserting to hive table with compression
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6758ee33ef7fd87d53e97459d293fa38700c9085
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Mar 23 11:55:59 2020 +0800
[FLINK-16647][table-runtime-blink][hive] Miss file extension when inserting to hive table with compression
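
Before this change, HiveOutputFormatFactory decided on its own whether output was compressed, but the part files were still named without the codec's extension, so a text table written with hive.exec.compress.output=true produced files Hive could not map back to a decompressor. The fix resolves the concrete HiveOutputFormat class once in HiveTableSink, asks Hive for the matching extension, and threads it through a new OutputFileConfig into the temp-file naming. A minimal sketch of the lookup, using the same Hive calls as the commit (the helper method itself is illustrative, not part of the commit):

    import org.apache.flink.table.catalog.hive.client.HiveShim;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
    import org.apache.hadoop.mapred.JobConf;

    /** Illustrative helper: which suffix will Hive expect on the files we write? */
    static String fileExtensionFor(JobConf jobConf, HiveShim hiveShim,
            String outputFormatClassName) throws Exception {
        boolean isCompressed = jobConf.getBoolean(
                HiveConf.ConfVars.COMPRESSRESULT.varname, false);
        // Map the metastore's OutputFormat to Hive's writable substitute.
        Class hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(
                Class.forName(outputFormatClassName));
        // Hive returns e.g. ".deflate" for the default codec, or null when
        // no extension applies.
        String extension = Utilities.getFileExtension(jobConf, isCompressed,
                (HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
        return extension == null ? "" : extension;
    }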
---
.../connectors/hive/HiveOutputFormatFactory.java | 17 ++++++----
.../flink/connectors/hive/HiveTableSink.java | 20 ++++++++++--
.../flink/table/catalog/hive/client/HiveShim.java | 8 ++++-
.../table/catalog/hive/client/HiveShimV100.java | 21 +++++++++----
.../table/catalog/hive/client/HiveShimV110.java | 23 ++++++++++----
.../hive/HiveOutputFormatFactoryTest.java | 5 +--
.../connectors/hive/TableEnvHiveConnectorTest.java | 36 ++++++++++++++++++++++
.../sink/filesystem/OutputFileConfig.java | 4 +--
.../table/filesystem/FileSystemOutputFormat.java | 18 +++++++++--
.../table/filesystem/PartitionTempFileManager.java | 18 +++++++++--
10 files changed, 139 insertions(+), 31 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOutputFormatFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOutputFormatFactory.java
index 9629566..378d9ef 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOutputFormatFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOutputFormatFactory.java
@@ -61,7 +61,7 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
private static final long serialVersionUID = 1L;
- private final String outputFormat;
+ private final Class hiveOutputFormatClz;
private final SerDeInfo serDeInfo;
@@ -77,6 +77,8 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
private final HiveShim hiveShim;
+ private final boolean isCompressed;
+
// number of non-partitioning columns
private transient int numNonPartitionColumns;
@@ -93,20 +95,24 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
public HiveOutputFormatFactory(
JobConf jobConf,
- String outputFormat,
+ Class hiveOutputFormatClz,
SerDeInfo serDeInfo,
TableSchema schema,
String[] partitionColumns,
Properties tableProperties,
- HiveShim hiveShim) {
+ HiveShim hiveShim,
+ boolean isCompressed) {
+ Preconditions.checkArgument(org.apache.hadoop.hive.ql.io.HiveOutputFormat.class.isAssignableFrom(hiveOutputFormatClz),
+ "The output format should be an instance of HiveOutputFormat");
this.confWrapper = new JobConfWrapper(jobConf);
- this.outputFormat = outputFormat;
+ this.hiveOutputFormatClz = hiveOutputFormatClz;
this.serDeInfo = serDeInfo;
this.allColumns = schema.getFieldNames();
this.allTypes = schema.getFieldDataTypes();
this.partitionColumns = partitionColumns;
this.tableProperties = tableProperties;
this.hiveShim = hiveShim;
+ this.isCompressed = isCompressed;
}
private void init() throws Exception {
@@ -145,7 +151,6 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
JobConf conf = new JobConf(confWrapper.conf());
- final boolean isCompressed = conf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
if (isCompressed) {
String codecStr = conf.get(HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC.varname);
if (!StringUtils.isNullOrWhitespaceOnly(codecStr)) {
@@ -164,7 +169,7 @@ public class HiveOutputFormatFactory implements OutputFormatFactory<Row> {
RecordWriter recordWriter = hiveShim.getHiveRecordWriter(
conf,
- outputFormat,
+ hiveOutputFormatClz,
recordSerDe.getSerializedClass(),
isCompressed,
tableProperties,
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index c4f4124..283525a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
@@ -47,6 +48,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
@@ -104,26 +107,39 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
builder.setDynamicGrouped(dynamicGrouping);
builder.setPartitionColumns(partitionColumns);
builder.setFileSystemFactory(new HadoopFileSystemFactory(jobConf));
+
+ boolean isCompressed = jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
+ Class hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
builder.setFormatFactory(new HiveOutputFormatFactory(
jobConf,
- sd.getOutputFormat(),
+ hiveOutputFormatClz,
sd.getSerdeInfo(),
tableSchema,
partitionColumns,
HiveReflectionUtils.getTableMetadata(hiveShim, table),
- hiveShim));
+ hiveShim,
+ isCompressed));
builder.setMetaStoreFactory(
new HiveTableMetaStoreFactory(jobConf, hiveVersion, dbName, tableName));
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitionSpec);
builder.setTempPath(new org.apache.flink.core.fs.Path(
toStagingDir(sd.getLocation(), jobConf)));
+ String extension = Utilities.getFileExtension(jobConf, isCompressed,
+ (HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
+ extension = extension == null ? "" : extension;
+ OutputFileConfig outputFileConfig = new OutputFileConfig("", extension);
+ builder.setOutputFileConfig(outputFileConfig);
return builder.build();
} catch (TException e) {
throw new CatalogException("Failed to query Hive metaStore", e);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to create staging dir", e);
+ } catch (ClassNotFoundException e) {
+ throw new FlinkHiveException("Failed to get output format class", e);
+ } catch (IllegalAccessException | InstantiationException e) {
+ throw new FlinkHiveException("Failed to instantiate output format instance", e);
}
}
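
With the extension in hand, the sink only has to pass it to the filesystem sink builder; an empty prefix keeps the existing name layout. A hedged usage sketch (the ".deflate" value assumes Hadoop's default codec):

    // Suffix every part file this sink writes; the prefix stays empty.
    OutputFileConfig outputFileConfig = new OutputFileConfig("", ".deflate");
    builder.setOutputFileConfig(outputFileConfig);
    // PartitionTempFileManager (below) appends the suffix when generating
    // temp file names, so the final, committed files carry it too.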
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 346a8e1..258c52c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;
@@ -140,10 +141,15 @@ public interface HiveShim extends Serializable {
/**
* Get Hive's FileSinkOperator.RecordWriter.
*/
- FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, String outputFormatClzName,
+ FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Class outputFormatClz,
Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath);
/**
+ * For a given OutputFormat class, get the corresponding {@link HiveOutputFormat} class.
+ */
+ Class getHiveOutputFormatClass(Class outputFormatClz);
+
+ /**
* Get Hive table schema from deserializer.
*/
List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table table, boolean skipConfError);
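
The new shim method exists because the underlying Hive utility changed shape between versions, as the two shim diffs below show. What a caller observes, sketched for a plain text table (the substitute class is standard Hive behavior, stated here as an assumption):

    // The metastore records TextOutputFormat for text tables; for writing,
    // Hive substitutes its own HiveOutputFormat implementation.
    Class hiveClz = hiveShim.getHiveOutputFormatClass(
            org.apache.hadoop.mapred.TextOutputFormat.class);
    // Expected: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat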
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index a6fcd78..dae39d1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -188,16 +188,12 @@ public class HiveShimV100 implements HiveShim {
}
@Override
- public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, String outputFormatClzName,
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Class outputFormatClz,
Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath) {
try {
- Class outputFormatClz = Class.forName(outputFormatClzName);
Class utilClass = HiveFileFormatUtils.class;
- Method utilMethod = utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class, boolean.class);
- outputFormatClz = (Class) utilMethod.invoke(null, outputFormatClz, false);
- Preconditions.checkState(outputFormatClz != null, "No Hive substitute output format for " + outputFormatClzName);
HiveOutputFormat outputFormat = (HiveOutputFormat) outputFormatClz.newInstance();
- utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, HiveOutputFormat.class,
+ Method utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, HiveOutputFormat.class,
Class.class, boolean.class, Properties.class, Path.class, Reporter.class);
return (FileSinkOperator.RecordWriter) utilMethod.invoke(null,
jobConf, outputFormat, outValClz, isCompressed, tableProps, outPath, Reporter.NULL);
@@ -207,6 +203,19 @@ public class HiveShimV100 implements HiveShim {
}
@Override
+ public Class getHiveOutputFormatClass(Class outputFormatClz) {
+ try {
+ Class utilClass = HiveFileFormatUtils.class;
+ Method utilMethod = utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class, boolean.class);
+ Class res = (Class) utilMethod.invoke(null, outputFormatClz, false);
+ Preconditions.checkState(res != null, "No Hive substitute output format for " + outputFormatClz);
+ return res;
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ throw new FlinkHiveException("Failed to get HiveOutputFormat for " + outputFormatClz, e);
+ }
+ }
+
+ @Override
public List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table table, boolean skipConfError) {
try {
Method utilMethod = getHiveMetaStoreUtilsClass().getMethod("getDeserializer", Configuration.class, Table.class);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
index 691c14a..8d1885d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.catalog.hive.client;
+import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.util.Preconditions;
@@ -33,6 +34,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.Reporter;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Properties;
@@ -43,16 +45,12 @@ import java.util.Properties;
public class HiveShimV110 extends HiveShimV101 {
@Override
- public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, String outputFormatClzName,
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Class outputFormatClz,
Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath) {
try {
- Class outputFormatClz = Class.forName(outputFormatClzName);
Class utilClass = HiveFileFormatUtils.class;
- Method utilMethod = utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class);
- outputFormatClz = (Class) utilMethod.invoke(null, outputFormatClz);
- Preconditions.checkState(outputFormatClz != null, "No Hive substitute output format for " + outputFormatClzName);
OutputFormat outputFormat = (OutputFormat) outputFormatClz.newInstance();
- utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, OutputFormat.class,
+ Method utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, OutputFormat.class,
Class.class, boolean.class, Properties.class, Path.class, Reporter.class);
return (FileSinkOperator.RecordWriter) utilMethod.invoke(null,
jobConf, outputFormat, outValClz, isCompressed, tableProps, outPath, Reporter.NULL);
@@ -62,6 +60,19 @@ public class HiveShimV110 extends HiveShimV101 {
}
@Override
+ public Class getHiveOutputFormatClass(Class outputFormatClz) {
+ try {
+ Class utilClass = HiveFileFormatUtils.class;
+ Method utilMethod = utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class);
+ Class res = (Class) utilMethod.invoke(null, outputFormatClz);
+ Preconditions.checkState(res != null, "No Hive substitute output format for " + outputFormatClz);
+ return res;
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ throw new FlinkHiveException("Failed to get HiveOutputFormat for " + outputFormatClz, e);
+ }
+ }
+
+ @Override
public List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table table, boolean skipConfError) {
try {
Method utilMethod = getHiveMetaStoreUtilsClass().getMethod("getDeserializer",
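
The only substantive difference from the 1.0.x shim is the reflective lookup: Hive 1.1 dropped the boolean parameter from getOutputFormatSubstitute, which is exactly why the call sits behind the shim interface. Side by side:

    // Hive 1.0.x (HiveShimV100):
    Method v100 = HiveFileFormatUtils.class.getDeclaredMethod(
            "getOutputFormatSubstitute", Class.class, boolean.class);
    // Hive 1.1.x and later (HiveShimV110):
    Method v110 = HiveFileFormatUtils.class.getDeclaredMethod(
            "getOutputFormatSubstitute", Class.class);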
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveOutputFormatFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveOutputFormatFactoryTest.java
index 70152dc..fe70b98 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveOutputFormatFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveOutputFormatFactoryTest.java
@@ -54,11 +54,12 @@ public class HiveOutputFormatFactoryTest {
SerDeInfo serDeInfo = new SerDeInfo("name", LazySimpleSerDe.class.getName(), Collections.emptyMap());
HiveOutputFormatFactory factory = new HiveOutputFormatFactory(
new JobConf(),
- VerifyURIOutputFormat.class.getName(),
+ VerifyURIOutputFormat.class,
serDeInfo, schema,
new String[0],
new Properties(),
- HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()));
+ HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()),
+ false);
org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(TEST_URI_SCHEME, TEST_URI_AUTHORITY, "/foo/path");
factory.createOutputFormat(path);
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 1da85f8..dc5360e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -56,6 +56,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -511,6 +512,28 @@ public class TableEnvHiveConnectorTest {
}
}
+ @Test
+ public void testCompressTextTable() throws Exception {
+ hiveShell.execute("create database db1");
+ try {
+ hiveShell.execute("create table db1.src (x string,y string)");
+ hiveShell.execute("create table db1.dest like db1.src");
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
+ .addRow(new Object[]{"a", "b"})
+ .addRow(new Object[]{"c", "d"})
+ .commit();
+ hiveCatalog.getHiveConf().setBoolVar(HiveConf.ConfVars.COMPRESSRESULT, true);
+ TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+ tableEnv.sqlUpdate("insert overwrite db1.dest select * from db1.src");
+ tableEnv.execute("insert dest");
+ List<String> expected = Arrays.asList("a\tb", "c\td");
+ verifyHiveQueryResult("select * from db1.dest", expected);
+ verifyFlinkQueryResult(tableEnv.sqlQuery("select * from db1.dest"), expected);
+ } finally {
+ hiveShell.execute("drop database db1 cascade");
+ }
+ }
+
private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
@@ -523,4 +546,17 @@ public class TableEnvHiveConnectorTest {
assertEquals(expected.size(), results.size());
assertEquals(new HashSet<>(expected), new HashSet<>(results));
}
+
+ private void verifyFlinkQueryResult(org.apache.flink.table.api.Table table, List<String> expected) throws Exception {
+ List<Row> rows = TableUtils.collectToList(table);
+ List<String> results = rows.stream().map(row ->
+ IntStream.range(0, row.getArity())
+ .mapToObj(row::getField)
+ .map(o -> o instanceof LocalDateTime ?
+ Timestamp.valueOf((LocalDateTime) o) : o)
+ .map(Object::toString)
+ .collect(Collectors.joining("\t"))).collect(Collectors.toList());
+ assertEquals(expected.size(), results.size());
+ assertEquals(new HashSet<>(expected), new HashSet<>(results));
+ }
}
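
The new test round-trips a compressed text table through both the Hive and Flink readers. If one wanted to assert the fix at the file level, a sketch along these lines would do (the warehouse layout and codec here are assumptions, not part of the commit):

    // Hypothetical check: every part file written under the table directory
    // should now carry the codec extension.
    java.io.File tableDir = new java.io.File(warehouseRoot, "db1.db/dest");
    for (java.io.File f : tableDir.listFiles()) {
        org.junit.Assert.assertTrue(f.getName(), f.getName().endsWith(".deflate"));
    }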
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java
index 98b84d1..e139b17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java
@@ -47,14 +47,14 @@ public class OutputFileConfig implements Serializable {
/**
* The prefix for the part name.
*/
- String getPartPrefix() {
+ public String getPartPrefix() {
return partPrefix;
}
/**
* The suffix for the part name.
*/
- String getPartSuffix() {
+ public String getPartSuffix() {
return partSuffix;
}
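
Widening the getters to public is what lets flink-table-runtime-blink, a different package and module, read the configured prefix and suffix. Usage is just:

    OutputFileConfig config = new OutputFileConfig("part-", ".deflate");
    config.getPartPrefix();  // "part-"
    config.getPartSuffix();  // ".deflate"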
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
index 1854967..56011ab 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOutputFormat.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.table.api.TableException;
import java.io.IOException;
@@ -53,6 +54,7 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
private final LinkedHashMap<String, String> staticPartitions;
private final PartitionComputer<T> computer;
private final OutputFormatFactory<T> formatFactory;
+ private final OutputFileConfig outputFileConfig;
private transient PartitionWriter<T> writer;
private transient Configuration parameters;
@@ -66,7 +68,8 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
boolean dynamicGrouped,
LinkedHashMap<String, String> staticPartitions,
OutputFormatFactory<T> formatFactory,
- PartitionComputer<T> computer) {
+ PartitionComputer<T> computer,
+ OutputFileConfig outputFileConfig) {
this.fsFactory = fsFactory;
this.msFactory = msFactory;
this.overwrite = overwrite;
@@ -76,6 +79,7 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
this.staticPartitions = staticPartitions;
this.formatFactory = formatFactory;
this.computer = computer;
+ this.outputFileConfig = outputFileConfig;
}
@Override
@@ -102,7 +106,7 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
public void open(int taskNumber, int numTasks) throws IOException {
try {
PartitionTempFileManager fileManager = new PartitionTempFileManager(
- fsFactory, tmpPath, taskNumber, CHECKPOINT_ID);
+ fsFactory, tmpPath, taskNumber, CHECKPOINT_ID, outputFileConfig);
PartitionWriter.Context<T> context = new PartitionWriter.Context<>(
parameters, formatFactory);
writer = PartitionWriterFactory.<T>get(
@@ -149,6 +153,8 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
private PartitionComputer<T> computer;
+ private OutputFileConfig outputFileConfig = new OutputFileConfig("", "");
+
public Builder<T> setPartitionColumns(String[] partitionColumns) {
this.partitionColumns = partitionColumns;
return this;
@@ -194,6 +200,11 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
return this;
}
+ public Builder<T> setOutputFileConfig(OutputFileConfig outputFileConfig) {
+ this.outputFileConfig = outputFileConfig;
+ return this;
+ }
+
public FileSystemOutputFormat<T> build() {
checkNotNull(partitionColumns, "partitionColumns should not be null");
checkNotNull(formatFactory, "formatFactory should not be null");
@@ -210,7 +221,8 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
dynamicGrouped,
staticPartitions,
formatFactory,
- computer);
+ computer,
+ outputFileConfig);
}
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
index 77534e9..26e9c81 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import java.io.IOException;
import java.util.ArrayList;
@@ -54,6 +55,7 @@ public class PartitionTempFileManager {
private final int taskNumber;
private final long checkpointId;
private final Path taskTmpDir;
+ private final OutputFileConfig outputFileConfig;
private transient int nameCounter = 0;
@@ -62,9 +64,19 @@ public class PartitionTempFileManager {
Path tmpPath,
int taskNumber,
long checkpointId) throws IOException {
+ this(factory, tmpPath, taskNumber, checkpointId, new OutputFileConfig("", ""));
+ }
+
+ PartitionTempFileManager(
+ FileSystemFactory factory,
+ Path tmpPath,
+ int taskNumber,
+ long checkpointId,
+ OutputFileConfig outputFileConfig) throws IOException {
checkArgument(checkpointId != -1, "checkpoint id start with 0.");
this.taskNumber = taskNumber;
this.checkpointId = checkpointId;
+ this.outputFileConfig = outputFileConfig;
// generate and clean task temp dir.
this.taskTmpDir = new Path(
@@ -85,9 +97,9 @@ public class PartitionTempFileManager {
}
private String newFileName() {
- return String.format(
- checkpointName(checkpointId) + "-" + taskName(taskNumber) + "-file-%d",
- nameCounter++);
+ return String.format("%s%s-%s-file-%d%s",
+ outputFileConfig.getPartPrefix(), checkpointName(checkpointId),
+ taskName(taskNumber), nameCounter++, outputFileConfig.getPartSuffix());
}
private static boolean isTaskDir(String fileName) {
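
Putting it together: with the checkpoint and task name helpers used above, an empty prefix, and a ".deflate" suffix, the first temp file of task 0 at checkpoint 0 would be named roughly as follows (the "cp-"/"task-" prefixes reflect this class's helpers and are an assumption here):

    // String.format("%s%s-%s-file-%d%s", "", "cp-0", "task-0", 0, ".deflate")
    //   -> "cp-0-task-0-file-0.deflate"
    // Previously the same file was "cp-0-task-0-file-0", with no hint that
    // its contents were compressed.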