Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/06/24 06:00:21 UTC

[GitHub] [pinot] KKcorps opened a new pull request, #8972: Add protocol buffer Stream Decoder

KKcorps opened a new pull request, #8972:
URL: https://github.com/apache/pinot/pull/8972

   Allows users to ingest Protocol Buffers messages from real-time streams.
   
   Pending:
   * A descriptor file is needed to parse the input; a mechanism is needed to make this file available on all servers.
   * Support for a schema registry


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] KKcorps commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r929537497


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.utils.ResourceFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+//TODO: Add support for Schema Registry
+//TODO: Making descriptor file accessible on all servers
+public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ProtoBufMessageDecoder.class);
+
+  public static final String DESCRIPTOR_FILE_PATH = "descriptorFile";
+  public static final String PROTO_CLASS_NAME = "protoClassName";
+
+  private DynamicMessage _dynamicMessage;
+  private ProtoBufRecordExtractor _recordExtractor;
+  private String _protoClassName;
+
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),

Review Comment:
   Yes. The `.proto` file contains the schema. From this schema you can either generate Java classes or generate a binary `.desc` descriptor file. The latter lets us parse any proto message without relying on generated Java classes, though it is a bit slower.
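Why a `.desc` file is enough to parse arbitrary messages: the protobuf wire format tags every field with its field number and wire type, so a generic reader can walk any payload, and the descriptor then supplies the names and types for those numbers. The following is a minimal, stdlib-only sketch that assumes only varint (wire type 0) fields; `WireFormatSketch` and `decodeVarintFields` are hypothetical names, and this is not how protobuf-java's `DynamicMessage` is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical illustration (not Pinot or protobuf-java code): decodes the varint fields
 * of a protobuf payload into a map keyed by field number, without any generated class.
 * A descriptor is what would let a real dynamic parser map these numbers to names and types.
 */
final class WireFormatSketch {
  private WireFormatSketch() {
  }

  /** Decodes varint (wire type 0) fields in [offset, offset + length); other wire types are rejected. */
  static Map<Integer, Long> decodeVarintFields(byte[] buf, int offset, int length) {
    Map<Integer, Long> fields = new LinkedHashMap<>();
    int[] pos = {offset};
    int end = offset + length;
    while (pos[0] < end) {
      long key = readVarint(buf, pos, end);
      int fieldNumber = (int) (key >>> 3); // upper bits: field number
      int wireType = (int) (key & 0x7);    // lower 3 bits: wire type
      if (wireType != 0) {
        throw new IllegalArgumentException("Sketch only handles varint fields, got wire type " + wireType);
      }
      fields.put(fieldNumber, readVarint(buf, pos, end));
    }
    return fields;
  }

  // Base-128 varint: 7 payload bits per byte, least significant group first;
  // a set high bit means more bytes follow.
  private static long readVarint(byte[] buf, int[] pos, int end) {
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      if (pos[0] >= end) {
        throw new IllegalArgumentException("Truncated varint");
      }
      byte b = buf[pos[0]++];
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IllegalArgumentException("Malformed varint");
  }
}
```

For the payload `08 96 01`, the key byte `0x08` encodes field 1 with wire type 0, and `96 01` is the varint 150, so the sketch returns `{1=150}`.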





[GitHub] [pinot] codecov-commenter commented on pull request #8972: Add protocol buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #8972:
URL: https://github.com/apache/pinot/pull/8972#issuecomment-1165267539

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/8972?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#8972](https://codecov.io/gh/apache/pinot/pull/8972?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (96bb773) into [master](https://codecov.io/gh/apache/pinot/commit/e8f9d88fc3ab9ba8ca9a4e0c2dfce49c0dbbfd7b?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e8f9d88) will **decrease** coverage by `2.30%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #8972      +/-   ##
   ============================================
   - Coverage     68.56%   66.26%   -2.31%     
   - Complexity     4640     4695      +55     
   ============================================
     Files          1741     1361     -380     
     Lines         91475    68700   -22775     
     Branches      13674    10746    -2928     
   ============================================
   - Hits          62724    45525   -17199     
   + Misses        24363    19896    -4467     
   + Partials       4388     3279    -1109     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | unittests1 | `66.26% <ø> (+0.03%)` | :arrow_up: |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/8972?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/pinot/core/routing/RoutingTable.java](https://codecov.io/gh/apache/pinot/pull/8972/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9yb3V0aW5nL1JvdXRpbmdUYWJsZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/pinot/common/config/NettyConfig.java](https://codecov.io/gh/apache/pinot/pull/8972/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL05ldHR5Q29uZmlnLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/pinot/common/metrics/MinionMeter.java](https://codecov.io/gh/apache/pinot/pull/8972/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25NZXRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/pinot/common/metrics/ControllerMeter.java](https://codecov.io/gh/apache/pinot/pull/8972/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Db250cm9sbGVyTWV0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/BrokerQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/8972/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Ccm9rZXJRdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/MinionQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/8972/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25RdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ache/pinot/server/access/AccessControlFactory.java](https://codecov.io/gh/apache/pinot/pull/8972/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VydmVyL2FjY2Vzcy9BY2Nlc3NDb250cm9sRmFjdG9yeS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...he/pinot/common/messages/TableDeletionMessage.java](https://codecov.io/gh/apache/pinot/pull/8972/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWVzc2FnZXMvVGFibGVEZWxldGlvbk1lc3NhZ2UuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pinot/core/data/manager/realtime/TimerService.java](https://codecov.io/gh/apache/pinot/pull/8972/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvVGltZXJTZXJ2aWNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...not/common/exception/HttpErrorStatusException.java](https://codecov.io/gh/apache/pinot/pull/8972/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL0h0dHBFcnJvclN0YXR1c0V4Y2VwdGlvbi5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [829 more](https://codecov.io/gh/apache/pinot/pull/8972/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/8972?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/8972?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [e8f9d88...96bb773](https://codecov.io/gh/apache/pinot/pull/8972?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] KKcorps commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r931249570


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.utils.ResourceFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+//TODO: Add support for Schema Registry
+//TODO: Making descriptor file accessible on all servers
+public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ProtoBufMessageDecoder.class);
+
+  public static final String DESCRIPTOR_FILE_PATH = "descriptorFile";
+  public static final String PROTO_CLASS_NAME = "protoClassName";
+
+  private DynamicMessage _dynamicMessage;
+  private ProtoBufRecordExtractor _recordExtractor;
+  private String _protoClassName;
+
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),

Review Comment:
   The slowness is not due to decoding the `.desc` file.
   It comes from deserializing the messages with the `DynamicMessage` class instead of a compiled Java proto class.
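A simplified analogy for where the per-message cost comes from: the descriptor is built once, but a dynamic record resolves every field through runtime lookups and stores values as boxed objects, while a compiled class reads fixed fields directly. This is a hypothetical sketch (`DynamicVsCompiledSketch` and its nested classes are invented for illustration) and greatly simplifies protobuf-java's actual `DynamicMessage` and generated-code internals.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical analogy (not protobuf-java internals): contrasts a class whose fields
 * are fixed at compile time with a record driven by a runtime descriptor.
 */
final class DynamicVsCompiledSketch {
  private DynamicVsCompiledSketch() {
  }

  // Compiled style: layout known at build time, so a read is a plain field access.
  static final class CompiledEvent {
    final long id;
    final String name;

    CompiledEvent(long id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  // Dynamic style: the descriptor (field number -> field name) is built once, but every
  // set/get on every message goes through map lookups, and primitives are boxed.
  static final class DynamicEvent {
    private final Map<Integer, String> _descriptor;
    private final Map<String, Object> _values = new HashMap<>();

    DynamicEvent(Map<Integer, String> descriptor) {
      _descriptor = descriptor;
    }

    void setField(int fieldNumber, Object value) {
      String name = _descriptor.get(fieldNumber);
      if (name == null) {
        // Real protobuf typically preserves unrecognized fields; this sketch rejects them for brevity.
        throw new IllegalArgumentException("Unknown field number " + fieldNumber);
      }
      _values.put(name, value);
    }

    Object getField(String name) {
      return _values.get(name);
    }
  }
}
```

Both styles hold the same data; the difference is how much work each access costs, which adds up when it happens for every field of every streamed message.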





[GitHub] [pinot] KKcorps commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r929536014


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.utils.ResourceFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+//TODO: Add support for Schema Registry
+//TODO: Making descriptor file accessible on all servers
+public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ProtoBufMessageDecoder.class);
+
+  public static final String DESCRIPTOR_FILE_PATH = "descriptorFile";
+  public static final String PROTO_CLASS_NAME = "protoClassName";
+
+  private DynamicMessage _dynamicMessage;
+  private ProtoBufRecordExtractor _recordExtractor;
+  private String _protoClassName;
+
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),
+        "Protocol Buffer schema descriptor file must be provided");
+
+    _protoClassName = props.getOrDefault(PROTO_CLASS_NAME, "");
+    InputStream descriptorFileInputStream = getDescriptorFileInputStream(props.get(DESCRIPTOR_FILE_PATH));
+    Descriptors.Descriptor descriptor = buildProtoBufDescriptor(descriptorFileInputStream);
+    _recordExtractor = new ProtoBufRecordExtractor();
+    _recordExtractor.init(fieldsToRead, null);
+    _dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
+  }
+
+  private Descriptors.Descriptor buildProtoBufDescriptor(InputStream fin)
+      throws IOException {
+    try {
+      DynamicSchema dynamicSchema = DynamicSchema.parseFrom(fin);
+
+      if (!StringUtils.isEmpty(_protoClassName)) {
+        return dynamicSchema.getMessageDescriptor(_protoClassName);
+      } else {
+        return dynamicSchema.getMessageDescriptor(dynamicSchema.getMessageTypes().toArray(new String[]{})[0]);
+      }
+    } catch (Descriptors.DescriptorValidationException e) {
+      throw new IOException("Descriptor file validation failed", e);
+    }
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    return decode(payload, 0, payload.length, destination);
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    Message message;
+    try {
+      Message.Builder builder = _dynamicMessage.newBuilderForType();
+      builder.mergeFrom(payload);
+      message = builder.build();
+    } catch (Exception e) {
+      LOGGER.error("Not able to decode protobuf message", e);
+      return destination;
+    }
+    _recordExtractor.extract(message, destination);
+    return destination;
+  }
+
+  private InputStream getDescriptorFileInputStream(String descriptorFilePath)
+      throws IOException {
+    URI descriptorFileURI = URI.create(descriptorFilePath);

Review Comment:
   I thought about it, but is it fine to use the FS factory in another plugin? It creates a dependency: the FS implementation must always be loaded before the proto plugin can be used.
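One hedged way around the dependency raised here is to resolve only what the JDK can open on its own (plain local paths, `file:` URIs, and class-path resources) and fail clearly on anything else. This is a minimal sketch; `DescriptorFileOpener` and the `classpath:` scheme convention are hypothetical, not Pinot APIs, and remote schemes such as `s3://` would still need a shared filesystem abstraction.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Hypothetical helper (not a Pinot API): opens a descriptor file with JDK classes only,
 * so the protobuf plugin does not depend on a filesystem plugin being loaded first.
 * Assumes the path is a valid URI string (POSIX-style, no unescaped spaces).
 */
final class DescriptorFileOpener {
  private DescriptorFileOpener() {
  }

  static InputStream open(String descriptorFilePath) throws IOException {
    URI uri = URI.create(descriptorFilePath);
    String scheme = uri.getScheme();
    if (scheme == null) {
      // Plain local path, e.g. /opt/schemas/events.desc
      return Files.newInputStream(Paths.get(uri.getPath()));
    }
    switch (scheme) {
      case "file":
        // Absolute file URI, e.g. file:///opt/schemas/events.desc
        return Files.newInputStream(Paths.get(uri));
      case "classpath": {
        // Hypothetical convention: classpath:schemas/events.desc names a bundled resource
        String resource = uri.getSchemeSpecificPart();
        InputStream in = DescriptorFileOpener.class.getClassLoader().getResourceAsStream(resource);
        if (in == null) {
          throw new IOException("Descriptor resource not found on class path: " + resource);
        }
        return in;
      }
      default:
        // Remote schemes (s3, hdfs, ...) would need a shared filesystem abstraction.
        throw new IOException("Unsupported scheme '" + scheme + "' for descriptor file: " + descriptorFilePath);
    }
  }
}
```

The trade-off is coverage: this keeps the plugin self-contained, but a descriptor stored on a remote filesystem would first have to be copied to every server or bundled on the class path.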





[GitHub] [pinot] walterddr commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r931181275


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.utils.ResourceFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+//TODO: Add support for Schema Registry
+//TODO: Making descriptor file accessible on all servers
+public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ProtoBufMessageDecoder.class);
+
+  public static final String DESCRIPTOR_FILE_PATH = "descriptorFile";
+  public static final String PROTO_CLASS_NAME = "protoClassName";
+
+  private DynamicMessage _dynamicMessage;
+  private ProtoBufRecordExtractor _recordExtractor;
+  private String _protoClassName;
+
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),

Review Comment:
   The slowness should only be a one-time overhead, right? We do not decode the `.desc` file on every message.





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r930924252


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+//TODO: Add support for Schema Registry
+public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ProtoBufMessageDecoder.class);
+
+  public static final String DESCRIPTOR_FILE_PATH = "descriptorFile";
+  public static final String PROTO_CLASS_NAME = "protoClassName";
+
+  private DynamicMessage _dynamicMessage;
+  private ProtoBufRecordExtractor _recordExtractor;
+  private String _protoClassName;
+
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),
+        "Protocol Buffer schema descriptor file must be provided");
+
+    _protoClassName = props.getOrDefault(PROTO_CLASS_NAME, "");
+    InputStream descriptorFileInputStream = ProtoBufUtils.getDescriptorFileInputStream(
+        props.get(DESCRIPTOR_FILE_PATH));
+    Descriptors.Descriptor descriptor = buildProtoBufDescriptor(descriptorFileInputStream);
+    _recordExtractor = new ProtoBufRecordExtractor();
+    _recordExtractor.init(fieldsToRead, null);
+    _dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
+  }
+
+  private Descriptors.Descriptor buildProtoBufDescriptor(InputStream fin)
+      throws IOException {
+    try {
+      DynamicSchema dynamicSchema = DynamicSchema.parseFrom(fin);
+
+      if (!StringUtils.isEmpty(_protoClassName)) {
+        return dynamicSchema.getMessageDescriptor(_protoClassName);
+      } else {
+        return dynamicSchema.getMessageDescriptor(dynamicSchema.getMessageTypes().toArray(new String[]{})[0]);
+      }
+    } catch (Descriptors.DescriptorValidationException e) {
+      throw new IOException("Descriptor file validation failed", e);
+    }
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    return decode(payload, 0, payload.length, destination);
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    Message message;
+    try {
+      Message.Builder builder = _dynamicMessage.newBuilderForType();

Review Comment:
   This builder should be reused to avoid allocating a new one for every message. You can call `clear()` to reset it.
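In protobuf-java the suggested reuse would presumably be `builder.clear()` followed by `builder.mergeFrom(payload, offset, length)`. That form would also address a second issue in the revision quoted earlier in this thread: `decode(payload, offset, length, destination)` calls `builder.mergeFrom(payload)`, which parses the whole array and ignores `offset` and `length`. The following is a stdlib-only stand-in for the pattern, assuming varint-only payloads; `ReusingVarintDecoder` is hypothetical, and a `HashMap` plays the role of the builder.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for builder reuse, not Pinot or protobuf-java code.
 * One scratch map (playing the role of a Message.Builder) is allocated per decoder
 * and cleared before each message instead of being re-created.
 * Because it holds mutable state, an instance must not be shared across threads.
 */
final class ReusingVarintDecoder {
  private final Map<Integer, Long> _builder = new HashMap<>();

  Map<Integer, Long> decode(byte[] payload, int offset, int length) {
    _builder.clear(); // analogous to builder.clear()
    int end = offset + length; // parse only the [offset, offset + length) slice
    int[] pos = {offset};
    while (pos[0] < end) {
      long key = readVarint(payload, pos, end);
      if ((key & 0x7) != 0) {
        throw new IllegalArgumentException("Sketch only handles varint fields");
      }
      _builder.put((int) (key >>> 3), readVarint(payload, pos, end));
    }
    // Immutable snapshot, loosely analogous to builder.build(); the scratch map stays reusable.
    return Map.copyOf(_builder);
  }

  // Base-128 varint: 7 payload bits per byte; a set high bit means more bytes follow.
  private static long readVarint(byte[] buf, int[] pos, int end) {
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      if (pos[0] >= end) {
        throw new IllegalArgumentException("Truncated varint");
      }
      byte b = buf[pos[0]++];
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IllegalArgumentException("Malformed varint");
  }
}
```

Clearing before each message matters: without it, fields present in one message but absent from the next would leak into the next decoded row.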





[GitHub] [pinot] Jackie-Jiang merged pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang merged PR #8972:
URL: https://github.com/apache/pinot/pull/8972




[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r929555468


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.github.os72.protobuf.dynamic.DynamicSchema;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.utils.ResourceFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+//TODO: Add support for Schema Registry
+//TODO: Making descriptor file accessible on all servers
+public class ProtoBufMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ProtoBufMessageDecoder.class);
+
+  public static final String DESCRIPTOR_FILE_PATH = "descriptorFile";
+  public static final String PROTO_CLASS_NAME = "protoClassName";
+
+  private DynamicMessage _dynamicMessage;
+  private ProtoBufRecordExtractor _recordExtractor;
+  private String _protoClassName;
+
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),
+        "Protocol Buffer schema descriptor file must be provided");
+
+    _protoClassName = props.getOrDefault(PROTO_CLASS_NAME, "");
+    InputStream descriptorFileInputStream = getDescriptorFileInputStream(props.get(DESCRIPTOR_FILE_PATH));
+    Descriptors.Descriptor descriptor = buildProtoBufDescriptor(descriptorFileInputStream);
+    _recordExtractor = new ProtoBufRecordExtractor();
+    _recordExtractor.init(fieldsToRead, null);
+    _dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
+  }
+
+  private Descriptors.Descriptor buildProtoBufDescriptor(InputStream fin)
+      throws IOException {
+    try {
+      DynamicSchema dynamicSchema = DynamicSchema.parseFrom(fin);
+
+      if (!StringUtils.isEmpty(_protoClassName)) {
+        return dynamicSchema.getMessageDescriptor(_protoClassName);
+      } else {
+        return dynamicSchema.getMessageDescriptor(dynamicSchema.getMessageTypes().toArray(new String[]{})[0]);
+      }
+    } catch (Descriptors.DescriptorValidationException e) {
+      throw new IOException("Descriptor file validation failed", e);
+    }
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    return decode(payload, 0, payload.length, destination);
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    Message message;
+    try {
+      Message.Builder builder = _dynamicMessage.newBuilderForType();
+      builder.mergeFrom(payload);
+      message = builder.build();
+    } catch (Exception e) {
+      LOGGER.error("Not able to decode protobuf message", e);
+      return destination;
+    }
+    _recordExtractor.extract(message, destination);
+    return destination;
+  }
+
+  private InputStream getDescriptorFileInputStream(String descriptorFilePath)
+      throws IOException {
+    URI descriptorFileURI = URI.create(descriptorFilePath);

Review Comment:
   Yes, just the spi dependency should be fine.
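Side note on the hunk above: the four-argument `decode(byte[] payload, int offset, int length, GenericRow destination)` overload calls `builder.mergeFrom(payload)`, which parses the whole array and ignores `offset` and `length`. protobuf-java's `Message.Builder` also provides `mergeFrom(byte[] data, int off, int len)`, which parses only that range. The JDK-only sketch below shows the intended slicing contract. `SliceDecoder` and its `Function`-based parser are hypothetical stand-ins for the protobuf builder, not Pinot or protobuf APIs.

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical sketch: the two-argument overload delegates to the
// four-argument one, and the four-argument overload only sees
// payload[offset, offset + length). The parser stands in for
// builder.mergeFrom(payload, offset, length) in protobuf-java.
public class SliceDecoder<T> {
  private final Function<byte[], T> _parser;

  public SliceDecoder(Function<byte[], T> parser) {
    _parser = parser;
  }

  public T decode(byte[] payload) {
    return decode(payload, 0, payload.length);
  }

  public T decode(byte[] payload, int offset, int length) {
    // Reject ranges outside the array instead of silently parsing the wrong bytes.
    Objects.checkFromIndexSize(offset, length, payload.length);
    byte[] slice = Arrays.copyOfRange(payload, offset, offset + length);
    return _parser.apply(slice);
  }

  public static void main(String[] args) {
    // This "parser" just reports how many bytes it was handed.
    SliceDecoder<Integer> decoder = new SliceDecoder<>(bytes -> bytes.length);
    byte[] buffer = {9, 9, 1, 2, 3, 9};
    System.out.println(decoder.decode(buffer, 2, 3)); // prints 3
    System.out.println(decoder.decode(buffer));       // prints 6
  }
}
```

With real protobuf, the copy can be avoided entirely by passing `offset` and `length` straight to `mergeFrom`.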





[GitHub] [pinot] KKcorps commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r931292841


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,105 @@
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),

Review Comment:
   I benchmarked this a few years back: https://codeburst.io/using-dynamic-messages-in-protocol-buffers-in-scala-9fda4f0efcb3





[GitHub] [pinot] KKcorps commented on pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
KKcorps commented on PR #8972:
URL: https://github.com/apache/pinot/pull/8972#issuecomment-1193959409

   @xiangfu0 @navina can you please review this? We need to merge it before the 0.11 release.




[GitHub] [pinot] KKcorps commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r930874602


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,105 @@
+  private InputStream getDescriptorFileInputStream(String descriptorFilePath)
+      throws IOException {
+    URI descriptorFileURI = URI.create(descriptorFilePath);

Review Comment:
   done





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r929311591


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,105 @@
+  private InputStream getDescriptorFileInputStream(String descriptorFilePath)
+      throws IOException {
+    URI descriptorFileURI = URI.create(descriptorFilePath);

Review Comment:
   Shall we use PinotFS to copy the file to the local disk and then load it?
   You mentioned that this file must be available on all servers, so technically there are two ways to distribute it:
   1. Load it from the deep store or another remote location.
   2. In Kubernetes, mount the file as a ConfigMap.
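A JDK-only sketch of how the two distribution options could share one code path, assuming the descriptor location is given as a URI string: a path with no scheme or a `file:` URI (for example, a file under a ConfigMap volume mount) is read in place, and any other scheme is first copied to a local temporary file. `DescriptorLoader` and `RemoteCopier` are hypothetical names; in Pinot the copier would presumably delegate to a PinotFS implementation's `copyToLocalFile`, but that wiring is an assumption and is not shown here.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch: turn a descriptor file location into an InputStream.
public class DescriptorLoader {

  // Hypothetical hook for remote storage; assumed, not a Pinot API.
  public interface RemoteCopier {
    void copyToLocalFile(URI src, File dst) throws Exception;
  }

  private final RemoteCopier _remoteCopier;

  public DescriptorLoader(RemoteCopier remoteCopier) {
    _remoteCopier = remoteCopier;
  }

  public InputStream open(String descriptorFilePath) throws IOException {
    URI uri = URI.create(descriptorFilePath);
    String scheme = uri.getScheme();
    if (scheme == null) {
      // Plain local path, e.g. a file under a ConfigMap volume mount.
      return Files.newInputStream(Paths.get(descriptorFilePath));
    }
    if ("file".equals(scheme)) {
      return Files.newInputStream(Paths.get(uri));
    }
    // Remote location (deep store, S3, HDFS, ...): copy locally, then read.
    // Cleanup of the temporary copy is omitted for brevity.
    Path localCopy = Files.createTempDirectory("protobuf-descriptor").resolve("descriptor.desc");
    try {
      _remoteCopier.copyToLocalFile(uri, localCopy.toFile());
    } catch (Exception e) {
      throw new IOException("Failed to copy descriptor file from " + uri, e);
    }
    return Files.newInputStream(localCopy);
  }
}
```

Keeping local paths on a direct read means the ConfigMap option needs no remote file system configuration at all.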





[GitHub] [pinot] navina commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
navina commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r929396480


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,105 @@
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),

Review Comment:
   Even after reading https://github.com/os72/protobuf-dynamic and viewing `sample.desc`, I am not sure what a descriptor file represents. Is it a binary file generated by the protocol buffer compiler?
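For context on the question above: a descriptor file of this kind is typically produced by `protoc` with `--descriptor_set_out` (plus `--include_imports` to bundle dependencies) and contains a serialized `google.protobuf.FileDescriptorSet`, which appears to be the input `DynamicSchema.parseFrom` expects. It is ordinary protobuf binary data describing the `.proto` files. The illustrative JDK-only sketch below walks that wire format and lists each file's name and top-level message names, using field numbers from `descriptor.proto` (`FileDescriptorSet.file` = 1, `FileDescriptorProto.name` = 1, `FileDescriptorProto.message_type` = 4, `DescriptorProto.name` = 1). `DescriptorSetPeek` is a hypothetical helper; real code would use `DescriptorProtos.FileDescriptorSet.parseFrom` from protobuf-java.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: walks the protobuf wire format of a serialized
// google.protobuf.FileDescriptorSet. No validation of malformed input.
public class DescriptorSetPeek {

  // Reads a base-128 varint starting at pos[0] and advances pos[0].
  private static long readVarint(byte[] buf, int[] pos) {
    long result = 0;
    for (int shift = 0; shift < 64; shift += 7) {
      byte b = buf[pos[0]++];
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IllegalArgumentException("Malformed varint");
  }

  // Returns the payloads of every length-delimited field numbered fieldNumber,
  // skipping all other fields according to their wire type.
  static List<byte[]> fields(byte[] msg, int fieldNumber) {
    List<byte[]> out = new ArrayList<>();
    int[] pos = {0};
    while (pos[0] < msg.length) {
      long tag = readVarint(msg, pos);
      int number = (int) (tag >>> 3);
      int wireType = (int) (tag & 0x7);
      switch (wireType) {
        case 0: readVarint(msg, pos); break; // varint
        case 1: pos[0] += 8; break;          // 64-bit fixed
        case 5: pos[0] += 4; break;          // 32-bit fixed
        case 2: {                            // length-delimited
          int len = (int) readVarint(msg, pos);
          if (number == fieldNumber) {
            out.add(Arrays.copyOfRange(msg, pos[0], pos[0] + len));
          }
          pos[0] += len;
          break;
        }
        default: throw new IllegalArgumentException("Unsupported wire type " + wireType);
      }
    }
    return out;
  }

  private static String utf8(byte[] b) {
    return new String(b, StandardCharsets.UTF_8);
  }

  // Produces one "file.proto: MessageA, MessageB" line per file in the set.
  public static List<String> summarize(byte[] descriptorSet) {
    List<String> lines = new ArrayList<>();
    for (byte[] file : fields(descriptorSet, 1)) {     // FileDescriptorSet.file
      List<byte[]> names = fields(file, 1);              // FileDescriptorProto.name
      List<String> messages = new ArrayList<>();
      for (byte[] message : fields(file, 4)) {         // FileDescriptorProto.message_type
        List<byte[]> messageName = fields(message, 1);   // DescriptorProto.name
        messages.add(messageName.isEmpty() ? "?" : utf8(messageName.get(0)));
      }
      lines.add((names.isEmpty() ? "?" : utf8(names.get(0))) + ": " + String.join(", ", messages));
    }
    return lines;
  }
}
```

Running `summarize` on a `.desc` file's bytes is a quick way to see which message types it contains, which is also what `protoClassName` has to match.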





[GitHub] [pinot] KKcorps commented on a diff in pull request #8972: Add Protocol Buffer Stream Decoder

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #8972:
URL: https://github.com/apache/pinot/pull/8972#discussion_r931337572


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufMessageDecoder.java:
##########
@@ -0,0 +1,105 @@
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+      throws Exception {
+    Preconditions.checkState(props.containsKey(DESCRIPTOR_FILE_PATH),

Review Comment:
   These numbers are from my machine, using the protobuf version we depend on. They are a significant improvement over the ones in the blog post.
   ```
   Benchmark                                             (_numRecords)  Mode  Cnt   Score   Error  Units
   BenchmarkDynamicMessage.compiledClassDeserialization         100000  avgt    5  12.787 ± 0.297  ms/op
   BenchmarkDynamicMessage.dynamicClassDeserialization          100000  avgt    5  28.150 ± 0.691  ms/op
   ```
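Reading the two scores above, assuming each benchmark operation deserializes `_numRecords` (100,000) messages as the parameter name suggests: the compiled class costs about 127.9 ns per record and `DynamicMessage` about 281.5 ns per record, so dynamic deserialization is roughly 2.2 times slower. The small hypothetical helper below only reproduces that arithmetic; it is not part of the benchmark code.

```java
// Converts JMH average-time scores (ms per operation) into per-record costs.
public class BenchmarkMath {

  public static double nanosPerRecord(double msPerOp, int recordsPerOp) {
    return msPerOp * 1_000_000.0 / recordsPerOp;
  }

  public static void main(String[] args) {
    int records = 100_000; // the _numRecords parameter from the table above
    double compiled = nanosPerRecord(12.787, records);
    double dynamic = nanosPerRecord(28.150, records);
    System.out.printf("compiled: %.1f ns/record%n", compiled);
    System.out.printf("dynamic:  %.1f ns/record%n", dynamic);
    System.out.printf("slowdown: %.2fx%n", dynamic / compiled);
  }
}
```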


