You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/07/05 03:52:10 UTC

[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #5654: [Part 1] Add geo support

mayankshriv commented on a change in pull request #5654:
URL: https://github.com/apache/incubator-pinot/pull/5654#discussion_r449826482



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/geospatial/serde/GeometrySerde.java
##########
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.geospatial.serde;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Envelope;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.LinearRing;
+import org.locationtech.jts.geom.MultiPoint;
+import org.locationtech.jts.geom.Point;
+import org.locationtech.jts.geom.Polygon;
+import org.locationtech.jts.geom.TopologyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static java.lang.Double.NaN;
+import static java.lang.Double.isNaN;
+import static java.util.Objects.requireNonNull;
+import static org.apache.pinot.core.geospatial.GeometryUtils.GEOGRAPHY_FACTORY;
+import static org.apache.pinot.core.geospatial.GeometryUtils.GEOGRAPHY_GET_MASK;
+import static org.apache.pinot.core.geospatial.GeometryUtils.GEOGRAPHY_SET_MASK;
+import static org.apache.pinot.core.geospatial.GeometryUtils.GEOGRAPHY_SRID;
+import static org.apache.pinot.core.geospatial.GeometryUtils.GEOMETRY_FACTORY;
+
+/**
+ * Provides methods to efficiently serialize and deserialize geometry types.
+ */
+public class GeometrySerde extends Serializer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(GeometrySerde.class);

Review comment:
       Tab space indicates not following Pinot code-styling.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/geospatial/serde/GeometrySerde.java
##########
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.geospatial.serde;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Envelope;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.LinearRing;
+import org.locationtech.jts.geom.MultiPoint;
+import org.locationtech.jts.geom.Point;
+import org.locationtech.jts.geom.Polygon;
+import org.locationtech.jts.geom.TopologyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static java.lang.Double.NaN;
+import static java.lang.Double.isNaN;
+import static java.util.Objects.requireNonNull;
+import static org.apache.pinot.core.geospatial.GeometryUtils.GEOGRAPHY_FACTORY;
+import static org.apache.pinot.core.geospatial.GeometryUtils.GEOGRAPHY_GET_MASK;
+import static org.apache.pinot.core.geospatial.GeometryUtils.GEOGRAPHY_SET_MASK;
+import static org.apache.pinot.core.geospatial.GeometryUtils.GEOGRAPHY_SRID;
+import static org.apache.pinot.core.geospatial.GeometryUtils.GEOMETRY_FACTORY;
+
+/**
+ * Provides methods to efficiently serialize and deserialize geometry types.
+ */
+public class GeometrySerde extends Serializer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(GeometrySerde.class);
+
+    @Override
+    public void write(Kryo kryo, Output output, Object object) {
+        if (!(object instanceof Geometry)) {
+            throw new UnsupportedOperationException("Cannot serialize object of type " +
+                    object.getClass().getName());
+        }
+        writeGeometry(output, (Geometry) object);
+    }
+
+    @Override
+    public Object read(Kryo kryo, Input input, Class aClass) {
+        byte typeByte = input.readByte();
+        GeometrySerializationType type = readGeometryType(typeByte);
+        GeometryFactory factory = getGeometryFactory(typeByte);
+
+        return readGeometry(input, type, factory);
+    }
+
+    private Geometry readGeometry(Input input, GeometrySerializationType type, GeometryFactory factory) {
+        switch (type) {
+            case POINT:
+                return readPoint(input, factory);
+            case MULTI_POINT:
+                return readMultiPoint(input, factory);
+            case LINE_STRING:
+                return readPolyline(input, false, factory);
+            case MULTI_LINE_STRING:
+                return readPolyline(input, true, factory);
+            case POLYGON:
+                return readPolygon(input, false, factory);
+            case MULTI_POLYGON:
+                return readPolygon(input, true, factory);
+            case GEOMETRY_COLLECTION:
+                return readGeometryCollection(input, factory);
+            default:
+                throw new UnsupportedOperationException("Unexpected type: " + type);
+        }
+    }
+
+    private Point readPoint(Input input, GeometryFactory factory) {
+        Coordinate coordinates = readCoordinate(input);
+        if (isNaN(coordinates.x) || isNaN(coordinates.y)) {
+            return factory.createPoint();
+        }
+        return factory.createPoint(coordinates);
+    }
+
+    private Coordinate readCoordinate(Input input) {
+        return new Coordinate(input.readDouble(), input.readDouble());
+    }
+
+    private Coordinate[] readCoordinates(Input input, int count) {
+        requireNonNull(input, "input is null");
+        verify(count > 0);
+        Coordinate[] coordinates = new Coordinate[count];
+        for (int i = 0; i < count; i++) {
+            coordinates[i] = readCoordinate(input);
+        }
+        return coordinates;
+    }
+
+    private Geometry readMultiPoint(Input input, GeometryFactory factory) {
+        int pointCount = input.readInt();
+        Point[] points = new Point[pointCount];
+        for (int i = 0; i < pointCount; i++) {
+            points[i] = readPoint(input, factory);
+        }
+        return factory.createMultiPoint(points);
+    }
+
+    private GeometrySerializationType readGeometryType(byte typeByte) {
+        return GeometrySerializationType.fromID(typeByte & GEOGRAPHY_GET_MASK);
+    }
+
+    private Geometry readPolyline(Input input, boolean multitype, GeometryFactory factory) {
+        int partCount = input.readInt();
+        if (partCount == 0) {
+            if (multitype) {
+                return factory.createMultiLineString();
+            }
+            return factory.createLineString();
+        }
+
+        int pointCount = input.readInt();
+        int[] startIndexes = new int[partCount];
+        for (int i = 0; i < partCount; i++) {
+            startIndexes[i] = input.readInt();
+        }
+
+        int[] partLengths = new int[partCount];
+        if (partCount > 1) {
+            partLengths[0] = startIndexes[1];
+            for (int i = 1; i < partCount - 1; i++) {
+                partLengths[i] = startIndexes[i + 1] - startIndexes[i];
+            }
+        }
+        partLengths[partCount - 1] = pointCount - startIndexes[partCount - 1];
+
+        LineString[] lineStrings = new LineString[partCount];
+
+        for (int i = 0; i < partCount; i++) {
+            lineStrings[i] = factory.createLineString(readCoordinates(input, partLengths[i]));
+        }
+
+        if (multitype) {
+            return factory.createMultiLineString(lineStrings);
+        }
+        verify(lineStrings.length == 1);
+        return lineStrings[0];
+    }
+
+    private Geometry readPolygon(Input input, boolean multitype, GeometryFactory factory) {
+        int partCount = input.readInt();
+        if (partCount == 0) {
+            if (multitype) {
+                return factory.createMultiPolygon();
+            }
+            return factory.createPolygon();
+        }
+
+        int pointCount = input.readInt();
+        int[] startIndexes = new int[partCount];
+        for (int i = 0; i < partCount; i++) {
+            startIndexes[i] = input.readInt();
+        }
+
+        int[] partLengths = new int[partCount];
+        if (partCount > 1) {
+            partLengths[0] = startIndexes[1];
+            for (int i = 1; i < partCount - 1; i++) {
+                partLengths[i] = startIndexes[i + 1] - startIndexes[i];
+            }
+        }
+        partLengths[partCount - 1] = pointCount - startIndexes[partCount - 1];
+
+        LinearRing shell = null;
+        List<LinearRing> holes = new ArrayList<>();

Review comment:
       Initialize list with size if known.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/ConstructFromTextFunction.java
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.geospatial.transform.function;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.operator.transform.function.BaseTransformFunction;
+import org.apache.pinot.core.operator.transform.function.TransformFunction;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.io.ParseException;
+import org.locationtech.jts.io.WKTReader;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class ConstructFromTextFunction extends BaseTransformFunction {
+    private TransformFunction _transformFunction;
+    private byte[][] _results;
+    private WKTReader _reader;
+
+    @Override
+    public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
+        Preconditions
+                .checkArgument(arguments.size() == 1, "Exactly 1 argument is required for transform function: %s",
+                        getName());
+        TransformFunction transformFunction = arguments.get(0);
+        Preconditions.checkArgument(transformFunction.getResultMetadata().isSingleValue(),
+                "The argument must be single-valued for transform function: %s", getName());
+        _transformFunction = transformFunction;
+        _reader = new WKTReader(getGeometryFactory());
+    }
+
+    abstract protected GeometryFactory getGeometryFactory();
+
+    @Override
+    public TransformResultMetadata getResultMetadata() {
+        return BYTES_SV_NO_DICTIONARY_METADATA;
+    }
+
+    @Override
+    public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
+        if (_results == null) {
+            _results = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+        }
+        String[] argumentValues = _transformFunction.transformToStringValuesSV(projectionBlock);
+        int length = projectionBlock.getNumDocs();
+        for (int i = 0; i < length; i++) {
+            try {
+                Geometry geometry = _reader.read(argumentValues[i]);
+                _results[i] = GeometrySerializer.serialize(geometry);
+            } catch (ParseException e) {
+                throw new RuntimeException(String.format("Failed to parse geometry from String %s", argumentValues[i]));

Review comment:
       Utils.rethrow will preserve the original exception.

##########
File path: pinot-core/pom.xml
##########
@@ -159,7 +159,14 @@
       <groupId>com.jayway.jsonpath</groupId>
       <artifactId>json-path</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>org.locationtech.jts</groupId>

Review comment:
       Is Eclipse license ok to add? So far we have taken Apache/MIT/Gnu.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/geospatial/serde/GeometrySerializer.java
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.geospatial.serde;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.locationtech.jts.geom.Geometry;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+public class GeometrySerializer {

Review comment:
       Please add Java doc to all classes and their public methods.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/geospatial/serde/GeometrySerializationType.java
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.geospatial.serde;
+
+import org.apache.pinot.core.geospatial.GeometryType;
+
+public enum GeometrySerializationType
+{
+    POINT(0, GeometryType.POINT),
+    MULTI_POINT(1, GeometryType.MULTI_POINT),
+    LINE_STRING(2, GeometryType.LINE_STRING),
+    MULTI_LINE_STRING(3, GeometryType.MULTI_LINE_STRING),
+    POLYGON(4, GeometryType.POLYGON),
+    MULTI_POLYGON(5, GeometryType.MULTI_POLYGON),
+    GEOMETRY_COLLECTION(6, GeometryType.GEOMETRY_COLLECTION),
+    ENVELOPE(7, GeometryType.POLYGON);
+
+    private final int id;
+    private final GeometryType geometryType;
+
+    GeometrySerializationType(int id, GeometryType geometryType)
+    {
+        this.id = id;
+        this.geometryType = geometryType;
+    }
+
+    public int id()
+    {
+        return id;
+    }
+
+    public GeometryType getGeometryType()
+    {
+        return geometryType;
+    }
+
+    public static GeometrySerializationType fromID(int id)
+    {
+        switch (id) {

Review comment:
       Consider using a Static Map, if this list has a chance to grow.

##########
File path: pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json
##########
@@ -37,6 +37,19 @@
     {
       "dataType": "STRING",
       "name": "group_name"
+    },
+    {
+      "dataType": "DOUBLE",
+      "name": "group_lat"
+    },
+    {
+      "dataType": "DOUBLE",
+      "name": "group_lon"
+    },
+    {
+      "dataType": "BYTES",
+      "name": "location",
+      "transformFunction": "stPoint(group_lon, group_lat)"

Review comment:
       +1

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/geospatial/serde/GeometrySerializationType.java
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.geospatial.serde;
+
+import org.apache.pinot.core.geospatial.GeometryType;
+
+public enum GeometrySerializationType
+{
+    POINT(0, GeometryType.POINT),
+    MULTI_POINT(1, GeometryType.MULTI_POINT),
+    LINE_STRING(2, GeometryType.LINE_STRING),
+    MULTI_LINE_STRING(3, GeometryType.MULTI_LINE_STRING),
+    POLYGON(4, GeometryType.POLYGON),
+    MULTI_POLYGON(5, GeometryType.MULTI_POLYGON),
+    GEOMETRY_COLLECTION(6, GeometryType.GEOMETRY_COLLECTION),
+    ENVELOPE(7, GeometryType.POLYGON);
+
+    private final int id;
+    private final GeometryType geometryType;
+
+    GeometrySerializationType(int id, GeometryType geometryType)
+    {
+        this.id = id;

Review comment:
       Please use Pinot code style (name of member variables starts with `_` to avoid qualifying with `this`.

##########
File path: pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
##########
@@ -87,11 +90,14 @@ public void onMessage(String message) {
                   }
 
                   JsonNode group = messageJSON.get("group");
+                  System.out.println(String.format("reading group %s", group.get("group_id")));

Review comment:
       Unsure if LOGGER should be used here?

##########
File path: pinot-tools/src/main/resources/log4j2.xml
##########
@@ -44,7 +44,7 @@
 
   </Appenders>
   <Loggers>
-    <Root level="info" additivity="false">
+    <Root level="warn" additivity="false">

Review comment:
       This seems unrelated to this PR? Would be good to call it out in the description, along with the motivation for the change.

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/geospatial/serde/BenchmarkSerde.java
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.geospatial.serde;
+
+import com.google.common.base.Joiner;
+import com.google.common.io.Resources;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.io.ParseException;
+import org.locationtech.jts.io.WKTReader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import static com.google.common.io.Resources.getResource;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+import static org.apache.pinot.core.geospatial.serde.GeometrySerializer.serialize;
+import static org.apache.pinot.core.geospatial.serde.GeometrySerializer.deserialize;
+
+@State(Scope.Thread)
+@Fork(2)
+@Warmup(iterations = 3, time = 3, timeUnit = SECONDS)
+@Measurement(iterations = 5, time = 4, timeUnit = SECONDS)
+@OutputTimeUnit(SECONDS)
+@BenchmarkMode(Throughput)
+public class BenchmarkSerde

Review comment:
       Would be good to add the benchmark results in the PR description.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org