You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ma...@apache.org on 2020/05/31 04:56:15 UTC

[incubator-pinot] branch master updated: Support distinctCountRawThetaSketch aggregation that returns serialized sketch. (#5465)

This is an automated email from the ASF dual-hosted git repository.

mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 01a316e  Support distinctCountRawThetaSketch aggregation that returns serialized sketch. (#5465)
01a316e is described below

commit 01a316ef16001786acddefa1d4334d0eb1b86689
Author: Mayank Shrivastava <ms...@linkedin.com>
AuthorDate: Sat May 30 21:56:06 2020 -0700

    Support distinctCountRawThetaSketch aggregation that returns serialized sketch. (#5465)
    
    1. Support a variation of theta sketch based distinct count aggregation function that returns
       serialized bytes of the final aggregated sketch, instead of the actual distinct value.
    
    2. The return value is hex encoded String of the serialized sketch bytes. This can be
       deserialized at the client side by the library using org.apache.commons.codec.binary as:
       `Hex.decodeHex(stringValue.toCharArray())`. This is the same as any other byte[] value
       returned by Pinot.
    
    3. Added unit test for the new aggregation function.
---
 .../common/function/AggregationFunctionType.java   |   1 +
 .../function/AggregationFunctionFactory.java       |   2 +
 ...inctCountRawThetaSketchAggregationFunction.java | 140 +++++++++++++++++++++
 ...istinctCountThetaSketchAggregationFunction.java |  16 ++-
 .../queries/DistinctCountThetaSketchTest.java      | 115 +++++++++++------
 5 files changed, 235 insertions(+), 39 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index af31639..ff3fb50 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -31,6 +31,7 @@ public enum AggregationFunctionType {
   DISTINCTCOUNTRAWHLL("distinctCountRawHLL"),
   FASTHLL("fastHLL"),
   DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch"),
+  DISTINCTCOUNTRAWTHETASKETCH("distinctCountRawThetaSketch"),
   PERCENTILE("percentile"),
   PERCENTILEEST("percentileEst"),
   PERCENTILETDIGEST("percentileTDigest"),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index f21f7fb..496b50e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -137,6 +137,8 @@ public class AggregationFunctionFactory {
             return new FastHLLAggregationFunction(column);
           case DISTINCTCOUNTTHETASKETCH:
             return new DistinctCountThetaSketchAggregationFunction(arguments);
+          case DISTINCTCOUNTRAWTHETASKETCH:
+            return new DistinctCountRawThetaSketchAggregationFunction(arguments);
           case COUNTMV:
             return new CountMVAggregationFunction(column);
           case MINMV:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
new file mode 100644
index 0000000..a620c00
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.spi.utils.ByteArray;
+
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.BYTES;
+
+
+/**
+ * A variation of the {@link DistinctCountThetaSketchAggregationFunction} that returns the serialized bytes
+ * of the theta-sketch, as opposed to the actual distinct value.
+ *
+ * Note: It would have been natural for this class to extend the {@link DistinctCountThetaSketchAggregationFunction},
+ * except that the return type for this class is a String, as opposed to Integer for the former, due to which the
+ * extension is not possible.
+ */
+public class DistinctCountRawThetaSketchAggregationFunction implements AggregationFunction<Map<String, Sketch>, ByteArray> {
+  private final DistinctCountThetaSketchAggregationFunction _thetaSketchAggregationFunction;
+
+  public DistinctCountRawThetaSketchAggregationFunction(List<String> arguments)
+      throws SqlParseException {
+    _thetaSketchAggregationFunction = new DistinctCountThetaSketchAggregationFunction(arguments);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTRAWTHETASKETCH;
+  }
+
+  @Override
+  public String getColumnName() {
+    return _thetaSketchAggregationFunction.getColumnName();
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return _thetaSketchAggregationFunction.getResultColumnName();
+  }
+
+  @Override
+  public List<TransformExpressionTree> getInputExpressions() {
+    return _thetaSketchAggregationFunction.getInputExpressions();
+  }
+
+  @Override
+  public void accept(AggregationFunctionVisitorBase visitor) {
+    _thetaSketchAggregationFunction.accept(visitor);
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return _thetaSketchAggregationFunction.createAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return _thetaSketchAggregationFunction.createGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<TransformExpressionTree, BlockValSet> blockValSetMap) {
+    _thetaSketchAggregationFunction.aggregate(length, aggregationResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<TransformExpressionTree, BlockValSet> blockValSetMap) {
+    _thetaSketchAggregationFunction.aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<TransformExpressionTree, BlockValSet> blockValSetMap) {
+    _thetaSketchAggregationFunction.aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public Map<String, Sketch> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return _thetaSketchAggregationFunction.extractAggregationResult(aggregationResultHolder);
+  }
+
+  @Override
+  public Map<String, Sketch> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return _thetaSketchAggregationFunction.extractGroupByResult(groupByResultHolder, groupKey);
+  }
+
+  @Override
+  public Map<String, Sketch> merge(Map<String, Sketch> intermediateResult1, Map<String, Sketch> intermediateResult2) {
+    return _thetaSketchAggregationFunction.merge(intermediateResult1, intermediateResult2);
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return _thetaSketchAggregationFunction.isIntermediateResultComparable();
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return _thetaSketchAggregationFunction.getIntermediateResultColumnType();
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return BYTES;
+  }
+
+  @Override
+  public ByteArray extractFinalResult(Map<String, Sketch> intermediateResult) {
+    Sketch sketch = _thetaSketchAggregationFunction.extractFinalSketch(intermediateResult);
+    return new ByteArray(sketch.compact().toByteArray());
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
index be8dc2e..d6161fa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
@@ -619,10 +619,20 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
   }
 
   /**
-   * Returns the theta-sketch SetOperation builder properly configured.
-   * Currently, only setting of nominalEntries is supported.
-   * @return SetOperationBuilder
+   * Extracts the final sketch from the intermediate result by applying the postAggregation expression on it.
+   *
+   * @param intermediateResult Intermediate result
+   * @return Final Sketch obtained by computing the post aggregation expression on intermediate result
    */
+  protected Sketch extractFinalSketch(Map<String, Sketch> intermediateResult) {
+    return evalPostAggregationExpression(_postAggregationExpression, intermediateResult);
+  }
+
+    /**
+     * Returns the theta-sketch SetOperation builder properly configured.
+     * Currently, only setting of nominalEntries is supported.
+     * @return SetOperationBuilder
+     */
   private SetOperationBuilder getSetOperationBuilder() {
     return (_thetaSketchParams == null) ? SetOperation.builder()
         : SetOperation.builder().setNominalEntries(_thetaSketchParams.getNominalEntries());
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
index 2ea387c..6b805df 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Random;
 import joptsimple.internal.Strings;
 import org.apache.commons.io.FileUtils;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.UpdateSketch;
 import org.apache.datasketches.theta.UpdateSketchBuilder;
 import org.apache.pinot.common.function.AggregationFunctionType;
@@ -50,6 +52,7 @@ import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -119,38 +122,63 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
   }
 
   private void testThetaSketches(boolean groupBy, boolean sql) {
-    List<String> predicates = Collections.singletonList("colA = 1");
-    String whereClause = Strings.join(predicates, " or ");
-    testQuery(whereClause, null, predicates, whereClause, groupBy, sql);
+    String tsQuery, distinctQuery;
+    String thetaSketchParams = "nominalEntries=1001";
+
+    List<String> predicateStrings = Collections.singletonList("colA = 1");
+    String whereClause = Strings.join(predicateStrings, " or ");
+    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, whereClause, groupBy, false);
+    distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
+    testQuery(tsQuery, distinctQuery, groupBy, sql, false);
+
+    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, whereClause, groupBy, true);
+    testQuery(tsQuery, distinctQuery, groupBy, sql, true);
 
     // Test Intersection (AND)
-    predicates = Arrays.asList("colA = 1", "colB >= 2.0", "colC <> 'colC_1'");
-    whereClause = Strings.join(predicates, " and ");
-    testQuery(whereClause, "nominalEntries=1001", predicates, whereClause, groupBy, sql);
+    predicateStrings = Arrays.asList("colA = 1", "colB >= 2.0", "colC <> 'colC_1'");
+    whereClause = Strings.join(predicateStrings, " and ");
+    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, whereClause, groupBy, false);
+    distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
+    testQuery(tsQuery, distinctQuery, groupBy, sql, false);
+
+    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, whereClause, groupBy, true);
+    testQuery(tsQuery, distinctQuery, groupBy, sql, true);
 
     // Test Union (OR)
-    predicates = Arrays.asList("colA = 1", "colB = 1.9");
-    whereClause = Strings.join(predicates, " or ");
-    testQuery(whereClause, " nominalEntries =1001 ", predicates, whereClause, groupBy, sql);
+    predicateStrings = Arrays.asList("colA = 1", "colB = 1.9");
+    whereClause = Strings.join(predicateStrings, " or ");
+    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, whereClause, groupBy, false);
+    distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
+    testQuery(tsQuery, distinctQuery, groupBy, sql, false);
+
+    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, whereClause, groupBy, true);
+    testQuery(tsQuery, distinctQuery, groupBy, sql, true);
 
     // Test complex predicates
-    predicates =
-        Arrays.asList("colA in (1, 2)", "colB not in (3.0)", "colC between 'colC_1' and 'colC_5'");
+    predicateStrings = Arrays.asList("colA in (1, 2)", "colB not in (3.0)", "colC between 'colC_1' and 'colC_5'");
     whereClause =
-        predicates.get(0) + " and " + predicates.get(1) + " or " + predicates.get(0) + " and " + predicates.get(2);
-    testQuery(whereClause, "nominalEntries =  1001", predicates, whereClause, groupBy, sql);
+        predicateStrings.get(0) + " and " + predicateStrings.get(1) + " or " + predicateStrings.get(0) + " and "
+            + predicateStrings.get(2);
+    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, whereClause, groupBy, false);
+    distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
+    testQuery(tsQuery, distinctQuery, groupBy, sql, false);
+
+    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, whereClause, groupBy, true);
+    testQuery(tsQuery, distinctQuery, groupBy, sql, true);
 
     // Test without predicate arguments
     whereClause =
-        predicates.get(0) + " and " + predicates.get(1) + " or " + predicates.get(0) + " and " + predicates.get(2);
-    testQuery(whereClause, "nominalEntries =  1001", Collections.emptyList(), whereClause, groupBy, sql);
+        predicateStrings.get(0) + " and " + predicateStrings.get(1) + " or " + predicateStrings.get(0) + " and "
+            + predicateStrings.get(2);
+    tsQuery = buildQuery(whereClause, thetaSketchParams, Collections.emptyList(), whereClause, groupBy, false);
+    distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
+    testQuery(tsQuery, distinctQuery, groupBy, sql, false);
+
+    tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, whereClause, groupBy, true);
+    testQuery(tsQuery, distinctQuery, groupBy, sql, true);
   }
 
-  private void testQuery(String whereClause, String thetaSketchParams, List<String> predicateStrings,
-      String postAggregationExpression, boolean groupBy, boolean sql) {
-    String tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, postAggregationExpression, groupBy);
-    String distinctQuery = buildQuery(whereClause, null, null, null, groupBy);
-
+  private void testQuery(String tsQuery, String distinctQuery, boolean groupBy, boolean sql, boolean raw) {
     Map<String, String> queryOptions = Collections.emptyMap();
     BrokerResponseNative actualResponse =
         (sql) ? getBrokerResponseForSqlQuery(tsQuery) : getBrokerResponseForPqlQuery(tsQuery, queryOptions);
@@ -159,33 +187,35 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
         (sql) ? getBrokerResponseForSqlQuery(distinctQuery) : getBrokerResponseForPqlQuery(distinctQuery, queryOptions);
 
     if (groupBy) {
-      compareGroupBy(actualResponse, expectedResponse, sql);
+      compareGroupBy(actualResponse, expectedResponse, sql, raw);
     } else {
-      compareAggregation(actualResponse, expectedResponse, sql);
+      compareAggregation(actualResponse, expectedResponse, sql, raw);
     }
   }
 
   private void compareAggregation(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse,
-      boolean sql) {
+      boolean sql, boolean raw) {
     if (sql) {
-      compareSql(actualResponse, expectedResponse);
+      compareSql(actualResponse, expectedResponse, raw);
     } else {
-      compareAggregationPql(actualResponse, expectedResponse);
+      compareAggregationPql(actualResponse, expectedResponse, raw);
     }
   }
 
-  private void compareGroupBy(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse, boolean sql) {
+  private void compareGroupBy(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse, boolean sql,
+      boolean raw) {
     if (sql) {
-      compareSql(actualResponse, expectedResponse);
+      compareSql(actualResponse, expectedResponse, raw);
     } else {
-      compareGroupByPql(actualResponse, expectedResponse);
+      compareGroupByPql(actualResponse, expectedResponse, raw);
     }
   }
 
-  private void compareAggregationPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse) {
+  private void compareAggregationPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse,
+      boolean raw) {
     List<AggregationResult> actualResults = actualResponse.getAggregationResults();
     Assert.assertEquals(actualResults.size(), 1);
-    double actual = Double.parseDouble((String) actualResults.get(0).getValue());
+    double actual = getSketchValue((String) actualResults.get(0).getValue(), raw);
 
     List<AggregationResult> expectedResults = expectedResponse.getAggregationResults();
     double expected = Double.parseDouble((String) expectedResults.get(0).getValue());
@@ -194,18 +224,21 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
         "Distinct count mismatch: actual: " + actual + "expected: " + expected + "seed:" + RANDOM_SEED);
   }
 
-  private void compareSql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse) {
+  private void compareSql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse, boolean raw) {
     List<Object[]> actualRows = actualResponse.getResultTable().getRows();
     List<Object[]> expectedRows = expectedResponse.getResultTable().getRows();
 
     Assert.assertEquals(actualRows.size(), expectedRows.size());
 
     for (int i = 0; i < actualRows.size(); i++) {
-      Assert.assertEquals(actualRows.get(i), expectedRows.get(i));
+      double actual = getSketchValue(actualRows.get(i)[0].toString(), raw);
+      double expected = (Integer) expectedRows.get(i)[0];
+      Assert.assertEquals(actual, expected);
     }
   }
 
-  private void compareGroupByPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse) {
+  private void compareGroupByPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse,
+      boolean raw) {
     AggregationResult actualResult = actualResponse.getAggregationResults().get(0);
     List<GroupByResult> actualGroupBy = actualResult.getGroupByResult();
 
@@ -214,7 +247,7 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
 
     Assert.assertEquals(actualGroupBy.size(), expectedGroupBy.size());
     for (int i = 0; i < actualGroupBy.size(); i++) {
-      double actual = Double.parseDouble((String) actualGroupBy.get(i).getValue());
+      double actual = getSketchValue((String) actualGroupBy.get(i).getValue(), raw);
       double expected = Double.parseDouble((String) expectedGroupBy.get(i).getValue());
 
       Assert.assertEquals(actual, expected, (expected * 0.1), // Allow for 10 % error.
@@ -222,14 +255,24 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
     }
   }
 
+  private double getSketchValue(String value, boolean raw) {
+    if (!raw) {
+      return Double.parseDouble(value);
+    }
+
+    byte[] bytes = BytesUtils.toBytes(value);
+    return Sketch.wrap(Memory.wrap(bytes)).getEstimate();
+  }
+
   private String buildQuery(String whereClause, String thetaSketchParams, List<String> thetaSketchPredicates,
-      String postAggregationExpression, boolean groupBy) {
+      String postAggregationExpression, boolean groupBy, boolean raw) {
     String column;
     String aggrFunction;
     boolean thetaSketch = (postAggregationExpression != null);
 
     if (thetaSketch) {
-      aggrFunction = AggregationFunctionType.DISTINCTCOUNTTHETASKETCH.getName();
+      aggrFunction = (raw) ? AggregationFunctionType.DISTINCTCOUNTRAWTHETASKETCH.getName()
+          : AggregationFunctionType.DISTINCTCOUNTTHETASKETCH.getName();
       column = THETA_SKETCH_COLUMN;
     } else {
       aggrFunction = AggregationFunctionType.DISTINCTCOUNT.getName();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org