Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/03/20 07:41:32 UTC

[GitHub] [druid] bananaaggle opened a new pull request #11018: add protobuf inputformat

bananaaggle opened a new pull request #11018:
URL: https://github.com/apache/druid/pull/11018


   Because parseSpec is deprecated, I developed ProtobufInputFormat for the new `InputFormat` interface. It supports stream ingestion of Protobuf-encoded data.
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] bananaaggle commented on pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on pull request #11018:
URL: https://github.com/apache/druid/pull/11018#issuecomment-805721219


   @clintropolis Thanks for your review. I didn't know about this feature before; changing the code for this optimization is a good idea. Do you have any other suggestions about my code? If there are no more concerns, I'll add unit tests and documentation for it.




[GitHub] [druid] bananaaggle commented on pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on pull request #11018:
URL: https://github.com/apache/druid/pull/11018#issuecomment-817267309


   @clintropolis Hi, I added a unit test for serialization/deserialization when using the schema registry. As for the integration test, should I add it in this PR or in a follow-up PR after this one is merged?




[GitHub] [druid] clintropolis merged pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
clintropolis merged pull request #11018:
URL: https://github.com/apache/druid/pull/11018


   




[GitHub] [druid] clintropolis commented on a change in pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #11018:
URL: https://github.com/apache/druid/pull/11018#discussion_r603712536



##########
File path: extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.protobuf;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterators;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.JsonFormat;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
+{
+  private final InputRowSchema inputRowSchema;
+  private final InputEntity source;
+  private final JSONPathSpec flattenSpec;
+  private final ObjectFlattener<JsonNode> recordFlattener;
+  private final ProtobufBytesDecoder protobufBytesDecoder;
+
+  ProtobufReader(
+      InputRowSchema inputRowSchema,
+      InputEntity source,
+      ProtobufBytesDecoder protobufBytesDecoder,
+      JSONPathSpec flattenSpec
+  )
+  {
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.protobufBytesDecoder = protobufBytesDecoder;
+    this.flattenSpec = flattenSpec;
+    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true));
+  }
+
+  @Override
+  protected CloseableIterator<DynamicMessage> intermediateRowIterator() throws IOException
+  {
+    return CloseableIterators.withEmptyBaggage(
+        Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())
+        ))));

Review comment:
       nit: formatting is sort of funny here
   ```suggestion
       return CloseableIterators.withEmptyBaggage(
           Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()))))
       );
   ```






[GitHub] [druid] bananaaggle commented on pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on pull request #11018:
URL: https://github.com/apache/druid/pull/11018#issuecomment-803268280


   @clintropolis I reviewed the Avro inputFormat code and learned that it only supports batch ingestion jobs. Why don't we support stream ingestion jobs? I don't think it would be very hard to implement, and I'd be glad to do it.




[GitHub] [druid] clintropolis commented on pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
clintropolis commented on pull request #11018:
URL: https://github.com/apache/druid/pull/11018#issuecomment-808122650


   >Do you have any other suggestions about my code? If there are no more concerns, I'll add unit tests and documentation for it.
   
   Everything else looked good to me :+1:




[GitHub] [druid] clintropolis commented on pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
clintropolis commented on pull request #11018:
URL: https://github.com/apache/druid/pull/11018#issuecomment-817604685


   >As for the integration test, should I add it in this PR or in a follow-up PR after this one is merged?
   
   I think it would be fine to do the integration test as a follow-up, since it is a bit more involved than the Avro input format due to there not being an existing integration test for protobuf.
   
   Adding it would be pretty similar to what is already there for Avro and the other formats though, with the main bit being implementing [`EventSerializer`](https://github.com/apache/druid/blob/master/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java) for the protobuf input format to test so that data can be written to the stream, and adding all the json template resources so that it gets added to the "data format" test group.




[GitHub] [druid] clintropolis commented on pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
clintropolis commented on pull request #11018:
URL: https://github.com/apache/druid/pull/11018#issuecomment-803271847


   >@clintropolis Hi, I created ProtobufInputFormat following your suggestion. I haven't used this interface before, so I'm not very familiar with it. Could you review my code and tell me whether this implementation meets the requirements? If it is correct, I will add more unit tests. By the way, where should I update the documentation for this feature?
   
   Thanks! I will have a look this weekend. I think https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md is the appropriate place to document the new `InputFormat` (I guess we also forgot to update the protobuf section of this in the last PR, https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md#protobuf-parser)
   
   >@clintropolis I reviewed the Avro inputFormat code and learned that it only supports batch ingestion jobs. Why don't we support stream ingestion jobs? I don't think it would be very hard to implement, and I'd be glad to do it.
   
   :+1: The only reason streaming Avro isn't supported yet is basically the same reason it wasn't done for Protobuf: simply that no one has done the conversion. I think it would be great if you would like to take that on, especially since I think Avro and Protobuf (until this PR) are the only "core" extensions that do not yet support `InputFormat`. It would make ingestion consistent for native batch and streaming, and would be much appreciated!




[GitHub] [druid] clintropolis commented on a change in pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #11018:
URL: https://github.com/apache/druid/pull/11018#discussion_r600143986



##########
File path: extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufExtensionsModule.java
##########
@@ -37,7 +37,8 @@
     return Collections.singletonList(
         new SimpleModule("ProtobufInputRowParserModule")
             .registerSubtypes(
-                new NamedType(ProtobufInputRowParser.class, "protobuf")
+                new NamedType(ProtobufInputRowParser.class, "protobuf"),
+                new NamedType(ProtobufInputFormat.class, "protobuf_format")

Review comment:
       I think this could just be `protobuf` the same as the parser name, since they are separate interfaces
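
To illustrate why reusing the name can be safe, here is a toy model (not Jackson's or Druid's actual code) that assumes a type name is always resolved relative to the declared base interface. Every name in it (`LegacyParser`, `NewFormat`, `NamedSubtypes`, and so on) is hypothetical and exists only for this sketch.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Two unrelated base interfaces, standing in for the parser and input-format interfaces.
interface LegacyParser {}
interface NewFormat {}

final class ProtoLegacyParser implements LegacyParser {}
final class ProtoNewFormat implements NewFormat {}

/** Toy registry (an assumption for illustration): a name is only looked up relative to a base type. */
final class NamedSubtypes
{
  private final List<Map.Entry<Class<?>, String>> registered = new ArrayList<>();

  NamedSubtypes register(Class<?> type, String name)
  {
    registered.add(new AbstractMap.SimpleImmutableEntry<>(type, name));
    return this;
  }

  Class<?> resolve(Class<?> baseType, String name)
  {
    for (Map.Entry<Class<?>, String> entry : registered) {
      // The name alone is ambiguous; the base type selects the matching candidate.
      if (entry.getValue().equals(name) && baseType.isAssignableFrom(entry.getKey())) {
        return entry.getKey();
      }
    }
    throw new IllegalArgumentException("No subtype '" + name + "' of " + baseType.getName());
  }
}
```

In this model, registering both classes under `"protobuf"` causes no clash, because a lookup for `LegacyParser` never considers `ProtoNewFormat` and vice versa.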

##########
File path: extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.protobuf;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterators;
+import com.google.protobuf.util.JsonFormat;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class ProtobufReader extends IntermediateRowParsingReader<String>
+{
+  private final InputRowSchema inputRowSchema;
+  private final InputEntity source;
+  private final ObjectFlattener<JsonNode> recordFlattener;
+  private final ProtobufBytesDecoder protobufBytesDecoder;
+
+  ProtobufReader(
+      InputRowSchema inputRowSchema,
+      InputEntity source,
+      ProtobufBytesDecoder protobufBytesDecoder,
+      JSONPathSpec flattenSpec
+  )
+  {
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.protobufBytesDecoder = protobufBytesDecoder;
+    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true));
+  }
+
+  @Override
+  protected CloseableIterator<String> intermediateRowIterator() throws IOException
+  {
+    return CloseableIterators.withEmptyBaggage(
+        Iterators.singletonIterator(JsonFormat.printer().print(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())))
+        )));

Review comment:
       The `InputRowParser` implementation for protobuf has an optimization that skips the conversion to JSON if a flattenSpec is not defined (see #9999), since the JSON conversion needed for flattening can slow input processing a fair bit (going by the numbers in that PR).
   
   To retain this, it might make sense to make the intermediary format `ByteBuffer` or `byte[]`, and handle the cases of having a `flattenSpec` or not separately. I think both could probably be handled within this same class by making `parseInputRows` behave differently for each situation, and it may make sense to use JSON conversion for the `toMap` method (it is used by `InputSourceSampler` for the sampler API).
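
A minimal sketch of the two-path idea follows, using only the Java standard library. It is not the Druid implementation: `DecodedMessage`, `TwoPathRowParser`, and `toJsonLikeTree` are hypothetical stand-ins, a plain `Map` plays the role of the decoded protobuf message, and a `null` flattener models "no flattenSpec configured".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for a decoded protobuf message: field name -> value. */
final class DecodedMessage
{
  final Map<String, Object> fields;

  DecodedMessage(Map<String, Object> fields)
  {
    this.fields = fields;
  }
}

/** Hypothetical parser that only pays for the JSON-like conversion when flattening is requested. */
final class TwoPathRowParser
{
  // null models "no flattenSpec was configured" (an assumption of this sketch).
  private final UnaryOperator<Map<String, Object>> flattener;
  // Counts how often the expensive path ran, so the behaviour is observable.
  int conversions = 0;

  TwoPathRowParser(UnaryOperator<Map<String, Object>> flattener)
  {
    this.flattener = flattener;
  }

  Map<String, Object> parse(DecodedMessage message)
  {
    if (flattener == null) {
      // Fast path: read the top-level fields directly, with no intermediate conversion.
      return new LinkedHashMap<>(message.fields);
    }
    // Slow path: build the intermediate tree first, then let the flattener reshape it.
    return flattener.apply(toJsonLikeTree(message));
  }

  // Stands in for the protobuf-to-JSON conversion; here it is just a copy plus a counter.
  private Map<String, Object> toJsonLikeTree(DecodedMessage message)
  {
    conversions++;
    return new LinkedHashMap<>(message.fields);
  }
}
```

The point of the split is that the common case (no flattening) never touches the conversion step, while the sampler-style `toMap` path could still reuse the conversion when it is needed.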






[GitHub] [druid] bananaaggle commented on pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on pull request #11018:
URL: https://github.com/apache/druid/pull/11018#issuecomment-810235717


   I've updated the documentation and added a serialization test. What are the integration tests you mentioned above? Can you give me an example of how to implement one? @clintropolis




[GitHub] [druid] bananaaggle commented on pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on pull request #11018:
URL: https://github.com/apache/druid/pull/11018#issuecomment-803267811


   @clintropolis Hi, I created ProtobufInputFormat following your suggestion. I haven't used this interface before, so I'm not very familiar with it. Could you review my code and tell me whether this implementation meets the requirements? If it is correct, I will add more unit tests. By the way, where should I update the documentation for this feature?




[GitHub] [druid] bananaaggle edited a comment on pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
bananaaggle edited a comment on pull request #11018:
URL: https://github.com/apache/druid/pull/11018#issuecomment-808844718


   @clintropolis I've added documentation for this feature and supplemented the documentation for the last commit. Can you review those documents and help me refine them? And what else do you think I should add for unit tests?




[GitHub] [druid] bananaaggle closed pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
bananaaggle closed pull request #11018:
URL: https://github.com/apache/druid/pull/11018


   




[GitHub] [druid] lgtm-com[bot] commented on pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #11018:
URL: https://github.com/apache/druid/pull/11018#issuecomment-810307041


   This pull request **introduces 2 alerts** when merging 1751a3f1ce1e7fba04d2f014ce5a892c6f8f36be into 6789ed0a05fde97a70d084d54a0cb7f693c7b682 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-8e854054b2fe622df72cb15bc6cbd76e9168083f)
   
   **new alerts:**
   
   * 2 for Inconsistent equals and hashCode
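
For context on the alert above: the "Inconsistent equals and hashCode" rule flags a class that overrides one of the two methods without the other. A minimal sketch of the usual remedy follows, assuming a hypothetical `DecoderConfig` value class with two fields. The class and field names are illustrative only and are not taken from this PR.

```java
import java.util.Objects;

// Hypothetical value class; the names are illustrative, not from the PR.
final class DecoderConfig
{
  private final String descriptor;
  private final String protoMessageType;

  DecoderConfig(String descriptor, String protoMessageType)
  {
    this.descriptor = descriptor;
    this.protoMessageType = protoMessageType;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    DecoderConfig that = (DecoderConfig) o;
    // Compare exactly the fields that hashCode() hashes.
    return Objects.equals(descriptor, that.descriptor)
           && Objects.equals(protoMessageType, that.protoMessageType);
  }

  @Override
  public int hashCode()
  {
    // Same field set as equals(), so equal objects share a hash code.
    return Objects.hash(descriptor, protoMessageType);
  }
}
```

Keeping both methods on the same set of fields is what lets such objects behave correctly as keys in hash-based collections.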




[GitHub] [druid] bananaaggle commented on pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on pull request #11018:
URL: https://github.com/apache/druid/pull/11018#issuecomment-805401266


   > > @clintropolis Hi, I created ProtobufInputFormat following your suggestion. I haven't used this interface before, so I'm not very familiar with it. Can you review my code and tell me whether this implementation meets the requirements? If it is correct, I will add more unit tests. By the way, where in the documentation should I describe this feature?
   > 
   > Thanks! I will have a look this weekend. I think https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md is the appropriate place to document the new `InputFormat` (I guess we also forgot to update the protobuf section of this in the last PR, https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md#protobuf-parser)
   > 
   > > @clintropolis I reviewed the Avro inputFormat code and learned that it only supports batch ingestion jobs. Why don't we support stream ingestion jobs? I think it's not very hard to implement, and I'd be glad to do that.
   > 
   > 👍 The only reason streaming Avro isn't supported yet is basically the same reason it wasn't done for Protobuf: no one has done the conversion. I think it would be great if you would like to take that on, especially since I think Avro and Protobuf (until this PR) are the only "core" extensions that do not yet support `InputFormat`. It would make ingestion consistent for native batch and streaming, and be much appreciated!




[GitHub] [druid] clintropolis commented on a change in pull request #11018: add protobuf inputformat

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #11018:
URL: https://github.com/apache/druid/pull/11018#discussion_r603712536



##########
File path: extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.protobuf;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterators;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.JsonFormat;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
+{
+  private final InputRowSchema inputRowSchema;
+  private final InputEntity source;
+  private final JSONPathSpec flattenSpec;
+  private final ObjectFlattener<JsonNode> recordFlattener;
+  private final ProtobufBytesDecoder protobufBytesDecoder;
+
+  ProtobufReader(
+      InputRowSchema inputRowSchema,
+      InputEntity source,
+      ProtobufBytesDecoder protobufBytesDecoder,
+      JSONPathSpec flattenSpec
+  )
+  {
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.protobufBytesDecoder = protobufBytesDecoder;
+    this.flattenSpec = flattenSpec;
+    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true));
+  }
+
+  @Override
+  protected CloseableIterator<DynamicMessage> intermediateRowIterator() throws IOException
+  {
+    return CloseableIterators.withEmptyBaggage(
+        Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())
+        ))));

Review comment:
       nit: formatting is sort of funny here
   ```suggestion
       return CloseableIterators.withEmptyBaggage(
         Iterators.singletonIterator(protobufBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()))))
       );
   ```
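
To make the shape of the expression in that suggestion easier to follow, here is a minimal JDK-only sketch of the same pattern: read the entire input into one `ByteBuffer` and expose it through an iterator that yields exactly one element. The `SingleMessageSource` class and `singleBuffer` method are hypothetical names for illustration; the sketch does not use the Druid, Guava, or commons-io calls from the diff, and closing the stream with try-with-resources is a choice of this example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;

// Hypothetical helper; not part of the PR.
final class SingleMessageSource
{
  // Reads the whole stream and exposes it as exactly one buffer,
  // mirroring the "one entity, one message" shape of the reviewed code.
  static Iterator<ByteBuffer> singleBuffer(InputStream in) throws IOException
  {
    try (InputStream stream = in) {
      ByteBuffer bytes = ByteBuffer.wrap(stream.readAllBytes());
      return Collections.singletonList(bytes).iterator();
    }
  }
}
```

A caller would pass the single buffer to a decoder and expect no further elements from the iterator.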

##########
File path: docs/ingestion/data-formats.md
##########
@@ -272,6 +272,42 @@ The `inputFormat` to load data of Avro OCF format. An example is:
 |schema| JSON Object |Define a reader schema to be used when parsing Avro records, this is useful when parsing multiple versions of Avro OCF file data | no (default will decode using the writer schema contained in the OCF file) |
 | binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
 
+### Protobuf
+
+> You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf input format.
+
+The `inputFormat` to load data of Protobuf format. An example is:
+```json
+"ioConfig": {
+  "inputFormat": {
+    "type": "protobuf",
+    "protoBytesDecoder": {
+      "type": "file",
+      "descriptor": "file:///tmp/metrics.desc",
+      "protoMessageType": "Metrics"
+    },
+    "flattenSpec": {
+      "useFieldDiscovery": true,
+      "fields": [
+        {
+          "type": "path",
+          "name": "someRecord_subInt",
+          "expr": "$.someRecord.subInt"
+        }
+      ]
+    },
+    "binaryAsString": false
+  },
+  ...
+}
+```
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+|type| String| This should be set to `protobuf` to read Protobuf file| yes |

Review comment:
       Since I guess this is more stream oriented, maybe:
   ```suggestion
   |type| String| This should be set to `protobuf` to read Protobuf serialized data| yes |
   ```
   or "to read Protobuf formatted data" or similar.
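
As an aside on the `flattenSpec` in the example above: the `path` field reads the nested value at `$.someRecord.subInt` and exposes it under the top-level name `someRecord_subInt`. The sketch below illustrates that lookup over plain nested maps. It is a simplification under stated assumptions: `SimplePathFlattener` is a hypothetical class, it handles only dotted child access of the form `$.a.b`, and it is not Druid's flattener or a full JSONPath implementation.

```java
import java.util.Map;

// Hypothetical helper for illustration; not Druid's ObjectFlattener.
final class SimplePathFlattener
{
  // Resolves "$.a.b" style expressions only; returns null when a segment is missing.
  static Object resolve(Map<String, Object> record, String expr)
  {
    if (!expr.startsWith("$.")) {
      throw new IllegalArgumentException("unsupported expression: " + expr);
    }
    Object current = record;
    for (String segment : expr.substring(2).split("\\.")) {
      if (!(current instanceof Map)) {
        // The path descends into something that is not an object.
        return null;
      }
      current = ((Map<?, ?>) current).get(segment);
    }
    return current;
  }
}
```

With a record shaped like `{"someRecord": {"subInt": 7}}`, resolving `$.someRecord.subInt` returns the nested integer, which a flattener would then store under the configured `name`.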

##########
File path: docs/ingestion/data-formats.md
##########
@@ -272,6 +272,42 @@ The `inputFormat` to load data of Avro OCF format. An example is:
 |schema| JSON Object |Define a reader schema to be used when parsing Avro records, this is useful when parsing multiple versions of Avro OCF file data | no (default will decode using the writer schema contained in the OCF file) |
 | binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
 
+### Protobuf
+
+> You need to include the [`druid-protobuf-extensions`](../development/extensions-core/protobuf.md) as an extension to use the Protobuf input format.
+
+The `inputFormat` to load data of Protobuf format. An example is:
+```json
+"ioConfig": {
+  "inputFormat": {
+    "type": "protobuf",
+    "protoBytesDecoder": {
+      "type": "file",
+      "descriptor": "file:///tmp/metrics.desc",
+      "protoMessageType": "Metrics"
+    },
+    "flattenSpec": {
+      "useFieldDiscovery": true,
+      "fields": [
+        {
+          "type": "path",
+          "name": "someRecord_subInt",
+          "expr": "$.someRecord.subInt"
+        }
+      ]
+    },
+    "binaryAsString": false

Review comment:
       I think this is an Avro/Parquet/ORC parameter

##########
File path: docs/ingestion/data-formats.md
##########
@@ -1104,6 +1142,83 @@ Sample spec:
 See the [extension description](../development/extensions-core/protobuf.md) for
 more details and examples.
 
+#### Protobuf Bytes Decoder
+
+If `type` is not included, the avroBytesDecoder defaults to `schema_registry`.

Review comment:
       ```suggestion
   If `type` is not included, the `protoBytesDecoder` defaults to `schema_registry`.
   ```



