Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/06/08 04:11:09 UTC

[GitHub] [druid] xhl0726 opened a new pull request #9999: Optimize protobuf parsing

xhl0726 opened a new pull request #9999:
URL: https://github.com/apache/druid/pull/9999


   
   Fixes #9984.
   
   
   
   ### Description
   See #9984.
   To improve the performance of protobuf parsing, and to solve the `JSONPathSpec` problem mentioned at the end of #9984, we apply different parsing methods to flat data and nested data.
   
   Because Druid only allows the `ParseSpec` for protobuf data to be defined with a `JSONParseSpec`, "nested data" here means data whose `JSONPathSpec` (defined in the `JSONParseSpec`) is not null. Flat data has a null `JSONPathSpec`, so its parsing can be optimized by skipping the intermediate conversion to JSON. The modified `ProtobufInputRowParser` passes all the tests in `ProtobufInputRowParserTest` (if necessary, we can add a unit test for flat data).
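A minimal sketch of the dispatch described above, under stated assumptions: the decoded protobuf message is represented as a map of top-level field names to values (a simplified stand-in for protobuf-java's `getAllFields()`, which is keyed by field descriptors rather than names), a non-null `flattenSpec` argument plays the role of `JSONPathSpec`, and the JSON-based parser is passed in as a function so the example runs with only the JDK. `ProtobufRowDispatchSketch`, `parse`, and `toJson` are hypothetical names, not the code in this PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch: pick a direct (flat) path or a JSON round-trip (nested) path
 * depending on whether a flatten spec is configured.
 */
final class ProtobufRowDispatchSketch
{
  private ProtobufRowDispatchSketch() {}

  /**
   * @param fields         decoded message as top-level field name -> value (assumed input shape)
   * @param flattenSpec    stand-in for JSONPathSpec; null means "flat data"
   * @param jsonPathParser stand-in for the JSON-based parser used when a flatten spec exists
   */
  static Map<String, Object> parse(
      Map<String, Object> fields,
      Object flattenSpec,
      Function<String, Map<String, Object>> jsonPathParser
  )
  {
    if (flattenSpec == null) {
      // Flat path: the top-level fields already form the row, so no JSON is built.
      return new LinkedHashMap<>(fields);
    }
    // Nested path: serialize to JSON first so JSONPath expressions can be evaluated.
    return jsonPathParser.apply(toJson(fields));
  }

  /** Naive JSON serializer: strings, nested maps and lists; everything else via toString(). */
  static String toJson(Object value)
  {
    if (value == null) {
      return "null";
    }
    if (value instanceof String) {
      // Escapes only backslashes and quotes; control characters are not handled.
      return "\"" + ((String) value).replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
    if (value instanceof Map) {
      StringBuilder sb = new StringBuilder("{");
      for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
        if (sb.length() > 1) {
          sb.append(',');
        }
        sb.append(toJson(String.valueOf(e.getKey()))).append(':').append(toJson(e.getValue()));
      }
      return sb.append('}').toString();
    }
    if (value instanceof List) {
      StringBuilder sb = new StringBuilder("[");
      for (Object item : (List<?>) value) {
        if (sb.length() > 1) {
          sb.append(',');
        }
        sb.append(toJson(item));
      }
      return sb.append(']').toString();
    }
    // Numbers and booleans print as JSON literals through toString().
    return String.valueOf(value);
  }
}
```

The flat path never builds or re-parses a JSON string, while the nested path pays for serialization plus JSON parsing on every row; that per-row overhead is what the two benchmarks compare.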
   
   Below is the result of `ProtobufParserBenchmark`
   ![image](https://user-images.githubusercontent.com/24449727/83990429-32b54200-a97c-11ea-87f3-b9faf205c2d2.png)
   
   It shows that parsing flat data with the optimized method can be 4 times faster than parsing the same input as nested data.
   #### Added test files 
   We added `prototest.desc` (a copy of `prototest.desc` in the protobuf-extensions test classes) and `ProtoFile` (generated by a test in `ProtobufInputRowParserTest`) under `benchmarks/src/test/resources` to simplify test data preparation. The two benchmarks in `ProtobufParserBenchmark` use the same input, but their outputs differ slightly because the nested parser's `JSONPathSpec` requests additional fields.
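A minimal sketch of loading such test resources, assuming they end up at the root of the benchmark's test classpath. It reads through `ClassLoader.getResourceAsStream` instead of turning `getResource(...).getPath()` into a `File` (a path-based lookup can break when the resource sits inside a jar or when the path contains URL-escaped characters such as spaces), throws a descriptive `FileNotFoundException` for a missing resource, and propagates I/O errors instead of printing a stack trace and continuing. `TestResources`, `readResource`, and `readAll` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper for reading benchmark inputs such as "ProtoFile" from the classpath. */
final class TestResources
{
  private TestResources() {}

  /** Reads a resource relative to the classpath root; throws if it is missing. */
  static byte[] readResource(String name) throws IOException
  {
    ClassLoader loader = TestResources.class.getClassLoader();
    // try-with-resources tolerates a null resource: close() is simply not called.
    try (InputStream in = loader.getResourceAsStream(name)) {
      if (in == null) {
        throw new FileNotFoundException("Resource not found on classpath: " + name);
      }
      return readAll(in);
    }
  }

  /** Java 8-compatible equivalent of InputStream.readAllBytes(). */
  static byte[] readAll(InputStream in) throws IOException
  {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] chunk = new byte[8192];
    int n;
    while ((n = in.read(chunk)) != -1) {
      out.write(chunk, 0, n);
    }
    return out.toByteArray();
  }
}
```

With such a helper, a benchmark setup method could read its input with `protoInputs = TestResources.readResource("ProtoFile");`.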
   
   
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
   - [ ] added unit tests or modified existing tests to cover new code paths.
   
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `ProtobufParserBenchmark`
    * `ProtobufInputRowParser`
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] xhl0726 commented on a change in pull request #9999: Optimize protobuf parsing for flatten data

Posted by GitBox <gi...@apache.org>.
xhl0726 commented on a change in pull request #9999:
URL: https://github.com/apache/druid/pull/9999#discussion_r440221061



##########
File path: benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.protobuf.ProtobufInputRowParser;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 25)
+public class ProtobufParserBenchmark
+{
+  @Param({"75000"})
+  private int rowsPerSegment;
+
+  private static final Logger log = new Logger(ProtobufParserBenchmark.class);
+
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  private ParseSpec nestedParseSpec;
+  private ProtobufInputRowParser nestedParser;
+  private ParseSpec flattenParseSpec;
+  private ProtobufInputRowParser flattenParser;
+  private byte[] protoInputs;
+  private String protoFilePath;
+
+  @Setup
+  public void setup()
+  {
+    log.info("SETUP CALLED AT " + +System.currentTimeMillis());
+
+    nestedParseSpec = new JSONParseSpec(
+                new TimestampSpec("timestamp", "iso", null),
+                new DimensionsSpec(Lists.newArrayList(
+                        new StringDimensionSchema("event"),
+                        new StringDimensionSchema("id"),
+                        new StringDimensionSchema("someOtherId"),
+                        new StringDimensionSchema("isValid")
+                ), null, null),
+                new JSONPathSpec(
+                        true,
+                        Lists.newArrayList(
+                                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"),
+                                new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"),
+                                new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar")
+                        )
+                ),
+                null,
+                null
+    );
+
+    flattenParseSpec = new JSONParseSpec(
+            new TimestampSpec("timestamp", "iso", null),
+            new DimensionsSpec(Lists.newArrayList(
+                    new StringDimensionSchema("event"),
+                    new StringDimensionSchema("id"),
+                    new StringDimensionSchema("someOtherId"),
+                    new StringDimensionSchema("isValid")
+            ), null, null),
+            null,
+            null,
+            null
+    );
+
+    protoFilePath = "ProtoFile";
+    protoInputs = getProtoInputs(protoFilePath);
+    nestedParser = new ProtobufInputRowParser(nestedParseSpec, "prototest.desc", "ProtoTestEvent");
+    flattenParser = new ProtobufInputRowParser(flattenParseSpec, "prototest.desc", "ProtoTestEvent");
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void consumeFlattenData(Blackhole blackhole)
+  {
+    for (int i = 0; i < rowsPerSegment; i++) {
+      InputRow row = flattenParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0);

Review comment:
       Yeah, your point makes sense. I'll move the wrapping into the setup phase, which will make the benchmark clearer and easier to understand.
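One way to follow this suggestion, sketched with only the JDK so it stays runnable: wrap the bytes once during setup and hand each parse call a `duplicate()` of that buffer. A duplicate shares the underlying bytes but has its own position, so a parser that consumes its input (advancing the position) does not leave the shared buffer exhausted for the next iteration. `BenchmarkStateSketch` and the `Function<ByteBuffer, T>` parser are hypothetical stand-ins for the JMH state class and `ProtobufInputRowParser.parseBatch`.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

/** Hypothetical benchmark state: the input is wrapped once in setup, not on every iteration. */
final class BenchmarkStateSketch<T>
{
  private final Function<ByteBuffer, T> parser; // stand-in for parser.parseBatch(...)
  private ByteBuffer input;

  BenchmarkStateSketch(Function<ByteBuffer, T> parser)
  {
    this.parser = parser;
  }

  /** Runs once before measurement, like a JMH @Setup method. */
  void setup(byte[] protoInputs)
  {
    input = ByteBuffer.wrap(protoInputs);
  }

  /** Measured work: each call gets an independent view starting at position 0. */
  T parseOnce()
  {
    // input itself is never read, so its position stays 0 and every duplicate starts there.
    return parser.apply(input.duplicate());
  }
}
```

Like `wrap()`, `duplicate()` allocates only a small view object and copies no bytes, so this change is mainly about readability rather than a measurable speedup.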






[GitHub] [druid] xhl0726 commented on a change in pull request #9999: Optimize protobuf parsing for flatten data

xhl0726 commented on a change in pull request #9999:
URL: https://github.com/apache/druid/pull/9999#discussion_r444796917



##########
File path: extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java
##########
@@ -177,6 +191,45 @@ public void testParse() throws Exception
     Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
   }
 
+  @Test
+  public void testParseFlattenData() throws Exception

Review comment:
       Your suggestion makes sense. I'll fix it soon. 






[GitHub] [druid] xhl0726 commented on a change in pull request #9999: Optimize protobuf parsing for flatten data

xhl0726 commented on a change in pull request #9999:
URL: https://github.com/apache/druid/pull/9999#discussion_r440230368



##########
File path: benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java
##########
@@ -0,0 +1,173 @@
+  @Setup
+  public void setup()
+  {
+    log.info("SETUP CALLED AT " + +System.currentTimeMillis());

Review comment:
       Yes, it can be removed. Thanks for pointing it out~






[GitHub] [druid] xhl0726 commented on a change in pull request #9999: Optimize protobuf parsing for flatten data

xhl0726 commented on a change in pull request #9999:
URL: https://github.com/apache/druid/pull/9999#discussion_r440216201



##########
File path: benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java
##########
@@ -0,0 +1,173 @@
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void consumeFlattenData(Blackhole blackhole)
+  {
+    for (int i = 0; i < rowsPerSegment; i++) {
+      InputRow row = flattenParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0);
+      blackhole.consume(row);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void consumeNestedData(Blackhole blackhole)
+  {
+    for (int i = 0; i < rowsPerSegment; i++) {
+      InputRow row = nestedParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0);
+      blackhole.consume(row);
+    }
+
+  }
+  private byte[] getProtoInputs(String fileName)
+  {
+    String filePath = this.getClass().getClassLoader().getResource(fileName).getPath();
+    byte[] bytes = null;
+    try {
+      File file = new File(filePath);
+      bytes = new byte[(int) file.length()];
+      bytes = Files.toByteArray(file);
+    }
+    catch (FileNotFoundException e) {
+      log.error("Cannot find the file: " + filePath);
+      e.printStackTrace();
+    }
+    catch (IOException e) {
+      e.printStackTrace();
+    }
+    return bytes;
+  }
+
+  public static void main(String[] args) throws RunnerException
+  {
+    Options opt = new OptionsBuilder()
+        .include(ProtobufParserBenchmark.class.getSimpleName())
+        .build();
+    new Runner(opt).run();

Review comment:
       Yes, you're right~ It's just for debugging and should be removed.






[GitHub] [druid] liran-funaro commented on a change in pull request #9999: Optimize protobuf parsing for flatten data

liran-funaro commented on a change in pull request #9999:
URL: https://github.com/apache/druid/pull/9999#discussion_r444750976



##########
File path: benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java
##########
@@ -0,0 +1,173 @@
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void consumeFlattenData(Blackhole blackhole)
+  {
+    for (int i = 0; i < rowsPerSegment; i++) {
+      InputRow row = flattenParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0);

Review comment:
       I see, that makes sense. It could be solved by resetting the buffer with `buff.position(0)`, but since `ByteBuffer.wrap()` only instantiates a single object in O(1) time, I don't think it would make much of a difference.
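A small JDK-only check of the two facts this comment relies on: `ByteBuffer.wrap(byte[])` creates a view backed by the same array instead of copying it, and a buffer whose position has been advanced by reading can be reset with `position(0)` (or `rewind()`, which additionally discards the mark). `WrapDemo` and its methods are hypothetical names for illustration.

```java
import java.nio.ByteBuffer;

/** Hypothetical demo: wrap() shares its array, and position(0) makes consumed bytes readable again. */
final class WrapDemo
{
  private WrapDemo() {}

  /** Reads every remaining byte, as a parser would, leaving position == limit. */
  static int drain(ByteBuffer buf)
  {
    int count = 0;
    while (buf.hasRemaining()) {
      buf.get();
      count++;
    }
    return count;
  }

  /** True if a write to the array is visible through the wrapped buffer, i.e. nothing was copied. */
  static boolean wrapSharesArray()
  {
    byte[] bytes = {1, 2, 3};
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    bytes[0] = 42;
    return buf.get(0) == 42 && buf.array() == bytes;
  }
}
```

Because no bytes are copied, re-wrapping on each iteration costs about the same as resetting the position of a shared buffer.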






[GitHub] [druid] xhl0726 commented on a change in pull request #9999: Optimize protobuf parsing for flatten data

xhl0726 commented on a change in pull request #9999:
URL: https://github.com/apache/druid/pull/9999#discussion_r444792520



##########
File path: extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java
##########
@@ -76,6 +77,19 @@ public void setUp()
         null
     );
 
+    flattenParseSpec = new JSONParseSpec(

Review comment:
       Yeah, you're right.






[GitHub] [druid] clintropolis commented on a change in pull request #9999: Optimize protobuf parsing for flatten data

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #9999:
URL: https://github.com/apache/druid/pull/9999#discussion_r444740588



##########
File path: extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java
##########
@@ -177,6 +191,45 @@ public void testParse() throws Exception
     Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
   }
 
+  @Test
+  public void testParseFlattenData() throws Exception
+  {
+    //configure parser with desc file
+    ProtobufInputRowParser parser = new ProtobufInputRowParser(flattenParseSpec, "prototest.desc", "ProtoTestEvent");
+
+    //create binary of proto test event
+    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
+    ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
+            .setDescription("description")
+            .setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
+            .setId(4711L)
+            .setIsValid(true)
+            .setSomeOtherId(4712)
+            .setTimestamp(dateTime.toString())
+            .setSomeFloatColumn(47.11F)
+            .setSomeIntColumn(815)
+            .setSomeLongColumn(816L)
+            .build();
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    event.writeTo(out);
+
+    InputRow row = parser.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
+    System.out.println(row);

Review comment:
       nit: unnecessary println, though there are others in this test file from before this change, so no worries.

##########
File path: extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java
##########
@@ -76,6 +77,19 @@ public void setUp()
         null
     );
 
+    flattenParseSpec = new JSONParseSpec(

Review comment:
       nit: should this be named `flatParseSpec` since it is for flat data and _will not_ flatten the data since it has a `null` flattenSpec?

##########
File path: extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java
##########
@@ -177,6 +191,45 @@ public void testParse() throws Exception
     Assert.assertEquals(816.0F, row.getMetric("someLongColumn").floatValue(), 0.0);
   }
 
+  @Test
+  public void testParseFlattenData() throws Exception

Review comment:
       nit: same suggestion about naming, should this be `testParseFlatData` since it isn't actively flattening nested data?






[GitHub] [druid] liran-funaro commented on a change in pull request #9999: Optimize protobuf parsing for flatten data

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #9999:
URL: https://github.com/apache/druid/pull/9999#discussion_r439824062



##########
File path: benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.protobuf.ProtobufInputRowParser;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 25)
+public class ProtobufParserBenchmark
+{
+  @Param({"75000"})
+  private int rowsPerSegment;
+
+  private static final Logger log = new Logger(ProtobufParserBenchmark.class);
+
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  private ParseSpec nestedParseSpec;
+  private ProtobufInputRowParser nestedParser;
+  private ParseSpec flattenParseSpec;
+  private ProtobufInputRowParser flattenParser;
+  private byte[] protoInputs;
+  private String protoFilePath;
+
+  @Setup
+  public void setup()
+  {
+    log.info("SETUP CALLED AT " + +System.currentTimeMillis());
+
+    nestedParseSpec = new JSONParseSpec(
+                new TimestampSpec("timestamp", "iso", null),
+                new DimensionsSpec(Lists.newArrayList(
+                        new StringDimensionSchema("event"),
+                        new StringDimensionSchema("id"),
+                        new StringDimensionSchema("someOtherId"),
+                        new StringDimensionSchema("isValid")
+                ), null, null),
+                new JSONPathSpec(
+                        true,
+                        Lists.newArrayList(
+                                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"),
+                                new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"),
+                                new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar")
+                        )
+                ),
+                null,
+                null
+    );
+
+    flattenParseSpec = new JSONParseSpec(
+            new TimestampSpec("timestamp", "iso", null),
+            new DimensionsSpec(Lists.newArrayList(
+                    new StringDimensionSchema("event"),
+                    new StringDimensionSchema("id"),
+                    new StringDimensionSchema("someOtherId"),
+                    new StringDimensionSchema("isValid")
+            ), null, null),
+            null,
+            null,
+            null
+    );
+
+    protoFilePath = "ProtoFile";
+    protoInputs = getProtoInputs(protoFilePath);
+    nestedParser = new ProtobufInputRowParser(nestedParseSpec, "prototest.desc", "ProtoTestEvent");
+    flattenParser = new ProtobufInputRowParser(flattenParseSpec, "prototest.desc", "ProtoTestEvent");
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void consumeFlattenData(Blackhole blackhole)
+  {
+    for (int i = 0; i < rowsPerSegment; i++) {
+      InputRow row = flattenParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0);
+      blackhole.consume(row);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void consumeNestedData(Blackhole blackhole)
+  {
+    for (int i = 0; i < rowsPerSegment; i++) {
+      InputRow row = nestedParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0);
+      blackhole.consume(row);
+    }
+
+  }
+  private byte[] getProtoInputs(String fileName)
+  {
+    String filePath = this.getClass().getClassLoader().getResource(fileName).getPath();
+    byte[] bytes = null;
+    try {
+      File file = new File(filePath);
+      bytes = new byte[(int) file.length()];
+      bytes = Files.toByteArray(file);
+    }
+    catch (FileNotFoundException e) {
+      log.error("Cannot find the file: " + filePath);
+      e.printStackTrace();
+    }
+    catch (IOException e) {
+      e.printStackTrace();
+    }
+    return bytes;
+  }
+
+  public static void main(String[] args) throws RunnerException
+  {
+    Options opt = new OptionsBuilder()
+        .include(ProtobufParserBenchmark.class.getSimpleName())
+        .build();
+    new Runner(opt).run();

Review comment:
       I don't think the benchmark class should have a main method. It seems useful for debugging, but I don't think it should exist in the master branch.

##########
File path: benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java
##########
@@ -0,0 +1,173 @@
+      InputRow row = flattenParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0);

Review comment:
       I wonder what effect `ByteBuffer.wrap()` has on the measured performance.
   Is there a reason the wrapping is done inside the loop rather than in the setup phase?

##########
File path: benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java
##########
@@ -0,0 +1,173 @@
+    log.info("SETUP CALLED AT " + +System.currentTimeMillis());

Review comment:
       Is this logging necessary? It seems useful for debugging, but I don't think it should exist in the master branch.

##########
File path: extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java
##########
@@ -100,16 +102,27 @@ void initDescriptor()
       parser = parseSpec.makeParser();
       initDescriptor();
     }
-    String json;
-    try {
-      DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
-      json = JsonFormat.printer().print(message);
-    }
-    catch (InvalidProtocolBufferException e) {
-      throw new ParseException(e, "Protobuf message could not be parsed");
+    Map<String, Object> record;
+
+    if (parseSpec instanceof JSONParseSpec && ((JSONParseSpec) parseSpec).getFlattenSpec().getFields().isEmpty()) {
+      try {
+        DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
+        record = CollectionUtils.mapKeys(message.getAllFields(), k -> k.getJsonName());
+      }
+      catch (InvalidProtocolBufferException ex) {
+        throw new ParseException(ex, "Protobuf message could not be parsed");
+      }
+    } else {
+      try {
+        DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
+        String json = JsonFormat.printer().print(message);
+        record = parser.parseToMap(json);
+      }
+      catch (InvalidProtocolBufferException e) {
+        throw new ParseException(e, "Protobuf message could not be parsed");
+      }

Review comment:
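       I suggest refactoring these modifications to reduce code duplication.
   
   ```suggestion
       // declared outside the try block so it is still in scope below
       final DynamicMessage message;
       try {
         message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
       }
       catch (InvalidProtocolBufferException ex) {
         throw new ParseException(ex, "Protobuf message could not be parsed");
       }
   
       Map<String, Object> record;
       if (parseSpec instanceof JSONParseSpec && ((JSONParseSpec) parseSpec).getFlattenSpec().getFields().isEmpty()) {
         // flat data: re-key the parsed fields by JSON name, skipping the JSON round trip
         record = CollectionUtils.mapKeys(message.getAllFields(), k -> k.getJsonName());
       } else {
         // nested data: JsonFormat.printer().print() also throws InvalidProtocolBufferException
         try {
           String json = JsonFormat.printer().print(message);
           record = parser.parseToMap(json);
         }
         catch (InvalidProtocolBufferException e) {
           throw new ParseException(e, "Protobuf message could not be parsed");
         }
       }
   ```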
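For context on the flat-data branch: `message.getAllFields()` returns a map keyed by protobuf field descriptors, and `CollectionUtils.mapKeys(..., k -> k.getJsonName())` re-keys it by each field's JSON name. The sketch below is a hypothetical, dependency-free illustration of that re-keying step, not Druid's implementation. Plain field-name strings stand in for `FieldDescriptor`s, and `toJsonName` mirrors protobuf's default JSON-name rule (used when no custom `json_name` option is set).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class MapKeysSketch
{
  // Hypothetical re-keying helper: builds a new map whose keys are produced
  // by applying keyMapper to each original key; values are kept as-is.
  static <K, V, K2> Map<K2, V> mapKeys(Map<K, V> map, Function<K, K2> keyMapper)
  {
    Map<K2, V> result = new LinkedHashMap<>();
    for (Map.Entry<K, V> entry : map.entrySet()) {
      result.put(keyMapper.apply(entry.getKey()), entry.getValue());
    }
    return result;
  }

  // Protobuf's default JSON name: drop underscores and upper-case the
  // character that follows each one ("some_other_id" -> "someOtherId").
  static String toJsonName(String fieldName)
  {
    StringBuilder sb = new StringBuilder();
    boolean upperNext = false;
    for (char c : fieldName.toCharArray()) {
      if (c == '_') {
        upperNext = true;
      } else if (upperNext) {
        sb.append(Character.toUpperCase(c));
        upperNext = false;
      } else {
        sb.append(c);
      }
    }
    return sb.toString();
  }

  public static void main(String[] args)
  {
    // Stand-in for message.getAllFields(), keyed here by proto field name.
    Map<String, Object> fields = new LinkedHashMap<>();
    fields.put("some_other_id", 4712);
    fields.put("is_valid", true);
    fields.put("description", "description");

    Map<String, Object> record = mapKeys(fields, MapKeysSketch::toJsonName);
    System.out.println(record); // prints {someOtherId=4712, isValid=true, description=description}
  }
}
```

In this sketch a later key silently overwrites an earlier one if two names collide; a production helper may choose to reject such collisions instead.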






[GitHub] [druid] xhl0726 commented on a change in pull request #9999: Optimize protobuf parsing for flatten data

Posted by GitBox <gi...@apache.org>.
xhl0726 commented on a change in pull request #9999:
URL: https://github.com/apache/druid/pull/9999#discussion_r440221061



##########
File path: benchmarks/src/test/java/org/apache/druid/benchmark/ProtobufParserBenchmark.java
##########
@@ -0,0 +1,173 @@
+      InputRow row = flattenParser.parseBatch(ByteBuffer.wrap(protoInputs)).get(0);

Review comment:
       Yeah, your suggestion would make the loop clearer. However, when I moved `ByteBuffer.wrap()` out of the loop, a parsing error occurred. There is only one record in the input file (I used the loop to simulate reading many rows), so the buffer is exhausted after it has been read once: reading from a `ByteBuffer` advances its position past the bytes that were read, and a subsequent read will not return them again. When i = 0 it works; when i > 0 the parser reports an error because the `position` of the wrapped ByteBuffer has already been moved to the end. That's why I put the wrap inside the loop. If you have a better solution that makes the loop easier to understand, please let me know.
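The behaviour described above can be reproduced with a plain `ByteBuffer`, independent of the protobuf parser. This is a sketch; `readAll` is a hypothetical helper standing in for whatever the parser does with the buffer. A relative bulk `get` moves the position to the limit, so a second read of the same buffer returns nothing until the position is reset.

```java
import java.nio.ByteBuffer;

public class ByteBufferConsumptionSketch
{
  // Hypothetical helper: copies all remaining bytes with a relative bulk get,
  // which advances the buffer's position by the number of bytes read.
  static byte[] readAll(ByteBuffer buf)
  {
    byte[] out = new byte[buf.remaining()];
    buf.get(out);
    return out;
  }

  public static void main(String[] args)
  {
    ByteBuffer buf = ByteBuffer.wrap(new byte[]{10, 20, 30});

    // First read (i = 0): all three bytes are returned and position reaches the limit.
    byte[] first = readAll(buf);
    System.out.println(first.length + " " + buf.remaining()); // prints: 3 0

    // Second read (i > 0) on the same buffer: nothing is left to read.
    byte[] second = readAll(buf);
    System.out.println(second.length); // prints: 0

    // Resetting the position makes the same bytes readable again.
    buf.position(0);
    System.out.println(readAll(buf).length); // prints: 3
  }
}
```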






[GitHub] [druid] clintropolis merged pull request #9999: Optimize protobuf parsing for flatten data

Posted by GitBox <gi...@apache.org>.
clintropolis merged pull request #9999:
URL: https://github.com/apache/druid/pull/9999


   

