You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2019/11/16 21:46:56 UTC

[incubator-hudi] branch master updated: - Fixing RT queries for HiveOnSpark that causes race conditions - Adding more comments to understand usage of reader/writer schema

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a05eda  - Fixing RT queries for HiveOnSpark that causes race conditions - Adding more comments to understand usage of reader/writer schema
3a05eda is described below

commit 3a05edab01f72a0e7e9efb890691d938bca4d795
Author: Nishith Agarwal <na...@uber.com>
AuthorDate: Sun Nov 3 23:09:15 2019 -0800

    - Fixing RT queries for HiveOnSpark that causes race conditions
    - Adding more comments to understand usage of reader/writer schema
---
 .../realtime/AbstractRealtimeRecordReader.java     | 38 +++++++++++-----
 .../realtime/HoodieParquetRealtimeInputFormat.java | 53 +++++++++++++---------
 .../realtime/RealtimeCompactedRecordReader.java    | 10 ++--
 3 files changed, 66 insertions(+), 35 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 4544b12..215193f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.realtime;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -156,24 +157,39 @@ public abstract class AbstractRealtimeRecordReader {
   }
 
   /**
-   * Given a comma separated list of field names and positions at which they appear on Hive, return a ordered list of
-   * field names, that can be passed onto storage.
+   * Given a comma separated list of field names and positions at which they appear on Hive, return
+   * an ordered list of field names, that can be passed onto storage.
    */
   private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
-
-    String[] fieldOrders = fieldOrderCsv.split(",");
-    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(",")).filter(fn -> !partitioningFields.contains(fn))
-        .collect(Collectors.toList());
-
+    // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
+    // handles duplicate fields orders correctly.
+    // Fields Orders -> {@link https://github
+    // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
+    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
+    // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
+    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
+    Set<String> fieldOrdersSet = new LinkedHashSet<>();
+    String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
+    for (String fieldOrder : fieldOrdersWithDups) {
+      fieldOrdersSet.add(fieldOrder);
+    }
+    String[] fieldOrders = fieldOrdersSet.toArray(new String[fieldOrdersSet.size()]);
+    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
+        .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
+    Set<String> fieldNamesSet = new LinkedHashSet<>();
+    for (String fieldName : fieldNames) {
+      fieldNamesSet.add(fieldName);
+    }
     // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
-    if (fieldNames.size() != fieldOrders.length) {
-      throw new HoodieException(
-          String.format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
+    if (fieldNamesSet.size() != fieldOrders.length) {
+      throw new HoodieException(String
+          .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
               fieldNames.size(), fieldOrders.length));
     }
     TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
+    String[] fieldNamesArray = fieldNamesSet.toArray(new String[fieldNamesSet.size()]);
     for (int ox = 0; ox < fieldOrders.length; ox++) {
-      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNames.get(ox));
+      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
     }
     return new ArrayList<>(orderedFieldMap.values());
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index ba325e1..bb21116 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -69,7 +69,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
   public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
   public static final int HOODIE_RECORD_KEY_COL_POS = 2;
   public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
-  // Hive on Spark queries do not work with RT tables. Our theory is that due to
+  public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";
+  // To make Hive on Spark queries work with RT tables. Our theory is that due to
   // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
   // not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple
   // times which ultimately breaks the query.
@@ -186,7 +187,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     return conf;
   }
 
-  private static synchronized Configuration addRequiredProjectionFields(Configuration configuration) {
+  private static Configuration addRequiredProjectionFields(Configuration configuration) {
     // Need this to do merge records in HoodieRealtimeRecordReader
     configuration =
         addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
@@ -204,13 +205,11 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
    * Hive. Hive has fixed this bug after 3.0.0, but the version before that would still face this problem. (HIVE-22438)
    */
   private static Configuration cleanProjectionColumnIds(Configuration conf) {
-    synchronized (conf) {
-      String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
-      if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
-        conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
-        }
+    String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
+    if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
+      conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
       }
     }
     return conf;
@@ -219,18 +218,30 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
   @Override
   public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
       final Reporter reporter) throws IOException {
-
-    this.conf = cleanProjectionColumnIds(job);
-    LOG.info("Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
-        + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-
-    // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
-    // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
-    // hoodie additional projection columns are reset after calling setConf and only natural projections
-    // (one found in select queries) are set. things would break because of this.
-    // For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
-    // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction time.
-    this.conf = addRequiredProjectionFields(job);
+    // Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
+    // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the
+    // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
+    // latency incurred here due to the synchronization since get record reader is called once per spilt before the
+    // actual heavy lifting of reading the parquet files happen.
+    if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
+      synchronized (job) {
+        LOG.info(
+            "Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+                + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+        if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
+          // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
+          // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
+          // hoodie additional projection columns are reset after calling setConf and only natural projections
+          // (one found in select queries) are set. things would break because of this.
+          // For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
+          // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
+          // time.
+          this.conf = cleanProjectionColumnIds(job);
+          this.conf = addRequiredProjectionFields(job);
+          this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
+        }
+      }
+    }
 
     LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 766c702..7019907 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -96,11 +96,13 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         }
         GenericRecord recordToReturn = rec.get();
         if (usesCustomPayload) {
-          // If using a custom payload, return only the projection fields
+          // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
+          // the writerSchema with only the projection fields
           recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
         }
         // we assume, a later safe record in the log, is newer than what we have in the map &
-        // replace it.
+        // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
+        // schema, we use writerSchema to create the arrayWritable from the latest generic record
         ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
         Writable[] replaceValue = aWritable.get();
         if (LOG.isDebugEnabled()) {
@@ -115,7 +117,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
           LOG.error("Got exception when doing array copy", re);
           LOG.error("Base record :" + arrayWritableToString(arrayWritable));
           LOG.error("Log record :" + arrayWritableToString(aWritable));
-          throw re;
+          String errMsg = "Base-record :" + arrayWritableToString(arrayWritable)
+              + " ,Log-record :" + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
+          throw new RuntimeException(errMsg, re);
         }
       }
       return true;