Posted to issues@spark.apache.org by "zhoukang (JIRA)" <ji...@apache.org> on 2019/04/11 02:16:00 UTC

[jira] [Commented] (SPARK-26703) Hive record writer will always depends on parquet-1.6 writer should fix it

    [ https://issues.apache.org/jira/browse/SPARK-26703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815017#comment-16815017 ] 

zhoukang commented on SPARK-26703:
----------------------------------

I can open a PR for this, [~hyukjin.kwon].

> Hive record writer will always depends on parquet-1.6 writer should fix it 
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-26703
>                 URL: https://issues.apache.org/jira/browse/SPARK-26703
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.2, 2.4.0
>            Reporter: zhoukang
>            Priority: Major
>
> Currently, when we run an INSERT INTO command against a Hive table,
> the generated Parquet file is always written by the Parquet 1.6 writer. The reason is as follows:
> 1. We rely on HiveFileFormatUtils from hive-exec to get the record writer:
> {code:java}
> private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
>     jobConf,
>     tableDesc,
>     serializer.getSerializedClass,
>     fileSinkConf,
>     new Path(path),
>     Reporter.NULL)
> {code}
> 2. This calls into HiveFileFormatUtils:
> {code:java}
> public static RecordWriter getHiveRecordWriter(JobConf jc,
>       TableDesc tableInfo, Class<? extends Writable> outputClass,
>       FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
>     HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
>     try {
>       boolean isCompressed = conf.getCompressed();
>       JobConf jc_output = jc;
>       if (isCompressed) {
>         jc_output = new JobConf(jc);
>         String codecStr = conf.getCompressCodec();
>         if (codecStr != null && !codecStr.trim().equals("")) {
>           Class<? extends CompressionCodec> codec = 
>               (Class<? extends CompressionCodec>) JavaUtils.loadClass(codecStr);
>           FileOutputFormat.setOutputCompressorClass(jc_output, codec);
>         }
>         String type = conf.getCompressType();
>         if (type != null && !type.trim().equals("")) {
>           CompressionType style = CompressionType.valueOf(type);
>           SequenceFileOutputFormat.setOutputCompressionType(jc, style);
>         }
>       }
>       return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
>           isCompressed, tableInfo.getProperties(), outPath, reporter);
>     } catch (Exception e) {
>       throw new HiveException(e);
>     }
>   }
>   public static RecordWriter getRecordWriter(JobConf jc,
>       OutputFormat<?, ?> outputFormat,
>       Class<? extends Writable> valueClass, boolean isCompressed,
>       Properties tableProp, Path outPath, Reporter reporter
>       ) throws IOException, HiveException {
>     if (!(outputFormat instanceof HiveOutputFormat)) {
>       outputFormat = new HivePassThroughOutputFormat(outputFormat);
>     }
>     return ((HiveOutputFormat)outputFormat).getHiveRecordWriter(
>         jc, outPath, valueClass, isCompressed, tableProp, reporter);
>   }
> {code}
> 3. Then, in MapredParquetOutputFormat:
> {code:java}
> public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
>       final JobConf jobConf,
>       final Path finalOutPath,
>       final Class<? extends Writable> valueClass,
>       final boolean isCompressed,
>       final Properties tableProperties,
>       final Progressable progress) throws IOException {
>     LOG.info("creating new record writer..." + this);
>     final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
>     final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
>     List<String> columnNames;
>     List<TypeInfo> columnTypes;
>     if (columnNameProperty.length() == 0) {
>       columnNames = new ArrayList<String>();
>     } else {
>       columnNames = Arrays.asList(columnNameProperty.split(","));
>     }
>     if (columnTypeProperty.length() == 0) {
>       columnTypes = new ArrayList<TypeInfo>();
>     } else {
>       columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
>     }
>     DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
>     return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
>             progress,tableProperties);
>   }
> {code}
> 4. This then calls the ParquetRecordWriterWrapper constructor:
> {code:java}
> public ParquetRecordWriterWrapper(
>       final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
>       final JobConf jobConf,
>       final String name,
>       final Progressable progress, Properties tableProperties) throws
>           IOException {
>     try {
>       // create a TaskInputOutputContext
>       TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
>       if (taskAttemptID == null) {
>         taskAttemptID = new TaskAttemptID();
>       }
>       taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
>       LOG.info("initialize serde with table properties.");
>       initializeSerProperties(taskContext, tableProperties);
>       LOG.info("creating real writer to write at " + name);
>       realWriter =
>               ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
>       LOG.info("real writer: " + realWriter);
>     } catch (final InterruptedException e) {
>       throw new IOException(e);
>     }
>   }
> {code}
> The ParquetOutputFormat that realOutputFormat is cast to here is version 1.6.
> As a result, every generated file lacks some useful statistics, such as min/max values for string columns.
> We should fix this so that the newer Parquet features can be used.
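
One way to confirm which writer produced a file is to look at the created_by string in the Parquet footer, which parquet-mr fills in with text of the form "parquet-mr version X.Y.Z (build ...)". The following is only a minimal sketch of such a check, not code from Spark or Hive: the class name ParquetCreatedBy and both methods are hypothetical, the created_by string is assumed to have been obtained already (for example from a footer-dumping tool), and the 1.8 threshold in main is an arbitrary value chosen for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Spark, Hive, or Parquet.
class ParquetCreatedBy {
    // parquet-mr writes footers with a created_by string such as
    // "parquet-mr version 1.6.0 (build <hash>)".
    private static final Pattern VERSION =
        Pattern.compile("parquet-mr version (\\d+)\\.(\\d+)\\.(\\d+)");

    /** Returns {major, minor, patch}, or null if the string is not in the expected form. */
    static int[] parseVersion(String createdBy) {
        if (createdBy == null) {
            return null;
        }
        Matcher m = VERSION.matcher(createdBy);
        if (!m.find()) {
            return null;
        }
        return new int[] {
            Integer.parseInt(m.group(1)),
            Integer.parseInt(m.group(2)),
            Integer.parseInt(m.group(3))
        };
    }

    /** True when the writer is a parquet-mr release older than major.minor. */
    static boolean isOlderThan(String createdBy, int major, int minor) {
        int[] v = parseVersion(createdBy);
        if (v == null) {
            return false; // unknown writer: make no claim about its age
        }
        return v[0] < major || (v[0] == major && v[1] < minor);
    }

    public static void main(String[] args) {
        // Example input only; a real value would come from the file footer.
        String createdBy = "parquet-mr version 1.6.0 (build example)";
        // The 1.8 threshold is an assumption made for this sketch.
        System.out.println(isOlderThan(createdBy, 1, 8));
    }
}
```

Running such a check on a file produced through the Hive insert path and on one produced through Spark's native Parquet data source would show whether the two paths really use different writer versions.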


