Posted to dev@parquet.apache.org by "wgtmac (via GitHub)" <gi...@apache.org> on 2023/09/24 15:31:41 UTC

[GitHub] [parquet-mr] wgtmac commented on a diff in pull request #1141: PARQUET-2347: Add interface layer between Parquet and Hadoop Configuration

wgtmac commented on code in PR #1141:
URL: https://github.com/apache/parquet-mr/pull/1141#discussion_r1335198227


##########
parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java:
##########
@@ -154,9 +172,9 @@ private static FieldSchema union(FieldSchema mergedFieldSchema, FieldSchema newF
 
   @Override
   public ReadContext init(InitContext initContext) {
-    Schema pigSchema = getPigSchema(initContext.getConfiguration());
-    RequiredFieldList requiredFields = getRequiredFields(initContext.getConfiguration());
-    boolean columnIndexAccess = initContext.getConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
+    Schema pigSchema = getPigSchema(initContext.getConfig());

Review Comment:
   Having both `initContext.getConfig` and `initContext.getConfiguration` would be confusing. Can we use a clearer name, something like `initContext.getParquetConfiguration`?
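   For illustration, a hedged sketch of how the two accessors could read side by side after such a rename; the body of the renamed method is assumed to simply return the stored `configuration` field:
   ```java
   // Existing Hadoop-flavored accessor, unchanged (body from the InitContext hunk below).
   public Configuration getConfiguration() {
     return ConfigurationUtil.createHadoopConfiguration(configuration);
   }

   // Proposed rename of getConfig: unambiguous next to getConfiguration.
   public ParquetConfiguration getParquetConfiguration() {
     return configuration;
   }
   ```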



##########
parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java:
##########
@@ -40,14 +42,22 @@
 public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
   public static final String PARQUET_THRIFT_CLASS = "parquet.thrift.class";
   private static final Logger LOG = LoggerFactory.getLogger(AbstractThriftWriteSupport.class);
-  private static Configuration conf;
+  private static ParquetConfiguration conf;
 
   public static void setGenericThriftClass(Configuration configuration, Class<?> thriftClass) {
+    setGenericThriftClass(new HadoopParquetConfiguration(configuration), thriftClass);
+  }
+
+  public static void setGenericThriftClass(ParquetConfiguration configuration, Class<?> thriftClass) {
     conf = configuration;
     configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName());
   }
 
   public static Class getGenericThriftClass(Configuration configuration) {
+    return getGenericThriftClass(new HadoopParquetConfiguration(configuration));
+  }
+
+  public static Class<?> getGenericThriftClass(ParquetConfiguration configuration) {

Review Comment:
   Why does this overload return `Class<?>` while the Hadoop overload above returns the raw `Class`? They should probably be consistent.
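   For illustration, a hedged sketch of the consistent variant, assuming the raw `Class` above is an oversight rather than intentional:
   ```java
   // Sketch only: give both overloads the same Class<?> return type; the body
   // is the same delegation already shown in the diff.
   public static Class<?> getGenericThriftClass(Configuration configuration) {
     return getGenericThriftClass(new HadoopParquetConfiguration(configuration));
   }
   ```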



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java:
##########
@@ -261,7 +263,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
 
         LOG.debug("read value: {}", currentValue);
       } catch (RuntimeException e) {
-        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getPath()), e);
+        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getFile()), e);

Review Comment:
   If my previous comment is correct, then perhaps the change on this line is not needed?



##########
pom.xml:
##########
@@ -547,6 +547,8 @@
             </excludeModules>
             <excludes>
               <exclude>${shade.prefix}</exclude>
+              <exclude>org.apache.parquet.hadoop.CodecFactory</exclude>

Review Comment:
   These lines are worth a comment explaining why these classes are excluded.



##########
parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
##########
@@ -333,13 +406,17 @@ public Builder copy(ParquetReadOptions options) {
 
     public ParquetReadOptions build() {
       if (codecFactory == null) {
-        codecFactory = HadoopCodecs.newFactory(0);
+        if (conf == null) {
+          codecFactory = HadoopCodecs.newFactory(0);
+        } else {
+          codecFactory = HadoopCodecs.newFactory(conf, 0);

Review Comment:
   It seems that the original ParquetReadOptions did not require any Configuration parameter. Could we avoid adding one here by using ParquetConfiguration internally in HadoopCodecs?
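   For illustration, a hedged sketch of that alternative; this `ParquetConfiguration` overload of `HadoopCodecs.newFactory` is hypothetical, not part of the PR:
   ```java
   // Hypothetical overload: accept the new abstraction and unwrap internally, so
   // ParquetReadOptions.Builder.build() would not need to branch on conf == null.
   public static CompressionCodecFactory newFactory(ParquetConfiguration conf, int sizeHint) {
     return newFactory(ConfigurationUtil.createHadoopConfiguration(conf), sizeHint);
   }
   ```
   Since `ConfigurationUtil.createHadoopConfiguration(null)` already falls back to a fresh `Configuration` (see the ConfigurationUtil hunk below), the null check in the builder would then collapse into a single call.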



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java:
##########
@@ -41,4 +49,18 @@ public static Class<?> getClassFromConfig(Configuration configuration, String co
     }
   }
 
+  public static Configuration createHadoopConfiguration(ParquetConfiguration conf) {
+    if (conf == null) {
+      return new Configuration();
+    }
+    if (conf instanceof HadoopParquetConfiguration) {
+      return ((HadoopParquetConfiguration) conf).getConfiguration();
+    }
+    Configuration configuration = new Configuration();

Review Comment:
   When would this branch be reached, i.e. a `ParquetConfiguration` that is not a `HadoopParquetConfiguration`?
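   For context, a hedged sketch of a case that would reach it: any Hadoop-free implementation of the interface fails the `instanceof` check and falls through to the copy loop (`PlainParquetConfiguration` is an assumed name for this example):
   ```java
   // Hypothetical Hadoop-free implementation backed by a plain map.
   ParquetConfiguration conf = new PlainParquetConfiguration();
   conf.set("parquet.page.size", "1048576");

   // Not a HadoopParquetConfiguration, so every entry is copied one by one
   // into a freshly created Hadoop Configuration.
   Configuration hadoopConf = ConfigurationUtil.createHadoopConfiguration(conf);
   ```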



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java:
##########
@@ -77,6 +87,10 @@ public Map<String, String> getMergedKeyValueMetaData() {
    * @return the configuration for this job
    */
   public Configuration getConfiguration() {
+    return ConfigurationUtil.createHadoopConfiguration(configuration);
+  }
+
+  public ParquetConfiguration getConfig() {

Review Comment:
   ```suggestion
     public ParquetConfiguration getParquetConfiguration() {
   ```
   
   WDYT?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -246,9 +262,9 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
         codecClass = Class.forName(codecClassName);
       } catch (ClassNotFoundException e) {
         // Try to load the class using the job classloader
-        codecClass = configuration.getClassLoader().loadClass(codecClassName);
+        codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName);

Review Comment:
   Sorry, I don't understand this line. Why create a fresh `Configuration` here instead of deriving the classloader from `configuration` (i.e. the ParquetConfiguration)?
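   For illustration, a hedged sketch of what that could look like, reusing the `ConfigurationUtil` helper added in this PR (this only helps when the configuration actually wraps a Hadoop one):
   ```java
   } catch (ClassNotFoundException e) {
     // Sketch only: resolve against the classloader carried by the caller's
     // configuration rather than a brand-new Configuration, which only sees
     // the default classloader.
     codecClass = ConfigurationUtil.createHadoopConfiguration(configuration)
         .getClassLoader().loadClass(codecClassName);
   }
   ```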



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -246,9 +262,9 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
         codecClass = Class.forName(codecClassName);
       } catch (ClassNotFoundException e) {
         // Try to load the class using the job classloader
-        codecClass = configuration.getClassLoader().loadClass(codecClassName);
+        codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName);
       }
-      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(configuration));

Review Comment:
   If the codec implementations still come from the Hadoop dependency, does that mean we can only use the uncompressed codec when Hadoop is not on the classpath?
   
   cc @Fokko, as I remember the effort to remove the Hadoop dependency from the Parquet codecs.



##########
parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java:
##########
@@ -423,7 +435,7 @@ private static GenericData getDataModel(Configuration conf, Schema schema) {
 
     Class<? extends AvroDataSupplier> suppClass = conf.getClass(
         AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
-    return ReflectionUtils.newInstance(suppClass, conf).get();
+    return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get();

Review Comment:
   Does this mean we still cannot get rid of the Hadoop dependency? Either way, we have to depend on the Configuration class here.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java:
##########
@@ -167,13 +169,13 @@ public float getProgress() throws IOException, InterruptedException {
 
   public void initialize(ParquetFileReader reader, ParquetReadOptions options) {
     // copy custom configuration to the Configuration passed to the ReadSupport
-    Configuration conf = new Configuration();
-    if (options instanceof HadoopReadOptions) {
-      conf = ((HadoopReadOptions) options).getConf();
-    }
+    ParquetConfiguration conf = Objects.requireNonNull(options).getConfiguration();
     for (String property : options.getPropertyNames()) {
       conf.set(property, options.getProperty(property));
     }
+    for (Map.Entry<String, String> property : new Configuration()) {

Review Comment:
   Correct me if I'm wrong: we do not need to address the Hadoop dependency in the parquet-hadoop module itself, right?



##########
parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java:
##########
@@ -254,29 +273,64 @@ public RecordMaterializer<T> prepareForRead(Configuration configuration,
         configuration);
   }
 
-  @SuppressWarnings("unchecked")
+  @Override
+  public RecordMaterializer<T> prepareForRead(ParquetConfiguration configuration,
+                                              Map<String, String> keyValueMetaData, MessageType fileSchema,
+                                              org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) {
+    ThriftMetaData thriftMetaData = ThriftMetaData.fromExtraMetaData(keyValueMetaData);
+    try {
+      initThriftClass(thriftMetaData, configuration);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Cannot find Thrift object class for metadata: " + thriftMetaData, e);
+    }
+
+    // if there was not metadata in the file, get it from requested class
+    if (thriftMetaData == null) {
+      thriftMetaData = ThriftMetaData.fromThriftClass(thriftClass);
+    }
+
+    String converterClassName = configuration.get(RECORD_CONVERTER_CLASS_KEY, RECORD_CONVERTER_DEFAULT);
+    return getRecordConverterInstance(converterClassName, thriftClass,
+      readContext.getRequestedSchema(), thriftMetaData.getDescriptor(),
+      configuration);
+  }
+
   private static <T> ThriftRecordConverter<T> getRecordConverterInstance(
       String converterClassName, Class<T> thriftClass,
       MessageType requestedSchema, StructType descriptor, Configuration conf) {
-    Class<ThriftRecordConverter<T>> converterClass;
+    return getRecordConverterInstance(converterClassName, thriftClass, requestedSchema, descriptor, conf, Configuration.class);
+  }
+
+  private static <T> ThriftRecordConverter<T> getRecordConverterInstance(
+      String converterClassName, Class<T> thriftClass,
+      MessageType requestedSchema, StructType descriptor, ParquetConfiguration conf) {
+    return getRecordConverterInstance(converterClassName, thriftClass, requestedSchema, descriptor, conf, ParquetConfiguration.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T1, T2> ThriftRecordConverter<T1> getRecordConverterInstance(

Review Comment:
   nit: could we give `T1` and `T2` more descriptive names?
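   As a hedged illustration (these names are invented for the example, not taken from the PR):
   ```suggestion
     private static <RECORD, CONF> ThriftRecordConverter<RECORD> getRecordConverterInstance(
   ```
   Here `RECORD` would be the Thrift record type the converter produces, and `CONF` the configuration flavor (Hadoop `Configuration` or `ParquetConfiguration`).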



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@parquet.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org