Posted to dev@parquet.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/10/22 14:17:00 UTC

[jira] [Commented] (PARQUET-1903) Improve Parquet Protobuf Usability

    [ https://issues.apache.org/jira/browse/PARQUET-1903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17219035#comment-17219035 ] 

ASF GitHub Bot commented on PARQUET-1903:
-----------------------------------------

gszadovszky commented on a change in pull request #813:
URL: https://github.com/apache/parquet-mr/pull/813#discussion_r510168205



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
##########
@@ -136,6 +136,7 @@ public T read() throws IOException {
         return reader == null ? null : read();
       }
     } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();

Review comment:
       I don't know why some of the methods of `InternalParquetRecordReader` throw `InterruptedException`. I see no reason for it: the class implements no concurrency or thread handling, and it is not designed for extension. 
   What do you think about leaving this as is for now and handling it in a separate issue? (Or, if you have an answer, please enlighten me :smiley:)
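
For context, the added line follows the usual idiom of restoring the thread's interrupt flag when an `InterruptedException` is caught and translated into another exception type. A minimal sketch of that idiom, assuming the caught exception is rethrown as an `IOException`; `BlockingStep` and `runStep` are hypothetical names for illustration and are not part of parquet-mr:

```java
import java.io.IOException;

final class InterruptExample {

  /** Hypothetical stand-in for a call that declares InterruptedException. */
  interface BlockingStep {
    void run() throws InterruptedException;
  }

  /** Runs the step and translates an interruption into an IOException. */
  static void runStep(BlockingStep step) throws IOException {
    try {
      step.run();
    } catch (InterruptedException e) {
      // Catching InterruptedException clears the interrupt status, so set it
      // again to let callers further up the stack observe the interruption.
      Thread.currentThread().interrupt();
      throw new IOException(e);
    }
  }
}
```

If nothing in the call chain can actually be interrupted, the cleaner fix is to drop the `throws InterruptedException` declarations, which is the separate issue raised above.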

##########
File path: parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetReader.java
##########
@@ -33,8 +33,14 @@
 public class ProtoParquetReader<T extends MessageOrBuilder> extends ParquetReader<T> {
 
   @SuppressWarnings("unchecked")
-  public static <T> Builder<T> builder(Path file) {
-    return ParquetReader.builder(new ProtoReadSupport(), file);
+  public static <T extends MessageOrBuilder> Builder<T> builder(Path file) {

Review comment:
       I am not sure that this won't break backward compatibility.
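
One way to reason about this concern: narrowing `<T>` to `<T extends MessageOrBuilder>` on a static factory whose signature mentions `T` only as a type argument should leave the erased (binary) signature unchanged, while callers that previously compiled with an unrelated `T` may stop compiling. The sketch below checks only the erasure half via reflection. `Marker`, `Builder`, `Before` and `After` are hypothetical stand-ins, not the real parquet-mr types, and this says nothing about other signatures changed in the PR:

```java
import java.lang.reflect.Method;
import java.util.Arrays;

final class BoundErasureDemo {

  /** Hypothetical stand-in for MessageOrBuilder. */
  interface Marker {}

  /** Hypothetical stand-in for the reader/writer Builder type. */
  static final class Builder<T> {}

  /** Factory as it looked before the change: unbounded T. */
  static final class Before {
    static <T> Builder<T> builder(String file) {
      return new Builder<>();
    }
  }

  /** Factory after the change: T is bounded. */
  static final class After {
    static <T extends Marker> Builder<T> builder(String file) {
      return new Builder<>();
    }
  }

  /** Returns the erased return type and parameter types of builder(String). */
  static String erasedSignature(Class<?> owner) {
    try {
      Method m = owner.getDeclaredMethod("builder", String.class);
      return m.getReturnType().getName() + Arrays.toString(m.getParameterTypes());
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Under these assumptions both factories erase to the same signature, so already-compiled callers would still link. Source compatibility is the part that can break: a call such as `Before.<String>builder("f")` compiles, whereas the same explicit type argument against `After` does not.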

##########
File path: parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
##########
@@ -82,42 +84,70 @@ public ProtoParquetWriter(Path file, Class<? extends Message> protoMessage) thro
     this(file, protoMessage, CompressionCodecName.UNCOMPRESSED,
             DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
   }
-  
-  public static <T> Builder<T> builder(Path file) {
-	    return new Builder<T>(file);
-	}
-
-	public static <T> Builder<T> builder(OutputFile file) {
-	    return new Builder<T>(file);
-	}
-	
-	private static <T extends MessageOrBuilder> WriteSupport<T> writeSupport(Class<? extends Message> protoMessage) {
-		return new ProtoWriteSupport<T>(protoMessage);
-	}
-	  
-	public static class Builder<T> extends ParquetWriter.Builder<T, Builder<T>> {
-		  
-		Class<? extends Message> protoMessage = null;
-
-		private Builder(Path file) {
-			super(file);
-		}
-
-		private Builder(OutputFile file) {
-		    super(file);
-		}
-
-		protected Builder<T> self() {
-		    return this;
-		}
-		
-		public Builder<T> withMessage(Class<? extends Message> protoMessage){
-			this.protoMessage = protoMessage;
-			return this;
-		}
-
-		protected WriteSupport<T> getWriteSupport(Configuration conf) {
-		    return (WriteSupport<T>) ProtoParquetWriter.writeSupport(protoMessage);
-		}    
-	}
+
+  public static <T extends MessageOrBuilder> Builder<T> builder(Path file) {

Review comment:
       Might break backward compatibility.

##########
File path: parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
##########
@@ -74,24 +79,72 @@ public ReadContext init(InitContext context) {
 
   @Override
   public RecordMaterializer<T> prepareForRead(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
-    String headerProtoClass = keyValueMetaData.get(PB_CLASS);
-    String configuredProtoClass = configuration.get(PB_CLASS);
-
-    if (configuredProtoClass != null) {
-      LOG.debug("Replacing class " + headerProtoClass + " by " + configuredProtoClass);
-      headerProtoClass = configuredProtoClass;
-    }
-
-    if (headerProtoClass == null) {
-      throw new RuntimeException("I Need parameter " + PB_CLASS + " with Protocol Buffer class");
+    final Optional<String> headerProtoClass = Optional.ofNullable(keyValueMetaData.get(PB_CLASS));
+    final Optional<String> headerProtoType = Optional.ofNullable(keyValueMetaData.get(PB_TYPE));
+
+    final Optional<String> configuredProtoClass = Optional.ofNullable(configuration.get(PB_CLASS));
+    final Optional<String> configuredProtoType = Optional.ofNullable(configuration.get(PB_TYPE));
+
+    final String candidateClass;
+
+    /* 
+     * Load the class type, with backwards-compatibility for class-only support (no registry).
+     * Configured values override any values found in the meta section of the file.
+     */
+
+    if (configuredProtoType.isPresent()) {
+      // No schema registry implemented yet
+      LOG.debug("Configured proto type: {}", configuredProtoType.get());
+      candidateClass = parseProtoType(configuredProtoType.get()).getValue();
+    } else if (configuredProtoClass.isPresent()) {
+      LOG.debug("Configured proto class: {}", configuredProtoClass.get());
+      candidateClass = configuredProtoClass.get();
+    } else if (headerProtoType.isPresent()) {
+      // No schema registry implemented yet
+      LOG.debug("Parquet meta proto type: {}", headerProtoType.get());
+      candidateClass = parseProtoType(headerProtoType.get()).getValue();
+    } else if (headerProtoClass.isPresent()) {
+      LOG.debug("Parquet meta proto class: {}", headerProtoClass.get());
+      candidateClass = headerProtoClass.get();
+    } else {
+      throw new IllegalArgumentException("No proto class specified for read");
     }
 
-    LOG.debug("Reading data with Protocol Buffer class {}", headerProtoClass);
+    LOG.debug("Reading data with Protocol Buffer class {}", candidateClass);
 
-    MessageType requestedSchema = readContext.getRequestedSchema();
-    Class<? extends Message> protobufClass = Protobufs.getProtobufClass(headerProtoClass);
-    return new ProtoRecordMaterializer(configuration, requestedSchema, protobufClass, keyValueMetaData);
+    try {
+      MessageType requestedSchema = readContext.getRequestedSchema();
+      return new ProtoRecordMaterializer<T>(configuration, requestedSchema,
+          ProtoUtils.loadDefaultInstance(candidateClass), keyValueMetaData);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not protobuf class: " + candidateClass, e);

Review comment:
       ```suggestion
         throw new IllegalArgumentException("Could not load protobuf class: " + candidateClass, e);
   ```

##########
File path: parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetWriter.java
##########
@@ -82,42 +84,70 @@ public ProtoParquetWriter(Path file, Class<? extends Message> protoMessage) thro
     this(file, protoMessage, CompressionCodecName.UNCOMPRESSED,
             DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
   }
-  
-  public static <T> Builder<T> builder(Path file) {
-	    return new Builder<T>(file);
-	}
-
-	public static <T> Builder<T> builder(OutputFile file) {
-	    return new Builder<T>(file);
-	}
-	
-	private static <T extends MessageOrBuilder> WriteSupport<T> writeSupport(Class<? extends Message> protoMessage) {
-		return new ProtoWriteSupport<T>(protoMessage);
-	}
-	  
-	public static class Builder<T> extends ParquetWriter.Builder<T, Builder<T>> {
-		  
-		Class<? extends Message> protoMessage = null;
-
-		private Builder(Path file) {
-			super(file);
-		}
-
-		private Builder(OutputFile file) {
-		    super(file);
-		}
-
-		protected Builder<T> self() {
-		    return this;
-		}
-		
-		public Builder<T> withMessage(Class<? extends Message> protoMessage){
-			this.protoMessage = protoMessage;
-			return this;
-		}
-
-		protected WriteSupport<T> getWriteSupport(Configuration conf) {
-		    return (WriteSupport<T>) ProtoParquetWriter.writeSupport(protoMessage);
-		}    
-	}
+
+  public static <T extends MessageOrBuilder> Builder<T> builder(Path file) {
+    return new Builder<T>(file);
+  }
+
+  public static <T extends MessageOrBuilder> Builder<T> builder(Path file, T message) {
+    return new Builder<T>(file).withMessage(message);
+  }
+
+  public static <T extends MessageOrBuilder> Builder<T> builder(OutputFile file) {

Review comment:
       Might break backward compatibility.

##########
File path: parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
##########
@@ -74,27 +76,53 @@
    * @param parquetSchema The (part of) parquet schema that should match to the expected proto
    * @param extraMetadata Metadata from parquet footer, containing useful information about parquet-proto convertion behavior
    */
-  ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Class<? extends Message> protoClass, GroupType parquetSchema, Map<String, String> extraMetadata) {
-    this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, extraMetadata);
-  }
-
-  // For usage in message arrays
-  ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Map<String, String> extraMetadata) {
+  ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, MessageOrBuilder message, GroupType parquetSchema, Map<String, String> extraMetadata) {
 
     int schemaSize = parquetSchema.getFieldCount();
     converters = new Converter[schemaSize];
     this.conf = conf;
-    this.parent = pvc;
+    this.parent = Objects.requireNonNull(pvc, "Missing parent value container");
     this.extraMetadata = extraMetadata;
     int parquetFieldIndex = 1;
 
-    if (pvc == null) {
-      throw new IllegalStateException("Missing parent value container");
+    this.message = message;
+
+    this.protoBuilder =
+        (message instanceof Message) ? ((Message) message).newBuilderForType()
+            : ((Message.Builder) message).clone().clear();
+
+    Descriptors.Descriptor protoDescriptor = message.getDescriptorForType();
+
+    for (Type parquetField : parquetSchema.getFields()) {
+      Descriptors.FieldDescriptor protoField = protoDescriptor.findFieldByName(parquetField.getName());
+
+      if (protoField == null) {
+        String description = "Scheme mismatch \n\"" + parquetField + "\"" +
+                "\n proto descriptor:\n" + protoDescriptor.toProto();
+        throw new IncompatibleSchemaModificationException("Cant find \"" + parquetField.getName() + "\" " + description);
+      }
+
+      converters[parquetFieldIndex - 1] = newMessageConverter(this.protoBuilder, protoField, parquetField);
+
+      parquetFieldIndex++;
     }
+  }
+
+  @Deprecated

Review comment:
       This constructor and the whole class are package-private. I would suggest simply removing it and rewriting the callers to use the other constructor.

##########
File path: parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetMessageIterator.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.proto;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.parquet.hadoop.ParquetReader;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.MessageOrBuilder;
+
+public class ProtoParquetMessageIterator<T extends Message> implements Iterator<T> {
+
+  private final ParquetReader<MessageOrBuilder> reader;
+  private boolean hasNext;
+  private T currentMessage;
+
+  private ProtoParquetMessageIterator(ParquetReader<MessageOrBuilder> reader) {
+    this.reader = reader;
+    this.hasNext = true;
+    this.currentMessage = null;
+  }
+
+  public static <T extends Message> ProtoParquetMessageIterator<T> wrap(
+      ParquetReader<MessageOrBuilder> reader) {
+    return new ProtoParquetMessageIterator<>(reader);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean hasNext() {
+    if (this.currentMessage != null) {
+      return true;
+    }
+    try {
+      MessageOrBuilder message = this.reader.read();
+      if (message == null) {
+        hasNext = false;
+      } else {
+        if (message instanceof Builder) {
+          this.currentMessage = (T) ((Builder) message).build();
+        } else if (message instanceof Message) {
+          this.currentMessage = (T) message;
+        }
+        this.hasNext = true;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to read next protobuf", e);

Review comment:
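       Same as my comment on `ProtoParquetIterator`: shouldn't this be a `ParquetRuntimeException` (or one of its descendants) rather than a plain `RuntimeException`?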

##########
File path: parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoParquetIterator.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.proto;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.parquet.hadoop.ParquetReader;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.MessageOrBuilder;
+
+public class ProtoParquetIterator<T extends Message> implements Iterator<T> {
+
+  private final ParquetReader<T> reader;
+  private boolean hasNext;
+  private T currentMessage;
+
+  private ProtoParquetIterator(ParquetReader<T> reader) {
+    this.reader = reader;
+    this.hasNext = true;
+    this.currentMessage = null;
+  }
+
+  public static <T extends Message> ProtoParquetIterator<T> wrap(
+      ParquetReader<T> reader) {
+    return new ProtoParquetIterator<>(reader);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean hasNext() {
+    if (this.currentMessage != null) {
+      return true;
+    }
+    try {
+      MessageOrBuilder message = this.reader.read();
+      if (message == null) {
+        hasNext = false;
+      } else {
+        if (message instanceof Builder) {
+          this.currentMessage = (T) ((Builder) message).build();
+        } else if (message instanceof Message) {
+          this.currentMessage = (T) message;
+        }
+        this.hasNext = true;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to read next protobuf", e);

Review comment:
       Aren't we supposed to use something Parquet-specific here? A `ParquetRuntimeException` or one of its descendants?
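
A rough sketch of what that could look like, written without a parquet dependency so it stands alone. `Source` is a hypothetical stand-in for `ParquetReader#read()` (returning `null` at end of input), and `RecordReadException` is a hypothetical placeholder for whichever `ParquetRuntimeException` descendant the project would prefer:

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class ReadAheadIterator<T> implements Iterator<T> {

  /** Hypothetical stand-in for ParquetReader#read(): null means end of input. */
  interface Source<T> {
    T read() throws IOException;
  }

  /** Hypothetical placeholder for a Parquet-specific unchecked exception. */
  static final class RecordReadException extends RuntimeException {
    RecordReadException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  private final Source<T> source;
  private T next;        // record read ahead by hasNext(), or null
  private boolean done;  // true once the source has returned null

  ReadAheadIterator(Source<T> source) {
    this.source = source;
  }

  @Override
  public boolean hasNext() {
    if (next != null) {
      return true;
    }
    if (done) {
      return false;
    }
    try {
      next = source.read();
    } catch (IOException e) {
      // Wrap in the domain-specific unchecked type instead of RuntimeException.
      throw new RecordReadException("Unable to read next protobuf", e);
    }
    done = (next == null);
    return !done;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T result = next;
    next = null;
    return result;
  }
}
```

Callers can then catch the specific type and still reach the underlying `IOException` through `getCause()`.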

##########
File path: parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
##########
@@ -113,12 +151,36 @@ public void prepareForWrite(RecordConsumer recordConsumer) {
 
   @Override
   public WriteContext init(Configuration configuration) {
+    if (!this.message.isPresent()) {
+      return initClass(configuration);
+    }
+    this.writeSpecsCompliant = configuration.getBoolean(PB_SPECS_COMPLIANT_WRITE, writeSpecsCompliant);
+    MessageType rootSchema = new ProtoSchemaConverter(writeSpecsCompliant).convert(this.message.get());
+    Descriptor messageDescriptor = this.message.get().getDescriptorForType();
+    validatedMapping(messageDescriptor, rootSchema);
+
+    this.messageWriter = new MessageWriter(messageDescriptor, rootSchema);
+
+    // Create the protobuf class FQDN (type.googleapis.com/org.my.proto.Document)
+    final String protoSchemaRegistry = this.schemaRegistry.orElse("type.googleapis.com");
+    final String protoClass = this.message.get().getClass().getName();
+    final String protoType = protoSchemaRegistry + '/' + protoClass;
+
+    Map<String, String> extraMetaData = new HashMap<>();
+    extraMetaData.put(ProtoReadSupport.PB_CLASS, protoClass);
+    extraMetaData.put(ProtoReadSupport.PB_TYPE, protoType);
+    extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(this.message.get()));
+    extraMetaData.put(PB_SPECS_COMPLIANT_WRITE, String.valueOf(writeSpecsCompliant));
+    return new WriteContext(rootSchema, extraMetaData);
+  }
 
+  @Deprecated

Review comment:
       Why deprecate a private method?

##########
File path: parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
##########
@@ -74,24 +79,72 @@ public ReadContext init(InitContext context) {
 
   @Override
   public RecordMaterializer<T> prepareForRead(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
-    String headerProtoClass = keyValueMetaData.get(PB_CLASS);
-    String configuredProtoClass = configuration.get(PB_CLASS);
-
-    if (configuredProtoClass != null) {
-      LOG.debug("Replacing class " + headerProtoClass + " by " + configuredProtoClass);
-      headerProtoClass = configuredProtoClass;
-    }
-
-    if (headerProtoClass == null) {
-      throw new RuntimeException("I Need parameter " + PB_CLASS + " with Protocol Buffer class");
+    final Optional<String> headerProtoClass = Optional.ofNullable(keyValueMetaData.get(PB_CLASS));
+    final Optional<String> headerProtoType = Optional.ofNullable(keyValueMetaData.get(PB_TYPE));
+
+    final Optional<String> configuredProtoClass = Optional.ofNullable(configuration.get(PB_CLASS));
+    final Optional<String> configuredProtoType = Optional.ofNullable(configuration.get(PB_TYPE));
+
+    final String candidateClass;
+
+    /* 
+     * Load the class type, with backwards-compatibility for class-only support (no registry).
+     * Configured values override any values found in the meta section of the file.
+     */
+
+    if (configuredProtoType.isPresent()) {
+      // No schema registry implemented yet
+      LOG.debug("Configured proto type: {}", configuredProtoType.get());
+      candidateClass = parseProtoType(configuredProtoType.get()).getValue();
+    } else if (configuredProtoClass.isPresent()) {
+      LOG.debug("Configured proto class: {}", configuredProtoClass.get());
+      candidateClass = configuredProtoClass.get();
+    } else if (headerProtoType.isPresent()) {
+      // No schema registry implemented yet
+      LOG.debug("Parquet meta proto type: {}", headerProtoType.get());
+      candidateClass = parseProtoType(headerProtoType.get()).getValue();
+    } else if (headerProtoClass.isPresent()) {
+      LOG.debug("Parquet meta proto class: {}", headerProtoClass.get());
+      candidateClass = headerProtoClass.get();
+    } else {
+      throw new IllegalArgumentException("No proto class specified for read");
     }
 
-    LOG.debug("Reading data with Protocol Buffer class {}", headerProtoClass);
+    LOG.debug("Reading data with Protocol Buffer class {}", candidateClass);
 
-    MessageType requestedSchema = readContext.getRequestedSchema();
-    Class<? extends Message> protobufClass = Protobufs.getProtobufClass(headerProtoClass);
-    return new ProtoRecordMaterializer(configuration, requestedSchema, protobufClass, keyValueMetaData);
+    try {
+      MessageType requestedSchema = readContext.getRequestedSchema();
+      return new ProtoRecordMaterializer<T>(configuration, requestedSchema,
+          ProtoUtils.loadDefaultInstance(candidateClass), keyValueMetaData);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not protobuf class: " + candidateClass, e);
+    }
   }
 
+  /**
+   * Split a fully qualified protobuf type into its two parts.
+   *
+   * <pre>
+   * schema-registry/class
+   * </pre>
+   *
+   * <pre>
+   * type.googleapis.com / google.profile.Person
+   * </pre>
+   *
+   * <ul>
+   * <li>type.googleapis.com/google.profile.Person =
+   * ["type.googleapis.com","google.profile.Person"]</li>
+   * <li>google.profile.Person = ["","google.profile.Person"]</li>
+   * </ul>
+   *
+   * @param protoType The protobuf fully qualifies type
+   * @return Entry containing registry and class information
+   */
+  private Entry<String, String> parseProtoType(final String protoType) {
+    final String[] parts = protoType.split("/");
+    return (parts.length == 1) ? new AbstractMap.SimpleEntry<>("", parts[0])
+        : new AbstractMap.SimpleEntry<>(parts[0], parts[1]);

Review comment:
       Is it OK to use only the first two elements when `parts.length > 2`?
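
One possible way to make the behaviour explicit is to split on the last `/` only, so that everything before it is treated as the registry and everything after it as the class name. This is a sketch of an alternative, not the PR's implementation, and whether a registry may itself contain `/` is an assumption that would need to be confirmed:

```java
import java.util.AbstractMap;
import java.util.Map.Entry;

final class ProtoTypeParsing {

  /**
   * Splits "registry/class" on the last '/'.
   * A value without '/' yields an empty registry.
   */
  static Entry<String, String> parseProtoType(String protoType) {
    int slash = protoType.lastIndexOf('/');
    if (slash < 0) {
      return new AbstractMap.SimpleEntry<>("", protoType);
    }
    return new AbstractMap.SimpleEntry<>(
        protoType.substring(0, slash), protoType.substring(slash + 1));
  }
}
```

With this variant, `type.googleapis.com/google.profile.Person` and `google.profile.Person` parse as described in the Javadoc above, and an input with more than one `/` keeps the full prefix as the registry instead of silently dropping the trailing parts. Rejecting such input with an `IllegalArgumentException` would be an equally reasonable choice.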





> Improve Parquet Protobuf Usability
> ----------------------------------
>
>                 Key: PARQUET-1903
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1903
>             Project: Parquet
>          Issue Type: Improvement
>            Reporter: David Mollitor
>            Assignee: David Mollitor
>            Priority: Major
>
> Check out the PR for details.
>  
>  * Move away from passing around a {{Class}} object to take advantage of Java generics
>  * Make parquet-proto library more usable and straightforward
>  * Provide test examples
>  * Limited support for protocol buffer schema registry
>  


