You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/07/26 05:24:37 UTC

[GitHub] [pinot] KKcorps commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

KKcorps commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r929536014


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.utils.ResourceFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+//TODO: Add support for Schema Registry
+//TODO: Make the descriptor file accessible on all servers
+public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ProtoBufMessageDecoder.class);
+
+  public static final String DESCRIPTOR_FILE_PATH = "descriptorFile";
+  public static final String PROTO_CLASS_NAME = "protoClassName";
+
+  private DynamicMessage _dynamicMessage;
+  private ProtoBufRecordExtractor _recordExtractor;
+  private String _protoClassName;
+
+  /**
+   * Initializes the decoder from the stream config properties.
+   *
+   * <p>Opens the descriptor file named by {@link #DESCRIPTOR_FILE_PATH}, resolves the message descriptor (optionally
+   * selected by {@link #PROTO_CLASS_NAME}), and prepares a record extractor restricted to {@code fieldsToRead}.
+   *
+   * @param props stream config properties; must contain {@link #DESCRIPTOR_FILE_PATH}
+   * @param fieldsToRead fields to extract from each decoded message
+   * @param topicName name of the consumed topic (not used by this method)
+   * @throws IllegalStateException if no descriptor file path is configured
+   * @throws Exception if the descriptor file cannot be opened, read or validated
+   */
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),
+        "Protocol Buffer schema descriptor file must be provided");
+
+    _protoClassName = props.getOrDefault(PROTO_CLASS_NAME, "");
+    String descriptorFilePath = props.get(DESCRIPTOR_FILE_PATH);
+    Descriptors.Descriptor descriptor;
+    // The stream is only needed while the descriptor is parsed, so release it as soon as parsing finishes
+    // (or fails) instead of leaking the underlying file handle.
+    try (InputStream descriptorFileInputStream = getDescriptorFileInputStream(descriptorFilePath)) {
+      descriptor = buildProtoBufDescriptor(descriptorFileInputStream);
+    }
+    _recordExtractor = new ProtoBufRecordExtractor();
+    _recordExtractor.init(fieldsToRead, null);
+    _dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
+  }
+
+  /**
+   * Parses a compiled descriptor set and returns the descriptor of the message type to decode.
+   *
+   * <p>If {@code _protoClassName} is non-empty, that message type is looked up; otherwise the first entry of
+   * {@code DynamicSchema#getMessageTypes()} is used.
+   *
+   * @param fin stream over the descriptor file contents; not closed by this method
+   * @return descriptor of the selected message type, never {@code null}
+   * @throws IOException if the descriptor cannot be read or validated, defines no message types, or does not
+   *     contain the configured message type
+   */
+  private Descriptors.Descriptor buildProtoBufDescriptor(InputStream fin)
+      throws IOException {
+    DynamicSchema dynamicSchema;
+    try {
+      dynamicSchema = DynamicSchema.parseFrom(fin);
+    } catch (Descriptors.DescriptorValidationException e) {
+      throw new IOException("Descriptor file validation failed", e);
+    }
+
+    String messageType = _protoClassName;
+    if (StringUtils.isEmpty(messageType)) {
+      Set<String> messageTypes = dynamicSchema.getMessageTypes();
+      // Fail with a clear message instead of an ArrayIndexOutOfBoundsException on an empty descriptor set.
+      if (messageTypes.isEmpty()) {
+        throw new IOException("Descriptor file does not define any message types");
+      }
+      // NOTE(review): which type comes "first" depends on the iteration order of getMessageTypes(); configure
+      // protoClassName explicitly when the descriptor defines more than one message.
+      messageType = messageTypes.iterator().next();
+    }
+
+    // getMessageDescriptor yields null for unknown names; surface that here rather than as a later NPE.
+    Descriptors.Descriptor descriptor = dynamicSchema.getMessageDescriptor(messageType);
+    if (descriptor == null) {
+      throw new IOException("Message type '" + messageType + "' not found in descriptor file");
+    }
+    return descriptor;
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    return decode(payload, 0, payload.length, destination);
+  }
+
+  /**
+   * Decodes {@code length} bytes of {@code payload}, starting at {@code offset}, into {@code destination}.
+   *
+   * @param payload buffer containing a serialized protobuf message
+   * @param offset index of the first byte of the message within {@code payload}
+   * @param length number of bytes that make up the message
+   * @param destination row to populate with the extracted fields
+   * @return {@code destination}; if the bytes cannot be parsed it is returned without any fields extracted
+   */
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    Message message;
+    try {
+      Message.Builder builder = _dynamicMessage.newBuilderForType();
+      // Parse only the requested slice; parsing the whole array would read bytes outside this message.
+      builder.mergeFrom(payload, offset, length);
+      message = builder.build();
+    } catch (Exception e) {
+      // NOTE(review): returning destination keeps the row; confirm against the StreamMessageDecoder contract
+      // whether null (drop the row) is expected when decoding fails.
+      LOGGER.error("Not able to decode protobuf message", e);
+      return destination;
+    }
+    _recordExtractor.extract(message, destination);
+    return destination;
+  }
+
+  private InputStream getDescriptorFileInputStream(String descriptorFilePath)
+      throws IOException {
+    URI descriptorFileURI = URI.create(descriptorFilePath);

Review Comment:
   I thought about it, but is it fine to use the FS factory in another plugin? It creates a dependency: the FS implementation must always be loaded before the proto plugin can be used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org