Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/27 10:42:12 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #3391: [HUDI-83] Fix Timestamp/Date type read by Hive3

xushiyan commented on code in PR #3391:
URL: https://github.com/apache/hudi/pull/3391#discussion_r883491529


##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetInputFormat.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.avro;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.util.ContextUtil;
+
+import java.io.IOException;
+
+public class HudiAvroParquetInputFormat extends ParquetInputFormat<ArrayWritable> {

Review Comment:
   Please avoid using `Hudi` as a class prefix; the convention is `Hoodie`.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/Option.java:
##########
@@ -76,6 +76,10 @@ public static <T> Option<T> of(T value) {
     return new Option<>(value);
   }
 
+  public static <T> Option<T> ofNullable(Supplier<? extends T> value) {
+    return null == value ? empty() : ofNullable(value.get());
+  }

Review Comment:
   This API looks weird: why would a Supplier be `null`? Usually a supplier is used to provide a fallback value when the input is null, e.g. `orElse(Supplier<? extends T> supplier)`.
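   A minimal sketch of the conventional pattern the comment describes, using the JDK's `java.util.Optional`, whose `orElseGet(Supplier<? extends T>)` takes the supplier as a fallback producer; the `loadDefault` helper below is hypothetical:

      import java.util.Optional;

      public class SupplierFallbackExample {
        // Hypothetical fallback producer: invoked only when the input is absent.
        static String loadDefault() {
          return "default-value";
        }

        public static void main(String[] args) {
          String input = null;
          // The Supplier provides the fallback value; it is never itself null.
          String resolved = Optional.ofNullable(input)
              .orElseGet(SupplierFallbackExample::loadDefault);
          System.out.println(resolved); // prints "default-value"
        }
      }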



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java:
##########
@@ -109,4 +109,25 @@ public static List<Pair<String,String>> getIOColumnNameAndTypes(Configuration co
         .collect(Collectors.toList());
   }
 
+  /**
+   * Decides whether the parquet-avro reader should be used to read timestamp columns,
+   * for compatibility when the schema contains timestamp columns but the projection
+   * does not declare timestamp fields. We expect three cases in which the parquet-avro
+   * reader is used to read timestamp columns:
+   *  1. the read columns contain a timestamp type
+   *  2. there are no read columns, but the original columns contain a timestamp type
+   *  3. there are neither read columns nor original columns, but the avro schema contains a timestamp type
+   */
+  public static boolean supportTimestamp(Configuration conf) {
+    List<String> reads = Arrays.asList(getReadColumnNames(conf));
+    if (reads.isEmpty()) {
+      return getIOColumnTypes(conf).contains("timestamp");
+    }
+    List<String> names = getIOColumns(conf);
+    if (names.isEmpty()) {
+      return true;

Review Comment:
   > My consideration is to avoid the situation where some Hive-related test cases do not explicitly declare these configurations
   
   @cdmikechen we should not change the main code logic for the sake of fixing tests. If tests are missing the config, set it in the tests. Tests are to serve the main code, not the other way around.
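   A hedged sketch of the suggested fix: the test declares the projection explicitly instead of relying on a fallback branch in the main code. The string keys below are assumed to be the standard Hive projection/serde properties (`hive.io.file.readcolumn.names`, `columns`, `columns.types`); real code would reference them via `ColumnProjectionUtils`/`serdeConstants` constants rather than literals:

      import org.apache.hadoop.conf.Configuration;

      public class ProjectionConfigSketch {
        public static void main(String[] args) {
          Configuration conf = new Configuration();
          // Declare the read columns plus the full column list and types,
          // as Hive would before handing the conf to the record reader.
          conf.set("hive.io.file.readcolumn.names", "id,ts");
          conf.set("columns", "id,ts,name");
          conf.set("columns.types", "int,timestamp,string");
          // With the projection declared, supportTimestamp(conf) resolves the
          // timestamp check from the configured columns instead of guessing.
        }
      }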



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java:
##########
@@ -334,7 +334,7 @@ public class FlinkStreamerConfig extends Configuration {
 
   @Parameter(names = {"--hive-sync-support-timestamp"}, description = "INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
       + "Disabled by default for backward compatibility.")
-  public Boolean hiveSyncSupportTimestamp = false;
+  public Boolean hiveSyncSupportTimestamp = true;

Review Comment:
   Can we revert the default value? This change can break existing user scenarios.
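   If the default is reverted to `false`, existing jobs keep their current behavior and users opt in explicitly, either via the command-line flag declared in the `@Parameter` annotation (`--hive-sync-support-timestamp true`) or, in a minimal sketch assuming the config object is built directly in code, by setting the public field:

      // a minimal sketch, assuming FlinkStreamerConfig can be constructed directly
      FlinkStreamerConfig config = new FlinkStreamerConfig();
      // explicit opt-in; the default stays false for backward compatibility
      config.hiveSyncSupportTimestamp = true;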



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetReader.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.ParquetRecordReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
+public class HudiAvroParquetReader extends RecordReader<Void, ArrayWritable> {

Review Comment:
   Ditto: please use the `Hoodie` prefix here instead of `Hudi`.


