Posted to commits@hudi.apache.org by vi...@apache.org on 2020/11/03 04:00:30 UTC

[hudi] branch master updated: [HUDI-892] RealtimeParquetInputFormat skip adding projection columns if there are no log files (#2190)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f5c15b  [HUDI-892] RealtimeParquetInputFormat skip adding projection columns if there are no log files (#2190)
5f5c15b is described below

commit 5f5c15b0d9f807dc7f9112a5220c3daeb01f0cb8
Author: lw0090 <lw...@gmail.com>
AuthorDate: Tue Nov 3 12:00:12 2020 +0800

    [HUDI-892] RealtimeParquetInputFormat skip adding projection columns if there are no log files (#2190)
    
    * [HUDI-892] RealtimeParquetInputFormat skip adding projection columns if there are no log files
    * [HUDI-892] changes for tests
    * [HUDI-892] fix bug when generating the array from the split
    * [HUDI-892] revert test logging
---
 .../realtime/HoodieParquetRealtimeInputFormat.java | 23 ++++++++++++----------
 .../realtime/RealtimeCompactedRecordReader.java    |  5 +++--
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  7 +++++--
 3 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index 5bcfbe9..d8f0a01 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -79,9 +79,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     return timeline;
   }
 
-  @Override
-  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
-                                                                   final Reporter reporter) throws IOException {
+  void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
     // Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
     // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the
     // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
@@ -101,22 +99,27 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
           // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
           // time.
           HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
-          HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
-
+          if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
+            HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
+          }
           this.conf = jobConf;
           this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
         }
       }
     }
+  }
 
-    LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
-        + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-
+  @Override
+  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
+                                                                   final Reporter reporter) throws IOException {
     // sanity check
     ValidationUtils.checkArgument(split instanceof RealtimeSplit,
         "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
-
-    return new HoodieRealtimeRecordReader((RealtimeSplit) split, jobConf,
+    RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+    addProjectionToJobConf(realtimeSplit, jobConf);
+    LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+        + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+    return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
         super.getRecordReader(split, jobConf, reporter));
   }
 
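For context on the hunk above: the projection setup now lives in addProjectionToJobConf and is applied at most once per shared JobConf, because Hive on Spark runs several record readers against the same JobConf inside one JVM. The sketch below illustrates that pattern in isolation; it is not Hudi code, and the property and column names in it are stand-ins invented for the example (the actual guard condition is not visible in the hunk).

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    // Standalone illustration, not Hudi code: a config object shared by several
    // reader threads is mutated at most once (double-checked locking), and the
    // extra projection columns are appended only when the split has log files.
    public class ProjectionConfigSketch {

      // stand-in flag, playing the role of HOODIE_READ_COLUMNS_PROP in the hunk above
      static final String READ_COLUMNS_SET = "hoodie.read.columns.set";

      static void addProjectionIfNeeded(Properties sharedConf, List<String> deltaLogPaths) {
        if (sharedConf.getProperty(READ_COLUMNS_SET) == null) {
          synchronized (sharedConf) {                 // same idea as synchronizing on the JobConf
            if (sharedConf.getProperty(READ_COLUMNS_SET) == null) {
              if (!deltaLogPaths.isEmpty()) {
                // only splits with log files need the Hudi metadata columns appended
                sharedConf.setProperty("read.columns", "_hoodie_record_key,_hoodie_commit_time");
              }
              sharedConf.setProperty(READ_COLUMNS_SET, "true");
            }
          }
        }
      }

      public static void main(String[] args) {
        Properties parquetOnly = new Properties();
        addProjectionIfNeeded(parquetOnly, Collections.emptyList());
        System.out.println("no log files   -> " + parquetOnly);

        Properties withLogs = new Properties();
        addProjectionIfNeeded(withLogs, Arrays.asList("/tmp/partition/.fileid.log.1"));
        System.out.println("with log files -> " + withLogs);
      }
    }

The check-lock-check shape keeps the common path lock-free once the flag is set while still guaranteeing the mutation happens only once per JobConf.
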
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 042199f..b710b59 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -84,7 +84,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     if (!result) {
       // if the result is false, then there are no more records
       return false;
-    } else {
+    }
+    if (!deltaRecordMap.isEmpty()) {
       // TODO(VC): Right now, we assume all records in log, have a matching base record. (which
       // would be true until we have a way to index logs too)
       // return from delta records map if we have some match.
@@ -134,8 +135,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
           throw new RuntimeException(errMsg, re);
         }
       }
-      return true;
     }
+    return true;
   }
 
   @Override
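The reworked next() above is easier to follow with a toy overlay in mind: base-file records are always emitted, and the lookup in deltaRecordMap only happens when the split actually produced delta records. The following is a standalone illustration, not Hudi code; the record layout, keys and values are invented for the example.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Toy merge-on-read: overlay log-file updates (keyed by record key) onto
    // base-file records, and pass base records through untouched when there
    // are no delta records at all.
    public class MergeOnReadSketch {

      static void emit(List<String[]> baseRecords, Map<String, String> deltaRecordMap) {
        for (String[] record : baseRecords) {
          if (!deltaRecordMap.isEmpty()) {
            // merge path: take the newer payload from the log if this key has one
            String updated = deltaRecordMap.get(record[0]);
            if (updated != null) {
              record[1] = updated;
            }
          }
          // either way the record is emitted, mirroring the hunk's final `return true;`
          System.out.println(record[0] + " -> " + record[1]);
        }
      }

      public static void main(String[] args) {
        List<String[]> base = Arrays.asList(new String[] {"key1", "v1"}, new String[] {"key2", "v2"});

        emit(base, new HashMap<>());          // no log files: v1 and v2 pass through untouched

        Map<String, String> delta = new HashMap<>();
        delta.put("key2", "v2_from_log");
        emit(base, delta);                    // with a log record: key2 is overlaid
      }
    }

Populated map or not, every base record is emitted exactly once, which is the behavioral point of moving `return true;` out of the merge branch.
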
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 871f722..b10e778 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -58,6 +60,7 @@ import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 public class HoodieRealtimeRecordReaderUtils {
+  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);
 
   /**
    * Reads the schema from the base file.
@@ -246,10 +249,10 @@ public class HoodieRealtimeRecordReaderUtils {
     // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
     // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
     // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
-    String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
+    String[] fieldOrdersWithDups = fieldOrderCsv.isEmpty() ? new String[0] : fieldOrderCsv.split(",");
     Set<String> fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups));
     String[] fieldOrders = fieldOrdersSet.toArray(new String[0]);
-    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
+    List<String> fieldNames = fieldNameCsv.isEmpty() ? new ArrayList<>() : Arrays.stream(fieldNameCsv.split(","))
         .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
     Set<String> fieldNamesSet = new LinkedHashSet<>(fieldNames);
     // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
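The isEmpty() guards added in the last hunk matter because of a Java String.split corner case: splitting an empty string does not return an empty array but a one-element array containing "", so an unset projection property would look like a single nameless column and skew the length check mentioned in the comment above. A quick standalone demonstration (not Hudi code):

    import java.util.Arrays;

    public class EmptySplitDemo {
      public static void main(String[] args) {
        String fieldOrderCsv = "";  // e.g. no read columns configured on the JobConf

        String[] unguarded = fieldOrderCsv.split(",");
        System.out.println(Arrays.toString(unguarded) + " length=" + unguarded.length);
        // prints: [] length=1  -- one spurious empty "field"

        String[] guarded = fieldOrderCsv.isEmpty() ? new String[0] : fieldOrderCsv.split(",");
        System.out.println(Arrays.toString(guarded) + " length=" + guarded.length);
        // prints: [] length=0  -- the behavior the patch establishes
      }
    }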