You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/12/20 17:44:13 UTC

[incubator-pinot] 01/03: Initial commit for H3 based geospatial indexing

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch h3-index
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 4bb66774f94528b6ae6607f16c8089ad4f8adfc6
Author: kishoreg <g....@gmail.com>
AuthorDate: Mon Nov 30 09:58:32 2020 -0800

    Initial commit for H3 based geospatial indexing
---
 pinot-core/pom.xml                                 |   5 +
 .../segment/creator/GeoSpatialIndexCreator.java    |   9 +
 .../creator/impl/geospatial/H3IndexCreator.java    | 312 +++++++++++++++++++++
 .../index/readers/BaseImmutableDictionary.java     |   2 +-
 .../index/readers/geospatial/H3IndexReader.java    | 102 +++++++
 5 files changed, 429 insertions(+), 1 deletion(-)

diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index d4938a7..9a93eae 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -54,6 +54,11 @@
   </build>
   <dependencies>
     <dependency>
+      <groupId>com.uber</groupId>
+      <artifactId>h3</artifactId>
+      <version>3.0.3</version>
+    </dependency>
+    <dependency>
       <groupId>me.lemire.integercompression</groupId>
       <artifactId>JavaFastPFOR</artifactId>
     </dependency>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/GeoSpatialIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/GeoSpatialIndexCreator.java
new file mode 100644
index 0000000..8fcd3bd
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/GeoSpatialIndexCreator.java
@@ -0,0 +1,9 @@
+package org.apache.pinot.core.segment.creator;
+
+import java.io.Closeable;
+
+
+/**
+ * Builder for a geospatial index over the (latitude, longitude) points of a column.
+ *
+ * <p>Points are fed one document at a time through {@link #add(int, double, double)}; the creator is
+ * {@link Closeable} so that any resources it holds can be released once index creation is done.
+ */
+public interface GeoSpatialIndexCreator extends Closeable {
+
+  /**
+   * Records the point belonging to a document.
+   *
+   * @param docId id of the document the point belongs to
+   * @param lat latitude of the point (presumably in degrees, as consumed by H3 — confirm for other implementations)
+   * @param lon longitude of the point (same unit assumption as {@code lat})
+   */
+  void add(int docId, double lat, double lon);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/geospatial/H3IndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/geospatial/H3IndexCreator.java
new file mode 100644
index 0000000..e2ec6a2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/geospatial/H3IndexCreator.java
@@ -0,0 +1,312 @@
+package org.apache.pinot.core.segment.creator.impl.geospatial;
+
+import com.uber.h3core.H3Core;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Creates an H3 based geospatial index for one column.
+ *
+ * <p>Every (lat, lon) point passed to {@link #add(int, double, double)} is mapped to the H3 cell id at the configured
+ * resolution, and the doc ids falling into the same cell are collected into one bitmap. To bound heap usage, the
+ * in-memory map is spilled to a chunk file (sorted by cell id) whenever it holds more than {@code FLUSH_THRESHOLD}
+ * cells. {@link #seal()} merges the chunk files and writes {@code <column>.h3.index} with the layout:
+ * <pre>
+ *   header:     int version, int numUniqueIds
+ *   dictionary: numUniqueIds longs (H3 cell ids, ascending)
+ *   offsets:    numUniqueIds ints (start of each bitmap within the bitmap section)
+ *   bitmaps:    serialized RoaringBitmaps, back to back
+ * </pre>
+ * Temporary chunk files are removed by {@link #close()}.
+ */
+public class H3IndexCreator implements GeoSpatialIndexCreator {
+
+  private static final int VERSION = 1;
+  // Maximum number of distinct H3 cells buffered in memory before they are spilled to a chunk file.
+  private static final int FLUSH_THRESHOLD = 100_000;
+  // Valid H3 resolutions range from 0 (coarsest) to 15 (finest).
+  private static final int MIN_RESOLUTION = 0;
+  private static final int MAX_RESOLUTION = 15;
+
+  private final H3Core _h3Core;
+  private final File _indexDir;
+  private final FieldSpec _fieldSpec;
+  private final int _resolution;
+
+  // Sorted by H3 id so that every spilled chunk is already ordered for the k-way merge in seal().
+  final TreeMap<Long, MutableRoaringBitmap> _h3IndexMap = new TreeMap<>();
+
+  // Number of chunk files spilled so far, and the number of entries written to each of them.
+  int numChunks = 0;
+  final List<Integer> chunkLengths = new ArrayList<>();
+
+  /**
+   * @param indexDir directory where the temporary chunk files and the final index file are written
+   * @param fieldSpec spec of the indexed column; its name prefixes the generated file names
+   * @param resolution H3 resolution used to map points to cells, in [0, 15]
+   * @throws IOException if {@link H3Core} cannot be initialized
+   * @throws IllegalArgumentException if the resolution is out of range
+   */
+  public H3IndexCreator(File indexDir, FieldSpec fieldSpec, int resolution)
+      throws IOException {
+    if (resolution < MIN_RESOLUTION || resolution > MAX_RESOLUTION) {
+      throw new IllegalArgumentException(
+          "H3 resolution must be in [" + MIN_RESOLUTION + ", " + MAX_RESOLUTION + "], got: " + resolution);
+    }
+    _indexDir = indexDir;
+    _fieldSpec = fieldSpec;
+    _resolution = resolution;
+    _h3Core = H3Core.newInstance();
+  }
+
+  @Override
+  public void add(int docId, double lat, double lon) {
+    long h3Id = _h3Core.geoToH3(lat, lon, _resolution);
+    _h3IndexMap.computeIfAbsent(h3Id, k -> new MutableRoaringBitmap()).add(docId);
+    if (_h3IndexMap.size() > FLUSH_THRESHOLD) {
+      flush();
+    }
+  }
+
+  /**
+   * Spills the buffered cells to the next chunk file as (long h3Id, int size, byte[size] bitmap) records in ascending
+   * h3Id order, then clears the buffer. Does nothing when the buffer is empty, so no zero-length chunk is ever created
+   * (an empty chunk would make the merge in seal() enqueue a null entry).
+   */
+  private void flush() {
+    if (_h3IndexMap.isEmpty()) {
+      return;
+    }
+    File chunkFile = getChunkFile(numChunks);
+    try (DataOutputStream dos = newOutputStream(chunkFile)) {
+      for (Map.Entry<Long, MutableRoaringBitmap> entry : _h3IndexMap.entrySet()) {
+        MutableRoaringBitmap bitmap = entry.getValue();
+        byte[] serialized = new byte[bitmap.serializedSizeInBytes()];
+        bitmap.serialize(ByteBuffer.wrap(serialized));
+        dos.writeLong(entry.getKey());
+        dos.writeInt(serialized.length);
+        dos.write(serialized);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to write H3 index chunk file: " + chunkFile, e);
+    }
+    // Only account for the chunk once it has been fully written.
+    chunkLengths.add(_h3IndexMap.size());
+    _h3IndexMap.clear();
+    numChunks++;
+  }
+
+  /**
+   * Flushes any buffered cells, merges all chunk files and writes the final {@code <column>.h3.index} file. The
+   * intermediate per-section files are deleted afterwards, even on failure.
+   *
+   * @throws Exception if reading the chunk files or writing the index fails
+   */
+  public void seal()
+      throws Exception {
+    flush();
+
+    String columnName = _fieldSpec.getName();
+    File headerFile = new File(_indexDir, columnName + "-h3index-header.buffer");
+    File dictionaryFile = new File(_indexDir, columnName + "-h3index-dictionary.buffer");
+    File offsetFile = new File(_indexDir, columnName + "-h3index-offset.buffer");
+    File bitmapFile = new File(_indexDir, columnName + "-h3index-bitmap.buffer");
+    try {
+      int numUniqueIds = mergeChunks(dictionaryFile, offsetFile, bitmapFile);
+      try (DataOutputStream headerStream = newOutputStream(headerFile)) {
+        headerStream.writeInt(VERSION);
+        headerStream.writeInt(numUniqueIds);
+      }
+      File outputFile = new File(_indexDir, columnName + ".h3.index");
+      concatenate(outputFile, headerFile, dictionaryFile, offsetFile, bitmapFile);
+    } finally {
+      // The per-section files are only staging areas for the final index file.
+      FileUtils.deleteQuietly(headerFile);
+      FileUtils.deleteQuietly(dictionaryFile);
+      FileUtils.deleteQuietly(offsetFile);
+      FileUtils.deleteQuietly(bitmapFile);
+    }
+  }
+
+  /**
+   * K-way merges the sorted chunk files, OR-ing the bitmaps of equal H3 ids, and writes the dictionary, offset and
+   * bitmap sections to the given files.
+   *
+   * @return number of unique H3 ids written
+   */
+  private int mergeChunks(File dictionaryFile, File offsetFile, File bitmapFile)
+      throws IOException {
+    ChunkReader[] chunkReaders = new ChunkReader[numChunks];
+    try (DataOutputStream dictionaryStream = newOutputStream(dictionaryFile);
+        DataOutputStream offsetStream = newOutputStream(offsetFile);
+        DataOutputStream bitmapStream = newOutputStream(bitmapFile)) {
+      PriorityQueue<Entry> queue = new PriorityQueue<>();
+      for (int chunkId = 0; chunkId < numChunks; chunkId++) {
+        chunkReaders[chunkId] = new ChunkReader(chunkId, chunkLengths.get(chunkId), getChunkFile(chunkId));
+        Entry first = chunkReaders[chunkId].getNextEntry();
+        if (first != null) {
+          queue.add(first);
+        }
+      }
+
+      Writer writer = new Writer(dictionaryStream, offsetStream, bitmapStream);
+      MutableRoaringBitmap merged = new MutableRoaringBitmap();
+      // Explicit flag instead of a sentinel id, so no H3 id value is treated specially.
+      boolean hasPending = false;
+      long pendingH3Id = 0;
+      while (!queue.isEmpty()) {
+        Entry entry = queue.poll();
+        if (hasPending && entry.h3Id != pendingH3Id) {
+          writer.add(pendingH3Id, merged);
+          merged.clear();
+        }
+        merged.or(entry.bitmap);
+        pendingH3Id = entry.h3Id;
+        hasPending = true;
+
+        // Refill the queue from the chunk the polled entry came from.
+        Entry next = chunkReaders[entry.chunkId].getNextEntry();
+        if (next != null) {
+          queue.add(next);
+        }
+      }
+      if (hasPending) {
+        writer.add(pendingH3Id, merged);
+      }
+      return writer.getNumUniqueIds();
+    } finally {
+      for (ChunkReader chunkReader : chunkReaders) {
+        if (chunkReader != null) {
+          chunkReader.close();
+        }
+      }
+    }
+  }
+
+  /** Copies the given files back to back into {@code outputFile} through a memory-mapped buffer. */
+  private static void concatenate(File outputFile, File... parts)
+      throws IOException {
+    long totalLength = 0;
+    for (File part : parts) {
+      totalLength += part.length();
+    }
+    // Start from a fresh file so no bytes from a previous run survive past the new content.
+    FileUtils.deleteQuietly(outputFile);
+    try (PinotDataBuffer h3IndexBuffer = PinotDataBuffer
+        .mapFile(outputFile, false, 0, totalLength, ByteOrder.BIG_ENDIAN, "H3 Index Buffer")) {
+      long writtenBytes = 0;
+      for (File part : parts) {
+        h3IndexBuffer.readFrom(writtenBytes, part, 0, part.length());
+        writtenBytes += part.length();
+      }
+    }
+  }
+
+  /** Opens a buffered data stream over {@code file}, truncating it. */
+  private static DataOutputStream newOutputStream(File file)
+      throws IOException {
+    return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
+  }
+
+  /** Location of the temporary chunk file with the given id; prefixed by the column name to avoid collisions. */
+  private File getChunkFile(int chunkId) {
+    return new File(_indexDir, _fieldSpec.getName() + "-h3index-chunk-" + chunkId);
+  }
+
+  /** Deletes the temporary chunk files and drops any buffered, not yet flushed cells. */
+  @Override
+  public void close()
+      throws IOException {
+    for (int chunkId = 0; chunkId < numChunks; chunkId++) {
+      FileUtils.deleteQuietly(getChunkFile(chunkId));
+    }
+    _h3IndexMap.clear();
+  }
+
+  /** Sequentially reads the (h3Id, bitmap) records of one chunk file written by flush(). */
+  static class ChunkReader {
+    private final int _chunkId;
+    private final int _chunkLength;
+    private final DataInputStream _dataInputStream;
+    private int _index = 0;
+
+    ChunkReader(int chunkId, int chunkLength, File chunkFile)
+        throws IOException {
+      _chunkId = chunkId;
+      _chunkLength = chunkLength;
+      _dataInputStream = new DataInputStream(new BufferedInputStream(new FileInputStream(chunkFile)));
+    }
+
+    /** Returns the next record, or {@code null} once all records of the chunk have been read. */
+    Entry getNextEntry()
+        throws IOException {
+      if (_index >= _chunkLength) {
+        return null;
+      }
+      long h3Id = _dataInputStream.readLong();
+      byte[] serializedBytes = new byte[_dataInputStream.readInt()];
+      // readFully: a plain read() may legally return fewer bytes than requested.
+      _dataInputStream.readFully(serializedBytes);
+      _index++;
+      return new Entry(_chunkId, h3Id, new ImmutableRoaringBitmap(ByteBuffer.wrap(serializedBytes)));
+    }
+
+    void close()
+        throws IOException {
+      _dataInputStream.close();
+    }
+  }
+
+  /** One (h3Id, bitmap) record read from a chunk; ordered by h3Id for the merge queue. */
+  static class Entry implements Comparable<Entry> {
+    final int chunkId;
+    final long h3Id;
+    final ImmutableRoaringBitmap bitmap;
+
+    public Entry(int chunkId, long h3Id, ImmutableRoaringBitmap bitmap) {
+      this.chunkId = chunkId;
+      this.h3Id = h3Id;
+      this.bitmap = bitmap;
+    }
+
+    @Override
+    public int compareTo(Entry o) {
+      return Long.compare(h3Id, o.h3Id);
+    }
+
+    // Consistent with compareTo(): two entries are equal when they refer to the same H3 cell.
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof Entry)) {
+        return false;
+      }
+      return h3Id == ((Entry) o).h3Id;
+    }
+
+    @Override
+    public int hashCode() {
+      return Long.hashCode(h3Id);
+    }
+  }
+
+  /** Appends merged (h3Id, bitmap) pairs to the dictionary, offset and bitmap section streams. */
+  private static class Writer {
+    private final DataOutputStream _dictionaryStream;
+    private final DataOutputStream _offsetStream;
+    private final DataOutputStream _bitmapStream;
+    // Byte offset of the next bitmap within the bitmap section.
+    private int _offset = 0;
+    private int _numUniqueIds = 0;
+
+    Writer(DataOutputStream dictionaryStream, DataOutputStream offsetStream, DataOutputStream bitmapStream) {
+      _dictionaryStream = dictionaryStream;
+      _offsetStream = offsetStream;
+      _bitmapStream = bitmapStream;
+    }
+
+    int getNumUniqueIds() {
+      return _numUniqueIds;
+    }
+
+    void add(long h3Id, ImmutableRoaringBitmap bitmap)
+        throws IOException {
+      byte[] serialized = new byte[bitmap.serializedSizeInBytes()];
+      bitmap.serialize(ByteBuffer.wrap(serialized));
+      _dictionaryStream.writeLong(h3Id);
+      _offsetStream.writeInt(_offset);
+      _bitmapStream.write(serialized);
+      _offset += serialized.length;
+      _numUniqueIds++;
+    }
+  }
+
+  /** Ad-hoc round-trip check: builds an index over random points and compares per-cell counts with the reader. */
+  public static void main(String[] args)
+      throws Exception {
+    File indexDir = new File(System.getProperty("java.io.tmpdir"), "h3IndexDir");
+    FileUtils.deleteDirectory(indexDir);
+    indexDir.mkdirs();
+    FieldSpec spec = new DimensionFieldSpec("geo_col", FieldSpec.DataType.STRING, true);
+    int resolution = 5;
+    H3Core h3Core = H3Core.newInstance();
+    Random rand = new Random();
+    Map<Long, Integer> expectedCounts = new HashMap<>();
+    try (H3IndexCreator creator = new H3IndexCreator(indexDir, spec, resolution)) {
+      for (int i = 0; i < 10000; i++) {
+        int lat = rand.nextInt(10);
+        int lon = rand.nextInt(10);
+        creator.add(i, lat, lon);
+        expectedCounts.merge(h3Core.geoToH3(lat, lon, resolution), 1, Integer::sum);
+      }
+      creator.seal();
+    }
+
+    System.out.println(
+        "Contents of IndexDir \n " + FileUtils.listFiles(indexDir, null, true).toString().replaceAll(",", "\n"));
+    File h3IndexFile = new File(indexDir, "geo_col.h3.index");
+    try (PinotDataBuffer h3IndexBuffer = PinotDataBuffer
+        .mapFile(h3IndexFile, true, 0, h3IndexFile.length(), ByteOrder.BIG_ENDIAN, "H3 index file");
+        H3IndexReader reader = new H3IndexReader(h3IndexBuffer)) {
+      for (Map.Entry<Long, Integer> entry : expectedCounts.entrySet()) {
+        long h3 = entry.getKey();
+        int expected = entry.getValue();
+        int actual = reader.getDocIds(h3).getCardinality();
+        String status = expected == actual ? "Matched" : "Failed";
+        System.out.printf("%s: expected: %d actual: %d for h3:%d \n", status, expected, actual, h3);
+      }
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java
index b9f5f53..ff24016 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java
@@ -45,7 +45,7 @@ public abstract class BaseImmutableDictionary implements Dictionary {
       _valueReader = new VarLengthBytesValueReaderWriter(dataBuffer);
     } else {
       Preconditions.checkState(dataBuffer.size() == length * numBytesPerValue,
-          "Buffer size mismatch: bufferSize = %s, numValues = %s, numByesPerValue = %s", dataBuffer.size(), length,
+          "Buffer size mismatch: bufferSize = %s, numValues = %s, numBytesPerValue = %s", dataBuffer.size(), length,
           numBytesPerValue);
       _valueReader = new FixedByteValueReaderWriter(dataBuffer);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java
new file mode 100644
index 0000000..96fa586
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java
@@ -0,0 +1,102 @@
+package org.apache.pinot.core.segment.index.readers.geospatial;
+
+import java.io.Closeable;
+import java.lang.ref.SoftReference;
+import org.apache.pinot.core.segment.index.readers.BitmapInvertedIndexReader;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.IntDictionary;
+import org.apache.pinot.core.segment.index.readers.LongDictionary;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Reads an H3 geospatial index written by {@code H3IndexCreator}.
+ *
+ * <p>Expected layout of the buffer:
+ * <pre>
+ *   header:     int version, int numBitmaps
+ *   dictionary: numBitmaps longs (H3 cell ids), looked up through a {@link LongDictionary}
+ *   offsets:    numBitmaps ints (start of each bitmap within the bitmap section)
+ *   bitmaps:    serialized RoaringBitmaps, back to back up to the end of the buffer
+ * </pre>
+ */
+public class H3IndexReader implements Closeable {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(H3IndexReader.class);
+
+  private static final int SUPPORTED_VERSION = 1;
+  private static final int HEADER_SIZE = 2 * Integer.BYTES;
+
+  private final PinotDataBuffer _bitmapBuffer;
+  private final PinotDataBuffer _offsetBuffer;
+  private final int _numBitmaps;
+  private final int _bitmapBufferSize;
+  private final Dictionary _dictionary;
+
+  // Lazily populated cache of decoded bitmaps indexed by dictionary id; soft references let the GC reclaim it.
+  private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
+
+  /**
+   * Creates a reader over the given H3 index buffer.
+   *
+   * @param dataBuffer buffer holding the whole H3 index; it is not closed by this reader
+   * @throws IllegalStateException if the index version is not supported
+   */
+  public H3IndexReader(PinotDataBuffer dataBuffer) {
+    int version = dataBuffer.getInt(0);
+    if (version != SUPPORTED_VERSION) {
+      throw new IllegalStateException(
+          "Unsupported H3 index version: " + version + ", expected: " + SUPPORTED_VERSION);
+    }
+    _numBitmaps = dataBuffer.getInt(Integer.BYTES);
+
+    // Section boundaries computed in long to avoid int overflow for large indexes.
+    long dictionaryEnd = HEADER_SIZE + (long) _numBitmaps * Long.BYTES;
+    long offsetsEnd = dictionaryEnd + (long) _numBitmaps * Integer.BYTES;
+    PinotDataBuffer dictionaryBuffer = dataBuffer.view(HEADER_SIZE, dictionaryEnd);
+    _offsetBuffer = dataBuffer.view(dictionaryEnd, offsetsEnd);
+    _bitmapBuffer = dataBuffer.view(offsetsEnd, dataBuffer.size());
+    _dictionary = new LongDictionary(dictionaryBuffer, _numBitmaps);
+    _bitmapBufferSize = (int) _bitmapBuffer.size();
+  }
+
+  /**
+   * Returns the doc ids whose points fall into the given H3 cell.
+   *
+   * @param h3IndexId H3 cell id, at the resolution the index was built with
+   * @return matching doc ids; an empty bitmap when the cell does not occur in this index
+   */
+  public ImmutableRoaringBitmap getDocIds(long h3IndexId) {
+    int dictId = _dictionary.indexOf(String.valueOf(h3IndexId));
+    if (dictId < 0) {
+      // Cell absent from the dictionary: no documents match (and dictId must not be used as an array index).
+      return ImmutableRoaringBitmap.bitmapOf();
+    }
+
+    SoftReference<ImmutableRoaringBitmap>[] bitmapArray = getBitmapArray();
+    ImmutableRoaringBitmap value = getCachedBitmap(bitmapArray, dictId);
+    if (value != null) {
+      return value;
+    }
+    synchronized (this) {
+      // Re-check under the lock: another thread may have decoded the bitmap meanwhile.
+      value = getCachedBitmap(bitmapArray, dictId);
+      if (value == null) {
+        value = buildRoaringBitmapForIndex(dictId);
+        bitmapArray[dictId] = new SoftReference<>(value);
+      }
+      return value;
+    }
+  }
+
+  /**
+   * Returns the cache array, allocating a new one if it was never created or has been reclaimed by the GC. Concurrent
+   * callers may each allocate an array; the loser's entries are simply not cached (same as the original behavior).
+   */
+  @SuppressWarnings("unchecked")
+  private SoftReference<ImmutableRoaringBitmap>[] getBitmapArray() {
+    SoftReference<SoftReference<ImmutableRoaringBitmap>[]> arrayReference = _bitmaps;
+    SoftReference<ImmutableRoaringBitmap>[] bitmapArray = arrayReference != null ? arrayReference.get() : null;
+    if (bitmapArray == null) {
+      bitmapArray = new SoftReference[_numBitmaps];
+      _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArray);
+    }
+    return bitmapArray;
+  }
+
+  /** Returns the cached bitmap for {@code dictId}, or {@code null} if it was never cached or has been reclaimed. */
+  private static ImmutableRoaringBitmap getCachedBitmap(SoftReference<ImmutableRoaringBitmap>[] bitmapArray,
+      int dictId) {
+    SoftReference<ImmutableRoaringBitmap> reference = bitmapArray[dictId];
+    return reference != null ? reference.get() : null;
+  }
+
+  /** Maps the serialized bitmap for the given dictionary id; its length is derived from the next offset. */
+  //todo: fix this
+  private ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int index) {
+    int currentOffset = getOffset(index);
+    int bufferLength;
+    if (index == _numBitmaps - 1) {
+      // The last bitmap extends to the end of the bitmap section.
+      bufferLength = _bitmapBufferSize - currentOffset;
+    } else {
+      bufferLength = getOffset(index + 1) - currentOffset;
+    }
+    return new ImmutableRoaringBitmap(_bitmapBuffer.toDirectByteBuffer(currentOffset, bufferLength));
+  }
+
+  private int getOffset(final int index) {
+    return _offsetBuffer.getInt(index * Integer.BYTES);
+  }
+
+  @Override
+  public void close() {
+    // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
+    // caller is responsible for closing the PinotDataBuffer.
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org