Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/10/25 00:50:42 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #2190: [HUDI-892] RealtimeParquetInputFormat skip adding projection columns if there are no log files

vinothchandar commented on a change in pull request #2190:
URL: https://github.com/apache/hudi/pull/2190#discussion_r511530095



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -85,53 +85,55 @@ public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOEx
       // if the result is false, then there are no more records
       return false;
     } else {
-      // TODO(VC): Right now, we assume all records in log, have a matching base record. (which
-      // would be true until we have a way to index logs too)
-      // return from delta records map if we have some match.
-      String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
-      if (deltaRecordMap.containsKey(key)) {
-        // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
-        // deltaRecord may not be a full record and needs values of columns from the parquet
-        Option<GenericRecord> rec;
-        if (usesCustomPayload) {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
-        } else {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
-        }
-        if (!rec.isPresent()) {
-          // If the record is not present, this is a delete record using an empty payload so skip this base record
-          // and move to the next record
-          return next(aVoid, arrayWritable);
-        }
-        GenericRecord recordToReturn = rec.get();
-        if (usesCustomPayload) {
-          // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
-          // the writerSchema with only the projection fields
-          recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
-        }
-        // we assume, a later safe record in the log, is newer than what we have in the map &
-        // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
-        // schema, we use writerSchema to create the arrayWritable from the latest generic record
-        ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
-        Writable[] replaceValue = aWritable.get();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
-              HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
-        }
-        Writable[] originalValue = arrayWritable.get();
-        try {
-          // Sometime originalValue.length > replaceValue.length.
-          // This can happen when hive query is looking for pseudo parquet columns like BLOCK_OFFSET_INSIDE_FILE
-          System.arraycopy(replaceValue, 0, originalValue, 0,
-              Math.min(originalValue.length, replaceValue.length));
-          arrayWritable.set(originalValue);
-        } catch (RuntimeException re) {
-          LOG.error("Got exception when doing array copy", re);
-          LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
-          LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
-          String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
-              + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
-          throw new RuntimeException(errMsg, re);
+      if (!deltaRecordMap.isEmpty()) {

Review comment:
       I think we can structure this as a plain `if` block, without the `else`, since the `if` above returns anyway.
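
A minimal sketch of the restructuring suggested above, not the actual Hudi code. The class `EarlyReturnSketch`, the methods `nextNested` and `nextFlat`, the `merge` helper, and the `Map<String, String>` standing in for `deltaRecordMap` are all hypothetical names used only for illustration. It shows that once the first branch returns, the `else` can be dropped and the remaining logic moved up one indentation level without changing behavior.

```java
import java.util.HashMap;
import java.util.Map;

class EarlyReturnSketch {
  // Hypothetical stand-in for deltaRecordMap: record key -> updated value.
  private final Map<String, String> deltaRecordMap = new HashMap<>();

  EarlyReturnSketch(Map<String, String> deltas) {
    deltaRecordMap.putAll(deltas);
  }

  // Shape in the diff: the merge logic is nested inside an else branch.
  boolean nextNested(boolean baseHasNext, String[] record) {
    if (!baseHasNext) {
      // no more records in the base file
      return false;
    } else {
      if (!deltaRecordMap.isEmpty()) {
        merge(record);
      }
      return true;
    }
  }

  // Suggested shape: the first if already returns, so no else is needed.
  boolean nextFlat(boolean baseHasNext, String[] record) {
    if (!baseHasNext) {
      return false;
    }
    if (!deltaRecordMap.isEmpty()) {
      merge(record);
    }
    return true;
  }

  // Hypothetical merge: record[0] is the key, record[1] the value to overwrite.
  private void merge(String[] record) {
    String update = deltaRecordMap.get(record[0]);
    if (update != null) {
      record[1] = update;
    }
  }

  public static void main(String[] args) {
    Map<String, String> deltas = new HashMap<>();
    deltas.put("k1", "new");
    EarlyReturnSketch reader = new EarlyReturnSketch(deltas);
    String[] record = {"k1", "old"};
    System.out.println(reader.nextFlat(true, record) + " " + record[1]);
  }
}
```

Both methods return the same result for every input; the flat version only removes one level of nesting.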



