Posted to issues@spark.apache.org by "zhoukang (JIRA)" <ji...@apache.org> on 2019/01/23 10:43:00 UTC

[jira] [Created] (SPARK-26703) Hive record writer always depends on the parquet-1.6 writer; should fix it

zhoukang created SPARK-26703:
--------------------------------

             Summary: Hive record writer always depends on the parquet-1.6 writer; should fix it
                 Key: SPARK-26703
                 URL: https://issues.apache.org/jira/browse/SPARK-26703
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.4.0, 2.3.2
            Reporter: zhoukang


Currently, when we use an insert-into-Hive-table related command, the generated Parquet files are always version 1.6. The reason is as follows:
1. We rely on hive-exec's HiveFileFormatUtils to get the record writer:
{code:java}
private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
    jobConf,
    tableDesc,
    serializer.getSerializedClass,
    fileSinkConf,
    new Path(path),
    Reporter.NULL)
{code}
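A quick way to confirm that this chain ends at the old writer is to print where the involved classes are loaded from. This is only an illustrative sketch, assuming a Spark 2.x distribution whose bundled hive-exec 1.2 pulls in parquet-hadoop-bundle 1.6.0 (its classes still live in the pre-Apache parquet.* package); the class name WhereIsTheWriter is made up for the example.
{code:java}
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;

public class WhereIsTheWriter {
  public static void main(String[] args) throws Exception {
    // The record-writer factory comes from the hive-exec jar bundled with Spark.
    System.out.println(HiveFileFormatUtils.class.getProtectionDomain()
        .getCodeSource().getLocation());
    // That hive-exec build was compiled against Parquet 1.6, whose classes sit in the
    // old "parquet.*" namespace next to Spark's own org.apache.parquet classes.
    // (getCodeSource() may be null under some class loaders.)
    System.out.println(Class.forName("parquet.hadoop.ParquetOutputFormat")
        .getProtectionDomain().getCodeSource().getLocation());
  }
}
{code}
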
2. which calls:
{code:java}
public static RecordWriter getHiveRecordWriter(JobConf jc,
      TableDesc tableInfo, Class<? extends Writable> outputClass,
      FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
    HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
    try {
      boolean isCompressed = conf.getCompressed();
      JobConf jc_output = jc;
      if (isCompressed) {
        jc_output = new JobConf(jc);
        String codecStr = conf.getCompressCodec();
        if (codecStr != null && !codecStr.trim().equals("")) {
          Class<? extends CompressionCodec> codec = 
              (Class<? extends CompressionCodec>) JavaUtils.loadClass(codecStr);
          FileOutputFormat.setOutputCompressorClass(jc_output, codec);
        }
        String type = conf.getCompressType();
        if (type != null && !type.trim().equals("")) {
          CompressionType style = CompressionType.valueOf(type);
          SequenceFileOutputFormat.setOutputCompressionType(jc, style);
        }
      }
      return getRecordWriter(jc_output, hiveOutputFormat, outputClass,
          isCompressed, tableInfo.getProperties(), outPath, reporter);
    } catch (Exception e) {
      throw new HiveException(e);
    }
  }

  public static RecordWriter getRecordWriter(JobConf jc,
      OutputFormat<?, ?> outputFormat,
      Class<? extends Writable> valueClass, boolean isCompressed,
      Properties tableProp, Path outPath, Reporter reporter
      ) throws IOException, HiveException {
    if (!(outputFormat instanceof HiveOutputFormat)) {
      outputFormat = new HivePassThroughOutputFormat(outputFormat);
    }
    return ((HiveOutputFormat)outputFormat).getHiveRecordWriter(
        jc, outPath, valueClass, isCompressed, tableProp, reporter);
  }
{code}
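For a Parquet table, the outputFormat resolved above is Hive's MapredParquetOutputFormat, which already implements HiveOutputFormat, so the pass-through branch is skipped and its getHiveRecordWriter (step 3) is invoked. A minimal check, assuming hive-exec on the classpath (the class name OutputFormatCheck is only for illustration):
{code:java}
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;

public class OutputFormatCheck {
  public static void main(String[] args) {
    // MapredParquetOutputFormat is a HiveOutputFormat, so getRecordWriter() above casts it
    // directly and calls its getHiveRecordWriter(...), shown in step 3 below.
    System.out.println(new MapredParquetOutputFormat() instanceof HiveOutputFormat); // prints true
  }
}
{code}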

3. Then, in MapredParquetOutputFormat.getHiveRecordWriter:

{code:java}
public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
      final JobConf jobConf,
      final Path finalOutPath,
      final Class<? extends Writable> valueClass,
      final boolean isCompressed,
      final Properties tableProperties,
      final Progressable progress) throws IOException {

    LOG.info("creating new record writer..." + this);

    final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
    final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
    List<String> columnNames;
    List<TypeInfo> columnTypes;

    if (columnNameProperty.length() == 0) {
      columnNames = new ArrayList<String>();
    } else {
      columnNames = Arrays.asList(columnNameProperty.split(","));
    }

    if (columnTypeProperty.length() == 0) {
      columnTypes = new ArrayList<TypeInfo>();
    } else {
      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    }

    DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);

    return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
            progress, tableProperties);
  }
{code}
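The relevant part for this issue is the last two lines: the Hive columns are converted to a Parquet schema and handed to the bundled Parquet writer. A small, self-contained sketch of that conversion, using the same HiveSchemaConverter and TypeInfoUtils calls as above (the column names and types are made up; assumes hive-exec on the classpath):
{code:java}
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class SchemaConvertDemo {
  public static void main(String[] args) {
    // "columns" / "columns.types" style input, as read from tableProperties above.
    List<String> columnNames = Arrays.asList("id", "name");
    List<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString("bigint:string");
    // Prints the Parquet schema (a MessageType) that gets set via DataWritableWriteSupport
    // and is then written out by the 1.6 writer.
    System.out.println(HiveSchemaConverter.convert(columnNames, columnTypes));
  }
}
{code}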
4. which calls the ParquetRecordWriterWrapper constructor:

{code:java}
public ParquetRecordWriterWrapper(
      final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
      final JobConf jobConf,
      final String name,
      final Progressable progress, Properties tableProperties) throws
          IOException {
    try {
      // create a TaskInputOutputContext
      TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
      if (taskAttemptID == null) {
        taskAttemptID = new TaskAttemptID();
      }
      taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);

      LOG.info("initialize serde with table properties.");
      initializeSerProperties(taskContext, tableProperties);

      LOG.info("creating real writer to write at " + name);

      realWriter =
              ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));

      LOG.info("real writer: " + realWriter);
    } catch (final InterruptedException e) {
      throw new IOException(e);
    }
  }
{code}

The ParquetOutputFormat cast here is the version 1.6 one, so every file generated this way misses useful statistics such as min/max for string columns.
We should fix this so that the newer Parquet features can be used.
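
One way to see the effect on the produced files is to read a footer back with the newer org.apache.parquet reader that Spark itself ships: created_by reports the 1.6 writer, and the string (binary) columns carry no usable min/max statistics. A rough verification sketch (the class name and the path argument are only placeholders):
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class InspectFooter {
  public static void main(String[] args) throws Exception {
    // Point this at a part file produced by the INSERT INTO path described above.
    Path file = new Path(args[0]);
    ParquetMetadata footer = ParquetFileReader.readFooter(new Configuration(), file);
    // Files written through the Hive record-writer path report the old writer here,
    // e.g. "parquet-mr version 1.6.0 ...", unlike files written by Spark's own writer.
    System.out.println("created_by: " + footer.getFileMetaData().getCreatedBy());
    for (BlockMetaData block : footer.getBlocks()) {
      for (ColumnChunkMetaData column : block.getColumns()) {
        // For string (binary) columns the statistics are not usable for filtering.
        System.out.println(column.getPath() + " -> " + column.getStatistics());
      }
    }
  }
}
{code}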



