Posted to issues@spark.apache.org by "zhoukang (JIRA)" <ji...@apache.org> on 2019/04/11 02:16:00 UTC
[jira] [Commented] (SPARK-26703) Hive record writer always depends on the parquet-1.6 writer; we should fix it
[ https://issues.apache.org/jira/browse/SPARK-26703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815017#comment-16815017 ]
zhoukang commented on SPARK-26703:
----------------------------------
I can make a PR for this [~hyukjin.kwon]
> Hive record writer always depends on the parquet-1.6 writer; we should fix it
> ---------------------------------------------------------------------------
>
> Key: SPARK-26703
> URL: https://issues.apache.org/jira/browse/SPARK-26703
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.2, 2.4.0
> Reporter: zhoukang
> Priority: Major
>
> Currently, when we run an INSERT INTO command against a Hive table, the Parquet files generated are always written with the parquet-1.6 writer. The reason is as follows:
> 1. We rely on hive-exec's HiveFileFormatUtils to get the record writer:
> {code:scala}
> // Spark's HiveOutputWriter (sql/hive module) obtains the record writer from hive-exec:
> private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
>   jobConf,
>   tableDesc,
>   serializer.getSerializedClass,
>   fileSinkConf,
>   new Path(path),
>   Reporter.NULL)
> {code}
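> As a quick sanity check (a minimal sketch, not from this issue: it assumes both hive-exec's bundled parquet classes and Spark's own parquet-mr are on the classpath), we can print which jar provides each Parquet writer API. parquet-mr used the parquet.* packages up to 1.6 and moved to org.apache.parquet.* in 1.7+, so the two coexist at runtime:
> {code:java}
> // Hedged sketch: locate the jars providing the old and new Parquet writer APIs.
> public class ParquetClasspathCheck {
>   public static void main(String[] args) throws Exception {
>     // parquet 1.6 API, bundled with hive-exec (parquet-hadoop-bundle)
>     Class<?> oldApi = Class.forName("parquet.hadoop.ParquetOutputFormat");
>     // parquet-mr 1.7+ API, used by Spark's native Parquet writer
>     Class<?> newApi = Class.forName("org.apache.parquet.hadoop.ParquetOutputFormat");
>     System.out.println(oldApi.getProtectionDomain().getCodeSource().getLocation());
>     System.out.println(newApi.getProtectionDomain().getCodeSource().getLocation());
>   }
> }
> {code}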
> 2. This calls hive-exec's HiveFileFormatUtils.getHiveRecordWriter:
> {code:java}
> public static RecordWriter getHiveRecordWriter(JobConf jc,
>     TableDesc tableInfo, Class<? extends Writable> outputClass,
>     FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
>   HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
>   try {
>     boolean isCompressed = conf.getCompressed();
>     JobConf jc_output = jc;
>     if (isCompressed) {
>       jc_output = new JobConf(jc);
>       String codecStr = conf.getCompressCodec();
>       if (codecStr != null && !codecStr.trim().equals("")) {
>         Class<? extends CompressionCodec> codec =
>             (Class<? extends CompressionCodec>) JavaUtils.loadClass(codecStr);
>         FileOutputFormat.setOutputCompressorClass(jc_output, codec);
>       }
>       String type = conf.getCompressType();
>       if (type != null && !type.trim().equals("")) {
>         CompressionType style = CompressionType.valueOf(type);
>         SequenceFileOutputFormat.setOutputCompressionType(jc, style);
>       }
>     }
>     return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
>         isCompressed, tableInfo.getProperties(), outPath, reporter);
>   } catch (Exception e) {
>     throw new HiveException(e);
>   }
> }
>
> public static RecordWriter getRecordWriter(JobConf jc,
>     OutputFormat<?, ?> outputFormat,
>     Class<? extends Writable> valueClass, boolean isCompressed,
>     Properties tableProp, Path outPath, Reporter reporter
>     ) throws IOException, HiveException {
>   if (!(outputFormat instanceof HiveOutputFormat)) {
>     outputFormat = new HivePassThroughOutputFormat(outputFormat);
>   }
>   return ((HiveOutputFormat) outputFormat).getHiveRecordWriter(
>       jc, outPath, valueClass, isCompressed, tableProp, reporter);
> }
> {code}
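> Here getHiveOutputFormat(jc, tableInfo) instantiates the output format class recorded in the table descriptor; for a table STORED AS PARQUET that is Hive's MapredParquetOutputFormat. A minimal illustrative sketch of that resolution (assumes hive-exec is on the classpath):
> {code:java}
> // Hedged sketch: resolve and instantiate the output format the way Hive does.
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.util.ReflectionUtils;
>
> public class ResolveOutputFormat {
>   public static void main(String[] args) throws Exception {
>     JobConf jc = new JobConf();
>     // Class name as recorded in the TableDesc of a STORED AS PARQUET table.
>     Class<?> cls = Class.forName(
>         "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat");
>     Object outputFormat = ReflectionUtils.newInstance(cls, jc);
>     System.out.println(outputFormat); // the instance handed to getRecordWriter(...)
>   }
> }
> {code}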
> 3. which lands in Hive's MapredParquetOutputFormat.getHiveRecordWriter:
> {code:java}
> public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
>     final JobConf jobConf,
>     final Path finalOutPath,
>     final Class<? extends Writable> valueClass,
>     final boolean isCompressed,
>     final Properties tableProperties,
>     final Progressable progress) throws IOException {
>   LOG.info("creating new record writer..." + this);
>
>   final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
>   final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
>   List<String> columnNames;
>   List<TypeInfo> columnTypes;
>
>   if (columnNameProperty.length() == 0) {
>     columnNames = new ArrayList<String>();
>   } else {
>     columnNames = Arrays.asList(columnNameProperty.split(","));
>   }
>
>   if (columnTypeProperty.length() == 0) {
>     columnTypes = new ArrayList<TypeInfo>();
>   } else {
>     columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
>   }
>
>   DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
>
>   return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
>       progress, tableProperties);
> }
> {code}
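> The HiveSchemaConverter.convert call above builds the Parquet schema from the table's column metadata. A small illustrative sketch of its output for a hypothetical two-column table (note the parquet.* package, the pre-1.7 naming that hive-exec bundles):
> {code:java}
> // Hedged sketch: preview the schema that Hive's converter hands to Parquet.
> import java.util.Arrays;
> import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
> import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
> import parquet.schema.MessageType; // parquet 1.6 naming, as bundled with hive-exec
>
> public class SchemaPreview {
>   public static void main(String[] args) {
>     MessageType schema = HiveSchemaConverter.convert(
>         Arrays.asList("id", "name"),
>         TypeInfoUtils.getTypeInfosFromTypeString("int,string"));
>     // Prints something like:
>     //   message hive_schema { optional int32 id; optional binary name (UTF8); }
>     System.out.println(schema);
>   }
> }
> {code}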
> 4. which constructs ParquetRecordWriterWrapper:
> {code:java}
> public ParquetRecordWriterWrapper(
>     final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
>     final JobConf jobConf,
>     final String name,
>     final Progressable progress, Properties tableProperties) throws IOException {
>   try {
>     // create a TaskInputOutputContext
>     TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
>     if (taskAttemptID == null) {
>       taskAttemptID = new TaskAttemptID();
>     }
>     taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
>
>     LOG.info("initialize serde with table properties.");
>     initializeSerProperties(taskContext, tableProperties);
>
>     LOG.info("creating real writer to write at " + name);
>     realWriter =
>         ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
>     LOG.info("real writer: " + realWriter);
>   } catch (final InterruptedException e) {
>     throw new IOException(e);
>   }
> }
> {code}
> And the ParquetOutputFormat cast above resolves to the parquet-1.6 writer bundled with hive-exec, so every file generated through this path is written by parquet 1.6.
> As a result, the generated files miss some useful statistics, such as min/max for string (binary) columns.
> We should fix this issue so that we can use the new features of Parquet.
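> To confirm the symptom, here is a minimal sketch (not part of this issue; it uses the footer API of a newer parquet-mr, and the file path is hypothetical) that prints the writer version and per-column statistics. Newer readers discard binary min/max statistics written by parquet-mr older than 1.8 because of PARQUET-251, so string columns of files written through this path end up with no usable min/max:
> {code:java}
> // Hedged sketch: inspect a Parquet file's writer version and column statistics.
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.hadoop.ParquetFileReader;
> import org.apache.parquet.hadoop.metadata.BlockMetaData;
> import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
> import org.apache.parquet.hadoop.metadata.ParquetMetadata;
> import org.apache.parquet.hadoop.util.HadoopInputFile;
>
> public class InspectFooter {
>   public static void main(String[] args) throws Exception {
>     // Hypothetical path to a file produced by the INSERT described above.
>     Path path = new Path("/tmp/warehouse/t/part-00000");
>     try (ParquetFileReader reader =
>         ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
>       ParquetMetadata footer = reader.getFooter();
>       // Expect something like "parquet-mr version 1.6.0 ..." for this write path.
>       System.out.println("created_by: " + footer.getFileMetaData().getCreatedBy());
>       for (BlockMetaData block : footer.getBlocks()) {
>         for (ColumnChunkMetaData col : block.getColumns()) {
>           // For binary (string) columns, stats from pre-1.8 writers are unusable.
>           System.out.println(col.getPath() + " -> " + col.getStatistics());
>         }
>       }
>     }
>   }
> }
> {code}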
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org