You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/12/22 19:13:06 UTC

[GitHub] [incubator-pinot] yupeng9 commented on a change in pull request #6376: Geo Indexing (WIP)

yupeng9 commented on a change in pull request #6376:
URL: https://github.com/apache/incubator-pinot/pull/6376#discussion_r547427987



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/ScalarFunctions.java
##########
@@ -72,4 +73,17 @@ public static String stAsText(byte[] bytes) {
     WKTWriter writer = new WKTWriter();
     return writer.write(GeometrySerializer.deserialize(bytes));
   }
+
+  /**
+   * Saves the geometry object as WKT format.

Review comment:
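       Fix this comment: the Javadoc says the method saves the geometry as WKT, but it actually converts it to a spherical geography.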

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/H3IndexHandler.java
##########
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.loader.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexCreator;
+import org.apache.pinot.core.segment.creator.impl.inv.RangeIndexCreator;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.loader.LoaderUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
+import org.apache.pinot.core.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class H3IndexHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(H3IndexHandler.class);
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _h3IndexColumns = new HashSet<>();
+
+  public H3IndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+      SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    // Only create H3 index on non-dictionary-encoded columns

Review comment:
       This actually brings up an interesting question. If a user stores a point as two double columns (lat and lng), is it possible to create an H3 index on them?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -325,10 +326,10 @@ public long getLatestIngestionTimestamp() {
       // Null value vector
       MutableNullValueVector nullValueVector = _nullHandlingEnabled ? new MutableNullValueVector() : null;
 
-      // TODO: Support range index and bloom filter for mutable segment
+      // TODO: Support range index and bloom filter and h3 index for mutable segment
       _indexContainerMap.put(column,
           new IndexContainer(fieldSpec, partitionFunction, partitions, new NumValuesInfo(), forwardIndex, dictionary,
-              invertedIndexReader, null, textIndex, null, nullValueVector));
+              invertedIndexReader, null, null, textIndex, null, nullValueVector));

Review comment:
       Should we consider a builder class, so that we don't have to change this constructor every time a new index type is added?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.uber.h3core.H3Core;
+import com.uber.h3core.LengthUnit;
+import java.io.IOException;
+import java.util.List;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.geospatial.transform.function.StPointFunction;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
+import org.apache.pinot.core.query.request.context.predicate.GeoPredicate;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.locationtech.jts.geom.Geometry;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class H3IndexFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "H3IndexFilterOperator";
+
+  // NOTE: Range index can only apply to dictionary-encoded columns for now
+  // TODO: Support raw index columns
+  private final int _numDocs;
+  private final H3Core _h3Core;
+  private final H3IndexReader _h3IndexReader;
+  private Geometry _geometry;
+  private double _distance;
+
+  public H3IndexFilterOperator(Predicate predicate, IndexSegment indexSegment, int numDocs) {
+    FunctionContext function = predicate.getLhs().getFunction();
+    String columnName;
+
+    if (function.getArguments().get(0).getType() == ExpressionContext.Type.IDENTIFIER) {

Review comment:
       Can you describe which predicate shapes are supported? For example, will this handle composite expressions?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
##########
@@ -120,6 +129,20 @@ private BaseFilterOperator constructPhysicalOperator(FilterContext filter,
         Predicate predicate = filter.getPredicate();
         ExpressionContext lhs = predicate.getLhs();
         if (lhs.getType() == ExpressionContext.Type.FUNCTION) {
+          FunctionContext function = lhs.getFunction();
+
+          boolean canApplyH3Index = false;
+          if (function.getFunctionName().equalsIgnoreCase(StDistanceFunction.FUNCTION_NAME)) {
+            String columnName = function.getArguments().get(0).getIdentifier();

Review comment:
       Does this assume the column must be on the left of the expression? Or does the parser rewrite the expression so that it always is?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/geospatial/transform/function/ScalarFunctions.java
##########
@@ -72,4 +73,17 @@ public static String stAsText(byte[] bytes) {
     WKTWriter writer = new WKTWriter();
     return writer.write(GeometrySerializer.deserialize(bytes));
   }
+
+  /**
+   * Saves the geometry object as WKT format.
+   *
+   * @param bytes the serialized geometry object
+   * @return the geometry in WKT
+   */
+  @ScalarFunction
+  public static byte[] toSphericalGeography(byte[] bytes) {

Review comment:
       For symmetry, it would be good to also have a `toGeometry` function.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/request/context/predicate/GeoPredicate.java
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.request.context.predicate;
+
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.locationtech.jts.geom.Geometry;
+
+
+//TODO: Make this flexible
+public class GeoPredicate {
+
+  //this is the column name
+  ExpressionContext _lhs;
+
+  Predicate type;

Review comment:
       Rename this field to `_type` to follow the underscore-prefixed field naming convention (as with `_lhs`).

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers.geospatial;
+
+import java.io.Closeable;
+import java.lang.ref.SoftReference;
+import org.apache.pinot.core.segment.index.readers.BitmapInvertedIndexReader;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.IntDictionary;
+import org.apache.pinot.core.segment.index.readers.LongDictionary;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class H3IndexReader implements Closeable {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(BitmapInvertedIndexReader.class);
+
+  private final PinotDataBuffer _bitmapBuffer;
+  private final PinotDataBuffer _offsetBuffer;
+  private final int _numBitmaps;
+  private final int _bitmapBufferSize;
+
+  private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
+
+  private Dictionary _dictionary;
+
+  /**
+   * Constructs an inverted index with the specified size.
+   * @param dataBuffer data buffer for the inverted index.
+   */
+  public H3IndexReader(PinotDataBuffer dataBuffer) {
+    int version = dataBuffer.getInt(0 * Integer.BYTES);
+    _numBitmaps = dataBuffer.getInt(1 * Integer.BYTES);
+
+    int headerSize = 2 * Integer.BYTES;
+    //read the dictionary
+    int dictionarySize = _numBitmaps * Long.BYTES;
+    int offsetsSize = _numBitmaps * Integer.BYTES;
+    PinotDataBuffer dictionaryBuffer = dataBuffer.view(headerSize, headerSize + dictionarySize);
+    _offsetBuffer = dataBuffer.view(headerSize + dictionarySize, headerSize + dictionarySize + offsetsSize);
+    _bitmapBuffer = dataBuffer.view(headerSize + dictionarySize + offsetsSize, dataBuffer.size());
+    _dictionary = new LongDictionary(dictionaryBuffer, _numBitmaps);
+    _bitmapBufferSize = (int) _bitmapBuffer.size();
+  }
+
+  public ImmutableRoaringBitmap getDocIds(long h3IndexId) {
+    SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null;
+    int dictId = _dictionary.indexOf(String.valueOf(h3IndexId));
+    if (dictId < 0) {
+      return new MutableRoaringBitmap();
+    }
+    // Return the bitmap if it's still on heap
+    if (_bitmaps != null) {
+      bitmapArrayReference = _bitmaps.get();
+      if (bitmapArrayReference != null) {
+        SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+        if (bitmapReference != null) {
+          ImmutableRoaringBitmap value = bitmapReference.get();
+          if (value != null) {

Review comment:
       Can the value be null here? If so, what does that mean?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/geospatial/H3IndexCreator.java
##########
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.geospatial;
+
+import com.uber.h3core.H3Core;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.geospatial.GeometryUtils;
+import org.apache.pinot.core.geospatial.transform.function.StDistanceFunction;
+import org.apache.pinot.core.segment.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.Point;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.H3_INDEX_FILE_EXTENSION;
+
+
+public class H3IndexCreator implements GeoSpatialIndexCreator {
+
+  private static final int VERSION = 1;
+  private static final int FLUSH_THRESHOLD = 100_000;
+  private final H3Core _h3Core;
+  private File _indexDir;
+  private FieldSpec _fieldSpec;
+  private int _resolution;
+
+  TreeMap<Long, MutableRoaringBitmap> _h3IndexMap;
+
+  int numChunks = 0;
+  List<Integer> chunkLengths = new ArrayList<>();
+
+  public H3IndexCreator(File indexDir, FieldSpec fieldSpec, int resolution)
+      throws IOException {
+
+    _indexDir = indexDir;
+    _fieldSpec = fieldSpec;
+    _resolution = resolution;

Review comment:
       This assumes only one resolution is allowed. To support multiple resolutions, should we extend this class or create multiple `H3IndexCreator` instances?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/H3IndexHandler.java
##########
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.loader.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexCreator;
+import org.apache.pinot.core.segment.creator.impl.inv.RangeIndexCreator;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.loader.LoaderUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
+import org.apache.pinot.core.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class H3IndexHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(H3IndexHandler.class);
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _h3IndexColumns = new HashSet<>();
+
+  public H3IndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
+      SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    // Only create H3 index on non-dictionary-encoded columns

Review comment:
       what does this mean?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org