You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/18 09:06:59 UTC

[GitHub] [flink] leonardBang commented on a change in pull request #12212: [FLINK-17626][fs-connector] Fs connector should use FLIP-122 format options style

leonardBang commented on a change in pull request #12212:
URL: https://github.com/apache/flink/pull/12212#discussion_r426454025



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -29,6 +29,35 @@
  */
 public class FileSystemOptions {
 
+	public static final ConfigOption<String> PATH = key("path")
+			.stringType()
+			.noDefaultValue()
+			.withDescription("The path of a directory");
+
+	public static final ConfigOption<String> PARTITION_DEFAULT_NAME = key("partition.default-name")
+			.stringType()
+			.defaultValue("__DEFAULT_PARTITION__")
+			.withDescription("The default partition name in case the dynamic partition" +
+					" column value is null/empty string");
+
+	public static final ConfigOption<Long> SINK_ROLLING_POLICY_FILE_SIZE = key("sink.rolling-policy.file-size")
+			.longType()
+			.defaultValue(1024L * 1024L * 128L)
+			.withDescription("The maximum part file size before rolling (by default 128MB).");
+
+	public static final ConfigOption<Long> SINK_ROLLING_POLICY_TIME_INTERVAL = key("sink.rolling-policy.time.interval")

Review comment:
       ```suggestion
   	public static final ConfigOption<Long> SINK_ROLLING_POLICY_TIME_INTERVAL = key("sink.rolling-policy.time-interval")
   ```
   how about renaming it to `sink.rolling-policy.time-interval`, which is closer to FLIP-122's style?

##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
##########
@@ -23,137 +23,104 @@
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
 import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
 import org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader;
 import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.utils.PartitionPathUtils;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.hadoop.ParquetOutputFormat;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
 import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
 
 /**
  * Parquet {@link FileSystemFormatFactory} for file system.
  */
 public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory {
 
-	public static final ConfigOption<Boolean> UTC_TIMEZONE = key("format.utc-timezone")
+	public static final String IDENTIFIER = "parquet";
+
+	public static final ConfigOption<Boolean> UTC_TIMEZONE = key("utc-timezone")
 			.booleanType()
 			.defaultValue(false)
 			.withDescription("Use UTC timezone or local timezone to the conversion between epoch" +
 					" time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x" +
 					" use UTC timezone");
 
-	/**
-	 * Prefix for parquet-related properties, besides format, start with "parquet".
-	 * See more in {@link ParquetOutputFormat}.
-	 * - parquet.compression
-	 * - parquet.block.size
-	 * - parquet.page.size
-	 * - parquet.dictionary.page.size
-	 * - parquet.writer.max-padding
-	 * - parquet.enable.dictionary
-	 * - parquet.validation
-	 * - parquet.writer.version
-	 * ...
-	 */
-	public static final String PARQUET_PROPERTIES = "format.parquet";
-
 	@Override
-	public Map<String, String> requiredContext() {
-		Map<String, String> context = new HashMap<>();
-		context.put(FORMAT, "parquet");
-		return context;
+	public String factoryIdentifier() {
+		return IDENTIFIER;
 	}
 
 	@Override
-	public List<String> supportedProperties() {
-		return Arrays.asList(
-				UTC_TIMEZONE.key(),
-				PARQUET_PROPERTIES + ".*"
-		);
+	public Set<ConfigOption<?>> requiredOptions() {
+		return new HashSet<>();
 	}
 
-	private static boolean isUtcTimestamp(DescriptorProperties properties) {
-		return properties.getOptionalBoolean(UTC_TIMEZONE.key())
-				.orElse(UTC_TIMEZONE.defaultValue());
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(UTC_TIMEZONE);
+		// support "parquet.*"

Review comment:
       Could we list all supported options here? If yes, please add them for ORC too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org