Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/11/05 18:33:08 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3457: HUDI-1827 : Add ORC support in Bootstrap Op

nsivabalan commented on a change in pull request #3457:
URL: https://github.com/apache/hudi/pull/3457#discussion_r743732649



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
##########
@@ -44,25 +36,11 @@ public HoodieSparkBootstrapSchemaProvider(HoodieWriteConfig writeConfig) {
 
   @Override
   protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List<Pair<String, List<HoodieFileStatus>>> partitions) {
-    MessageType parquetSchema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> {
-      try {
-        Path filePath = FileStatusUtils.toPath(fs.getPath());
-        return new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath);
-      } catch (Exception ex) {
-        return null;
-      }
+    Path filePath = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> {
+      return   FileStatusUtils.toPath(fs.getPath());
     }).filter(Objects::nonNull).findAny()
-        .orElseThrow(() -> new HoodieException("Could not determine schema from the data files."));
-
-
-    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(
-        Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()),
-        Boolean.parseBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString()));
-    StructType sparkSchema = converter.convert(parquetSchema);
-    String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
-    String structName = tableName + "_record";
-    String recordNamespace = "hoodie." + tableName;
-
-    return AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, structName, recordNamespace);
+            .orElseThrow(() -> new HoodieException("Could not determine schema from the data files."));

Review comment:
       I guess findAny() makes sense only when we try to parse the schema. With this refactoring, we are only fetching the file path here, so no exception will be thrown. We might have to move this logic inside BootstrapSchemaProviderFactory.getBootstrapSourceSchema, for example as sketched below.
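    A minimal sketch of that move, assuming the factory keeps the per-file try/catch so unparsable files are skipped and findAny() keeps its original meaning (only names already in this PR are used):
    ```
    // Sketch only: keep the schema-parsing attempt inside the stream so that
    // findAny() again means "first file whose schema we could actually read".
    @Override
    protected Schema getBootstrapSourceSchema(HoodieEngineContext context,
        List<Pair<String, List<HoodieFileStatus>>> partitions) {
      return partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> {
        try {
          Path filePath = FileStatusUtils.toPath(fs.getPath());
          // The factory dispatches on the file extension and parses the schema;
          // returning null on failure lets the stream try the next file.
          return BootstrapSchemaProviderFactory.getBootstrapSourceSchema(writeConfig, context, filePath);
        } catch (Exception ex) {
          return null;
        }
      }).filter(Objects::nonNull).findAny()
          .orElseThrow(() -> new HoodieException("Could not determine schema from the data files."));
    }
    ```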

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProviderFactory.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.AvroOrcUtils;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import java.io.IOException;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+
+public class BootstrapSchemaProviderFactory {
+
+  public static  Schema getBootstrapSourceSchema(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
+    final String extension = FSUtils.getFileExtension(filePath.getName());
+    if (PARQUET.getFileExtension().equals(extension)) {
+      return getBootstrapSourceSchemaParquet(writeConfig,context,filePath);
+    } else if (ORC.getFileExtension().equals(extension)) {
+      return getBootstrapSourceSchemaOrc(writeConfig,context,filePath);
+    }
+    throw new HoodieException("Could not determine schema from the data files.");
+  }
+
+  private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
+    MessageType parquetSchema = new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath);
+
+    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(
+            Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()),
+            Boolean.parseBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString()));
+    StructType sparkSchema = converter.convert(parquetSchema);
+    String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
+    String structName = tableName + "_record";
+    String recordNamespace = "hoodie." + tableName;
+
+    return AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, structName, recordNamespace);
+  }
+
+  private static  Schema getBootstrapSourceSchemaOrc(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
+    Reader orcReader = null;
+    try {
+      orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(context.getHadoopConf().get()));
+    } catch (IOException e) {
+      throw new HoodieException("Could not determine schema from the data files.");
+    }
+    TypeDescription orcSchema = orcReader.getSchema();

Review comment:
       To replicate what we do for parquet, we might have to go through all the files until we find one whose schema can be successfully fetched. I am not sure there are valid scenarios where we cannot fetch the schema from the first file but can from a subsequent one, but looking at master for parquet, that is what I infer. A sketch of that loop follows below.
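    A hedged sketch of the retry loop, assuming we pass the candidate file paths down instead of a single path (the signature change is hypothetical, not part of this PR):
    ```
    // Sketch only: try each candidate file and keep the first ORC schema that
    // can actually be read, mirroring the parquet behavior on master.
    private static TypeDescription readFirstAvailableOrcSchema(HoodieEngineContext context, List<Path> filePaths) {
      return filePaths.stream().map(path -> {
        try {
          Reader orcReader = OrcFile.createReader(path, OrcFile.readerOptions(context.getHadoopConf().get()));
          return orcReader.getSchema();
        } catch (IOException e) {
          return null; // skip files whose schema cannot be fetched
        }
      }).filter(Objects::nonNull).findAny()
          .orElseThrow(() -> new HoodieException("Could not determine schema from the data files."));
    }
    ```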

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java
##########
@@ -796,4 +797,78 @@ private static Schema getActualSchemaType(Schema unionSchema) {
       return Schema.createUnion(nonNullMembers);
     }
   }
+
+  public static Schema createAvroSchemaWithDefaultValue(TypeDescription orcSchema, String recordName, String namespace, boolean nullable) {
+    Schema avroSchema = createAvroSchemaWithNamespace(orcSchema,recordName,namespace);
+    List<Schema.Field> fields = new ArrayList<Schema.Field>();
+    List<Field> fieldList = avroSchema.getFields();
+    for (Field field : fieldList) {
+      Schema fieldSchema = field.schema();
+      Schema nullableSchema = Schema.createUnion(Schema.create(Schema.Type.NULL),fieldSchema);
+      if (nullable) {

Review comment:
       I see we always pass "true" for the nullable argument; wondering why take an explicit argument at all?
   Also, what if the schema for a field is already a union? Won't we union it again here? A possible guard is sketched below.
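    For the double-union concern, a minimal guard could look like this (a sketch using the standard Avro Schema API):
    ```
    // Sketch only: avoid wrapping a field schema that is already a union.
    Schema nullableSchema = fieldSchema.getType() == Schema.Type.UNION
        ? fieldSchema // already a union; assume it already allows null
        : Schema.createUnion(Schema.create(Schema.Type.NULL), fieldSchema);
    ```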

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataHandler.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.bootstrap;
+
+import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
+import org.apache.hudi.keygen.KeyGeneratorInterface;
+
+public interface MetadataHandler {

Review comment:
       And let's rename all the concrete implementations as well.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkOrcBootstrapDataProvider.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bootstrap;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.bootstrap.FileStatusUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.keygen.KeyGenerator;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Spark Data frame based bootstrap input provider.
+ */
+public class SparkOrcBootstrapDataProvider extends FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {

Review comment:
       Is there any difference between this class and SparkParquetBootstrapDataProvider except for 
   ```
   sparkSession.read().orc(filePaths) / sparkSession.read().parquet(filePaths)
   ```
   
   If this is the only difference, can we use spark.read().format(...) so that we don't need two separate classes? A sketch follows below.
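    A rough sketch of that consolidation (the helper and the way the format string is derived are hypothetical, not part of this PR):
    ```
    // Sketch only: one provider parameterized by the source format, so the
    // parquet and ORC data providers can collapse into a single class.
    static Dataset<Row> readSourceFiles(SparkSession sparkSession, String format, String... filePaths) {
      // format is "parquet" or "orc"; read().format(...) dispatches to the same
      // data sources as read().parquet(...) / read().orc(...).
      return sparkSession.read().format(format).load(filePaths);
    }
    ```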
   

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcMetadataHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.bootstrap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
+import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
+import org.apache.hudi.common.bootstrap.FileStatusUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BootstrapFileMapping;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.AvroOrcUtils;
+import org.apache.hudi.common.util.OrcReaderIterator;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
+import org.apache.hudi.io.HoodieBootstrapHandle;
+import org.apache.hudi.keygen.KeyGeneratorInterface;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+
+import java.io.IOException;
+
+class OrcMetadataHandler implements MetadataHandler {
+  private static final Logger LOG = LogManager.getLogger(OrcMetadataHandler.class);
+
+  HoodieWriteConfig config;
+
+  HoodieTable table;
+
+  HoodieFileStatus srcFileStatus;
+
+  public OrcMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) {
+    this.config = config;
+    this.table = table;
+    this.srcFileStatus = srcFileStatus;
+  }
+
+  public BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String partitionPath, KeyGeneratorInterface keyGenerator) {
+    Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
+    HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle = new HoodieBootstrapHandle(config,
+            HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, table, partitionPath, FSUtils.createNewFileIdPfx(),
+            table.getTaskContextSupplier());
+    Schema avroSchema = null;
+    try {
+      Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf()));
+
+      TypeDescription orcSchema = orcReader.getSchema();
+
+      avroSchema =  AvroOrcUtils.createAvroSchema(orcSchema);
+
+      Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema,
+              keyGenerator.getRecordKeyFieldNames());
+      LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
+      AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);
+      AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);
+
+      BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null;

Review comment:
       I see some opportunity for code reuse across the parquet MetadataHandler and the ORC MetadataHandler, for example something like the sketch below.
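    One possible shape for that reuse, as a sketch (the base class and its hook methods are hypothetical names; imports as in OrcMetadataHandler above):
    ```
    // Sketch only: a template-method base class holding the shared bootstrap
    // skeleton, with format-specific hooks for schema and record handling.
    abstract class BaseMetadataHandler implements MetadataHandler {
      protected final HoodieWriteConfig config;
      protected final HoodieTable table;
      protected final HoodieFileStatus srcFileStatus;

      BaseMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) {
        this.config = config;
        this.table = table;
        this.srcFileStatus = srcFileStatus;
      }

      @Override
      public BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String partitionPath,
          KeyGeneratorInterface keyGenerator) {
        Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
        HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle = new HoodieBootstrapHandle(config,
            HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, table, partitionPath,
            FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier());
        Schema avroSchema = getAvroSchema(sourceFilePath);
        return executeBootstrap(bootstrapHandle, sourceFilePath, keyGenerator, partitionPath, avroSchema);
      }

      // Format-specific: read the source file's schema and convert it to Avro.
      abstract Schema getAvroSchema(Path sourceFilePath);

      // Format-specific: iterate the source records and write the skeleton file.
      abstract BootstrapWriteStatus executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> handle,
          Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema);
    }
    ```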

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataHandler.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.bootstrap;
+
+import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
+import org.apache.hudi.keygen.KeyGeneratorInterface;
+
+public interface MetadataHandler {

Review comment:
       Please add Javadocs, for example along the lines of the sketch below.
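    A sketch of what that could say (the method signature is the one used by the implementations in this PR):
    ```
    /**
     * Handles the metadata-only bootstrap of a single source data file: reads the
     * record keys from the source file and writes a skeleton file, containing only
     * Hudi metadata columns, that points back to the source file.
     */
    public interface MetadataHandler {
      BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String partitionPath,
          KeyGeneratorInterface keyGenerator);
    }
    ```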

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataHandler.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.bootstrap;
+
+import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
+import org.apache.hudi.keygen.KeyGeneratorInterface;
+
+public interface MetadataHandler {

Review comment:
       Can we name this BootstrapMetadataHandler? The current name overlaps with our metadata table infra.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java
##########
@@ -796,4 +797,78 @@ private static Schema getActualSchemaType(Schema unionSchema) {
       return Schema.createUnion(nonNullMembers);
     }
   }
+
+  public static Schema createAvroSchemaWithDefaultValue(TypeDescription orcSchema, String recordName, String namespace, boolean nullable) {
+    Schema avroSchema = createAvroSchemaWithNamespace(orcSchema,recordName,namespace);
+    List<Schema.Field> fields = new ArrayList<Schema.Field>();
+    List<Field> fieldList = avroSchema.getFields();
+    for (Field field : fieldList) {
+      Schema fieldSchema = field.schema();
+      Schema nullableSchema = Schema.createUnion(Schema.create(Schema.Type.NULL),fieldSchema);
+      if (nullable) {
+        fields.add(new Schema.Field(field.name(), nullableSchema, null, NULL_VALUE));
+      } else {
+        fields.add(new Schema.Field(field.name(), fieldSchema, null, (Object) null));
+      }
+    }
+    Schema schema = Schema.createRecord(recordName, null, null, false);
+    schema.setFields(fields);
+    return schema;
+  }
+
+  private static Schema createAvroSchemaWithNamespace(TypeDescription orcSchema, String recordName, String namespace) {
+    switch (orcSchema.getCategory()) {
+      case BOOLEAN:
+        return Schema.create(Schema.Type.BOOLEAN);

Review comment:
       Did you take inspiration from somewhere for converting an ORC schema to an Avro schema? Curious to know. It would also be easier for me to review if this already exists in some third-party library.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org