You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2019/11/16 21:46:56 UTC
[incubator-hudi] branch master updated: - Fixing RT queries for
HiveOnSpark that causes race conditions - Adding more comments to
understand usage of reader/writer schema
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3a05eda - Fixing RT queries for HiveOnSpark that causes race conditions - Adding more comments to understand usage of reader/writer schema
3a05eda is described below
commit 3a05edab01f72a0e7e9efb890691d938bca4d795
Author: Nishith Agarwal <na...@uber.com>
AuthorDate: Sun Nov 3 23:09:15 2019 -0800
- Fixing RT queries for HiveOnSpark that causes race conditions
- Adding more comments to understand usage of reader/writer schema
---
.../realtime/AbstractRealtimeRecordReader.java | 38 +++++++++++-----
.../realtime/HoodieParquetRealtimeInputFormat.java | 53 +++++++++++++---------
.../realtime/RealtimeCompactedRecordReader.java | 10 ++--
3 files changed, 66 insertions(+), 35 deletions(-)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 4544b12..215193f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.realtime;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -156,24 +157,39 @@ public abstract class AbstractRealtimeRecordReader {
}
/**
- * Given a comma separated list of field names and positions at which they appear on Hive, return a ordered list of
- * field names, that can be passed onto storage.
+ * Given a comma separated list of field names and positions at which they appear on Hive, return
+ * an ordered list of field names, that can be passed onto storage.
*/
private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
-
- String[] fieldOrders = fieldOrderCsv.split(",");
- List<String> fieldNames = Arrays.stream(fieldNameCsv.split(",")).filter(fn -> !partitioningFields.contains(fn))
- .collect(Collectors.toList());
-
+ // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
+ // handles duplicate fields orders correctly.
+ // Fields Orders -> {@link https://github
+ // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
+ // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
+ // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
+ // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
+ Set<String> fieldOrdersSet = new LinkedHashSet<>();
+ String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
+ for (String fieldOrder : fieldOrdersWithDups) {
+ fieldOrdersSet.add(fieldOrder);
+ }
+ String[] fieldOrders = fieldOrdersSet.toArray(new String[fieldOrdersSet.size()]);
+ List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
+ .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
+ Set<String> fieldNamesSet = new LinkedHashSet<>();
+ for (String fieldName : fieldNames) {
+ fieldNamesSet.add(fieldName);
+ }
// Hive does not provide ids for partitioning fields, so check for lengths excluding that.
- if (fieldNames.size() != fieldOrders.length) {
- throw new HoodieException(
- String.format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
+ if (fieldNamesSet.size() != fieldOrders.length) {
+ throw new HoodieException(String
+ .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
fieldNames.size(), fieldOrders.length));
}
TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
+ String[] fieldNamesArray = fieldNamesSet.toArray(new String[fieldNamesSet.size()]);
for (int ox = 0; ox < fieldOrders.length; ox++) {
- orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNames.get(ox));
+ orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
}
return new ArrayList<>(orderedFieldMap.values());
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index ba325e1..bb21116 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -69,7 +69,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
public static final int HOODIE_RECORD_KEY_COL_POS = 2;
public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
- // Hive on Spark queries do not work with RT tables. Our theory is that due to
+ public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";
+ // To make Hive on Spark queries work with RT tables. Our theory is that due to
// {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
// not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple
// times which ultimately breaks the query.
@@ -186,7 +187,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
return conf;
}
- private static synchronized Configuration addRequiredProjectionFields(Configuration configuration) {
+ private static Configuration addRequiredProjectionFields(Configuration configuration) {
// Need this to do merge records in HoodieRealtimeRecordReader
configuration =
addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
@@ -204,13 +205,11 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
* Hive. Hive has fixed this bug after 3.0.0, but the version before that would still face this problem. (HIVE-22438)
*/
private static Configuration cleanProjectionColumnIds(Configuration conf) {
- synchronized (conf) {
- String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
- if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
- if (LOG.isDebugEnabled()) {
- LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
- }
+ String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
+ if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
}
}
return conf;
@@ -219,18 +218,30 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
final Reporter reporter) throws IOException {
-
- this.conf = cleanProjectionColumnIds(job);
- LOG.info("Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
- + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-
- // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
- // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
- // hoodie additional projection columns are reset after calling setConf and only natural projections
- // (one found in select queries) are set. things would break because of this.
- // For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
- // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction time.
- this.conf = addRequiredProjectionFields(job);
+ // Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
+ // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the
+ // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
+ // latency incurred here due to the synchronization since get record reader is called once per spilt before the
+ // actual heavy lifting of reading the parquet files happen.
+ if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
+ synchronized (job) {
+ LOG.info(
+ "Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+ if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
+ // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
+ // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
+ // hoodie additional projection columns are reset after calling setConf and only natural projections
+ // (one found in select queries) are set. things would break because of this.
+ // For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
+ // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
+ // time.
+ this.conf = cleanProjectionColumnIds(job);
+ this.conf = addRequiredProjectionFields(job);
+ this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
+ }
+ }
+ }
LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 766c702..7019907 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -96,11 +96,13 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
}
GenericRecord recordToReturn = rec.get();
if (usesCustomPayload) {
- // If using a custom payload, return only the projection fields
+ // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
+ // the writerSchema with only the projection fields
recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
}
// we assume, a later safe record in the log, is newer than what we have in the map &
- // replace it.
+ // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
+ // schema, we use writerSchema to create the arrayWritable from the latest generic record
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
Writable[] replaceValue = aWritable.get();
if (LOG.isDebugEnabled()) {
@@ -115,7 +117,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
LOG.error("Got exception when doing array copy", re);
LOG.error("Base record :" + arrayWritableToString(arrayWritable));
LOG.error("Log record :" + arrayWritableToString(aWritable));
- throw re;
+ String errMsg = "Base-record :" + arrayWritableToString(arrayWritable)
+ + " ,Log-record :" + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
+ throw new RuntimeException(errMsg, re);
}
}
return true;