You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/11/30 17:58:58 UTC

[incubator-pinot] branch h3-index created (now fe0e286)

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a change to branch h3-index
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at fe0e286  Initial commit for H3 based geospatial indexing

This branch includes the following new commits:

     new fe0e286  Initial commit for H3 based geospatial indexing

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Initial commit for H3 based geospatial indexing

Posted by ki...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch h3-index
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit fe0e2862d157373f1be98c43f06fa09a4398a7ee
Author: kishoreg <g....@gmail.com>
AuthorDate: Mon Nov 30 09:58:32 2020 -0800

    Initial commit for H3 based geospatial indexing
---
 pinot-core/pom.xml                                 |   5 +
 .../segment/creator/GeoSpatialIndexCreator.java    |   9 +
 .../creator/impl/geospatial/H3IndexCreator.java    | 312 +++++++++++++++++++++
 .../index/readers/BaseImmutableDictionary.java     |   2 +-
 .../index/readers/geospatial/H3IndexReader.java    | 102 +++++++
 5 files changed, 429 insertions(+), 1 deletion(-)

diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index d4938a7..9a93eae 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -54,6 +54,11 @@
   </build>
   <dependencies>
     <dependency>
+      <groupId>com.uber</groupId>
+      <artifactId>h3</artifactId>
+      <version>3.0.3</version>
+    </dependency>
+    <dependency>
       <groupId>me.lemire.integercompression</groupId>
       <artifactId>JavaFastPFOR</artifactId>
     </dependency>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/GeoSpatialIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/GeoSpatialIndexCreator.java
new file mode 100644
index 0000000..8fcd3bd
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/GeoSpatialIndexCreator.java
@@ -0,0 +1,9 @@
+package org.apache.pinot.core.segment.creator;
+
+import java.io.Closeable;
+
+
+/**
+ * Builds a geospatial index from (latitude, longitude) points attached to document ids.
+ *
+ * <p>Extends {@link Closeable} so callers can release whatever resources an implementation holds once indexing is
+ * finished.
+ */
+public interface GeoSpatialIndexCreator extends Closeable {
+
+  /**
+   * Records the point of a single document in the index.
+   *
+   * @param docId id of the document the point belongs to
+   * @param lat latitude of the point
+   * @param lon longitude of the point
+   */
+  void add(int docId, double lat, double lon);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/geospatial/H3IndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/geospatial/H3IndexCreator.java
new file mode 100644
index 0000000..e2ec6a2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/geospatial/H3IndexCreator.java
@@ -0,0 +1,312 @@
+package org.apache.pinot.core.segment.creator.impl.geospatial;
+
+import com.uber.h3core.H3Core;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Creates an H3-based geospatial index for one column.
+ *
+ * <p>Every point passed to {@link #add(int, double, double)} is mapped to the H3 cell containing it at the configured
+ * resolution, and the doc id is added to that cell's bitmap. Bitmaps are buffered in a sorted map and spilled to
+ * {@code chunk-<n>} files in the index directory whenever more than {@code FLUSH_THRESHOLD} distinct cells are
+ * buffered. {@link #seal()} k-way merges the chunk files and writes {@code <column>.h3.index} with the layout:
+ * <ol>
+ *   <li>header: format version (int) and number of unique H3 ids (int)</li>
+ *   <li>dictionary: the unique H3 ids in ascending order (long each)</li>
+ *   <li>offsets: start of each id's bitmap within the bitmap section (int each)</li>
+ *   <li>bitmaps: the serialized roaring bitmaps, back to back</li>
+ * </ol>
+ * {@link #close()} deletes the intermediate chunk and section files. Instances are not synchronized.
+ */
+public class H3IndexCreator implements GeoSpatialIndexCreator {
+
+  private static final int VERSION = 1;
+  // Spill the buffered map to a chunk file once it holds more than this many distinct H3 ids.
+  private static final int FLUSH_THRESHOLD = 100_000;
+  // H3 defines 16 resolutions, 0 (coarsest) through 15 (finest).
+  private static final int MAX_H3_RESOLUTION = 15;
+
+  private final H3Core _h3Core;
+  private final File _indexDir;
+  private final FieldSpec _fieldSpec;
+  private final int _resolution;
+
+  // H3 id -> doc ids not yet spilled. A TreeMap keeps each chunk sorted by H3 id, which the merge in seal() relies on.
+  private final TreeMap<Long, MutableRoaringBitmap> _h3IndexMap = new TreeMap<>();
+
+  // Number of chunk files written so far; chunk ids are 0 .. _numChunks - 1.
+  private int _numChunks = 0;
+  // Number of entries in each chunk file, indexed by chunk id.
+  private final List<Integer> _chunkLengths = new ArrayList<>();
+
+  /**
+   * @param indexDir directory that receives the chunk files, the section files and the final index file
+   * @param fieldSpec spec of the indexed column; its name prefixes the output file names
+   * @param resolution H3 resolution used to bucket points, in [0, 15]
+   * @throws IOException if {@code H3Core.newInstance()} fails
+   * @throws IllegalArgumentException if {@code resolution} is outside [0, 15]
+   */
+  public H3IndexCreator(File indexDir, FieldSpec fieldSpec, int resolution)
+      throws IOException {
+    if (resolution < 0 || resolution > MAX_H3_RESOLUTION) {
+      throw new IllegalArgumentException(
+          "H3 resolution must be between 0 and " + MAX_H3_RESOLUTION + ", got: " + resolution);
+    }
+    _indexDir = indexDir;
+    _fieldSpec = fieldSpec;
+    _resolution = resolution;
+    _h3Core = H3Core.newInstance();
+  }
+
+  @Override
+  public void add(int docId, double lat, double lon) {
+    long h3Id = _h3Core.geoToH3(lat, lon, _resolution);
+    _h3IndexMap.computeIfAbsent(h3Id, k -> new MutableRoaringBitmap()).add(docId);
+    if (_h3IndexMap.size() > FLUSH_THRESHOLD) {
+      flush();
+    }
+  }
+
+  /** Spills the buffered entries, in ascending H3 id order, to a new chunk file and clears the buffer. */
+  private void flush() {
+    if (_h3IndexMap.isEmpty()) {
+      // Never write empty chunks: every chunk must yield at least one entry when the merge seeds its queue.
+      return;
+    }
+    File chunkFile = getChunkFile(_numChunks);
+    try (DataOutputStream dos = openOutput(chunkFile)) {
+      for (Map.Entry<Long, MutableRoaringBitmap> entry : _h3IndexMap.entrySet()) {
+        byte[] serialized = serialize(entry.getValue());
+        dos.writeLong(entry.getKey());
+        dos.writeInt(serialized.length);
+        dos.write(serialized);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to write H3 index chunk file: " + chunkFile, e);
+    }
+    _chunkLengths.add(_h3IndexMap.size());
+    _h3IndexMap.clear();
+    _numChunks++;
+  }
+
+  /**
+   * Flushes the remaining buffered entries, merges all chunk files and writes {@code <column>.h3.index} into the
+   * index directory.
+   *
+   * @throws Exception if reading the chunks or writing/mapping any of the output files fails
+   */
+  public void seal()
+      throws Exception {
+    flush();
+
+    File headerFile = getSectionFile("header");
+    File dictionaryFile = getSectionFile("dictionary");
+    File offsetFile = getSectionFile("offset");
+    File bitmapFile = getSectionFile("bitmap");
+
+    int numUniqueIds = mergeChunks(dictionaryFile, offsetFile, bitmapFile);
+    try (DataOutputStream headerStream = openOutput(headerFile)) {
+      headerStream.writeInt(VERSION);
+      headerStream.writeInt(numUniqueIds);
+    }
+
+    // Concatenate the sections, in the order the reader expects, into the final index file.
+    File[] sectionFiles = {headerFile, dictionaryFile, offsetFile, bitmapFile};
+    long length = 0;
+    for (File sectionFile : sectionFiles) {
+      length += sectionFile.length();
+    }
+    File outputFile = new File(_indexDir, _fieldSpec.getName() + ".h3.index");
+    PinotDataBuffer h3IndexBuffer =
+        PinotDataBuffer.mapFile(outputFile, false, 0, length, ByteOrder.BIG_ENDIAN, "H3 Index Buffer");
+    try {
+      long writtenBytes = 0;
+      for (File sectionFile : sectionFiles) {
+        h3IndexBuffer.readFrom(writtenBytes, sectionFile, 0, sectionFile.length());
+        writtenBytes += sectionFile.length();
+      }
+    } finally {
+      // Release the buffer once every section has been copied so it does not leak.
+      h3IndexBuffer.close();
+    }
+  }
+
+  /**
+   * K-way merges the sorted chunk files into the dictionary, offset and bitmap section files. Bitmaps of the same H3
+   * id coming from different chunks are OR-ed together.
+   *
+   * @return the number of unique H3 ids written
+   */
+  private int mergeChunks(File dictionaryFile, File offsetFile, File bitmapFile)
+      throws IOException {
+    ChunkReader[] chunkReaders = new ChunkReader[_numChunks];
+    try (DataOutputStream dictionaryStream = openOutput(dictionaryFile);
+        DataOutputStream offsetStream = openOutput(offsetFile);
+        DataOutputStream bitmapStream = openOutput(bitmapFile)) {
+      // Seed the queue with the smallest entry of every chunk.
+      PriorityQueue<Entry> queue = new PriorityQueue<>();
+      for (int chunkId = 0; chunkId < _numChunks; chunkId++) {
+        chunkReaders[chunkId] = new ChunkReader(chunkId, _chunkLengths.get(chunkId), getChunkFile(chunkId));
+        Entry first = chunkReaders[chunkId].getNextEntry();
+        if (first != null) {
+          queue.add(first);
+        }
+      }
+
+      Writer writer = new Writer(dictionaryStream, offsetStream, bitmapStream);
+      MutableRoaringBitmap mergedBitmap = new MutableRoaringBitmap();
+      boolean hasPending = false;
+      long pendingH3Id = 0L;
+      while (!queue.isEmpty()) {
+        Entry entry = queue.poll();
+        // Entries come out in ascending H3 id order, so a new id means the pending one is complete.
+        if (hasPending && entry.h3Id != pendingH3Id) {
+          writer.add(pendingH3Id, mergedBitmap);
+          mergedBitmap.clear();
+        }
+        mergedBitmap.or(entry.bitmap);
+        pendingH3Id = entry.h3Id;
+        hasPending = true;
+
+        Entry next = chunkReaders[entry.chunkId].getNextEntry();
+        if (next != null) {
+          queue.add(next);
+        }
+      }
+      if (hasPending) {
+        writer.add(pendingH3Id, mergedBitmap);
+      }
+      return writer.getNumUniqueIds();
+    } finally {
+      for (ChunkReader chunkReader : chunkReaders) {
+        if (chunkReader != null) {
+          chunkReader.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes the intermediate chunk files and section files. The {@code .h3.index} file written by {@link #seal()}
+   * is kept. Deletion is best-effort.
+   */
+  @Override
+  public void close()
+      throws IOException {
+    for (int chunkId = 0; chunkId < _numChunks; chunkId++) {
+      FileUtils.deleteQuietly(getChunkFile(chunkId));
+    }
+    for (String section : new String[]{"header", "dictionary", "offset", "bitmap"}) {
+      FileUtils.deleteQuietly(getSectionFile(section));
+    }
+  }
+
+  /** Returns the temporary file holding chunk {@code chunkId}. */
+  private File getChunkFile(int chunkId) {
+    return new File(_indexDir, "chunk-" + chunkId);
+  }
+
+  /** Returns the temporary file holding one section ("header", "dictionary", "offset" or "bitmap") of the index. */
+  private File getSectionFile(String section) {
+    return new File(_indexDir, _fieldSpec.getName() + "-h3index-" + section + ".buffer");
+  }
+
+  /** Opens a buffered data output stream that writes to {@code file}, replacing its contents. */
+  private static DataOutputStream openOutput(File file)
+      throws IOException {
+    return new DataOutputStream(new java.io.BufferedOutputStream(new FileOutputStream(file)));
+  }
+
+  /** Serializes {@code bitmap} into a byte array of exactly its serialized size. */
+  private static byte[] serialize(ImmutableRoaringBitmap bitmap) {
+    byte[] bytes = new byte[bitmap.serializedSizeInBytes()];
+    bitmap.serialize(ByteBuffer.wrap(bytes));
+    return bytes;
+  }
+
+  /** Sequentially reads the (h3Id, bitmap) entries of one chunk file. */
+  private static final class ChunkReader {
+    private final int _chunkId;
+    private final int _chunkLength;
+    private final DataInputStream _dataInputStream;
+    private int _index = 0;
+
+    ChunkReader(int chunkId, int chunkLength, File chunkFile)
+        throws IOException {
+      _chunkId = chunkId;
+      _chunkLength = chunkLength;
+      _dataInputStream = new DataInputStream(new java.io.BufferedInputStream(new FileInputStream(chunkFile)));
+    }
+
+    /** Returns the next entry, or {@code null} once all {@code chunkLength} entries have been read. */
+    Entry getNextEntry()
+        throws IOException {
+      if (_index >= _chunkLength) {
+        return null;
+      }
+      long h3Id = _dataInputStream.readLong();
+      byte[] serializedBytes = new byte[_dataInputStream.readInt()];
+      // readFully: a plain read() may return fewer bytes than requested and silently truncate the bitmap.
+      _dataInputStream.readFully(serializedBytes);
+      _index++;
+      return new Entry(_chunkId, h3Id, new ImmutableRoaringBitmap(ByteBuffer.wrap(serializedBytes)));
+    }
+
+    void close()
+        throws IOException {
+      _dataInputStream.close();
+    }
+  }
+
+  /** A chunk entry. Ordering and equality use the H3 id only, as needed by the merge's priority queue. */
+  private static final class Entry implements Comparable<Entry> {
+    final int chunkId;
+    final long h3Id;
+    final ImmutableRoaringBitmap bitmap;
+
+    Entry(int chunkId, long h3Id, ImmutableRoaringBitmap bitmap) {
+      this.chunkId = chunkId;
+      this.h3Id = h3Id;
+      this.bitmap = bitmap;
+    }
+
+    @Override
+    public int compareTo(Entry o) {
+      return Long.compare(h3Id, o.h3Id);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return o instanceof Entry && h3Id == ((Entry) o).h3Id;
+    }
+
+    @Override
+    public int hashCode() {
+      return Long.hashCode(h3Id);
+    }
+  }
+
+  /** Appends merged (h3Id, bitmap) pairs to the dictionary, offset and bitmap section streams. */
+  private static final class Writer {
+    private final DataOutputStream _dictionaryStream;
+    private final DataOutputStream _offsetStream;
+    private final DataOutputStream _bitmapStream;
+    // Byte offset within the bitmap section where the next bitmap starts.
+    private int _offset = 0;
+    private int _numUniqueIds = 0;
+
+    Writer(DataOutputStream dictionaryStream, DataOutputStream offsetStream, DataOutputStream bitmapStream) {
+      _dictionaryStream = dictionaryStream;
+      _offsetStream = offsetStream;
+      _bitmapStream = bitmapStream;
+    }
+
+    int getNumUniqueIds() {
+      return _numUniqueIds;
+    }
+
+    void add(long h3Id, ImmutableRoaringBitmap bitmap)
+        throws IOException {
+      byte[] serialized = serialize(bitmap);
+      _dictionaryStream.writeLong(h3Id);
+      _offsetStream.writeInt(_offset);
+      _bitmapStream.write(serialized);
+      // Offsets are stored as ints; fail loudly rather than write a wrapped-around offset.
+      _offset = Math.addExact(_offset, serialized.length);
+      _numUniqueIds++;
+    }
+  }
+
+  /** Ad-hoc smoke test: indexes 10,000 random points and compares per-cell cardinalities read back from the index. */
+  public static void main(String[] args)
+      throws Exception {
+    File indexDir = new File(System.getProperty("java.io.tmpdir"), "h3IndexDir");
+    FileUtils.deleteDirectory(indexDir);
+    indexDir.mkdirs();
+    FieldSpec spec = new DimensionFieldSpec("geo_col", FieldSpec.DataType.STRING, true);
+    int resolution = 5;
+    Random rand = new Random();
+    H3Core h3Core = H3Core.newInstance();
+    Map<Long, Integer> expectedCounts = new HashMap<>();
+    try (H3IndexCreator creator = new H3IndexCreator(indexDir, spec, resolution)) {
+      for (int i = 0; i < 10000; i++) {
+        int lat = rand.nextInt(10);
+        int lon = rand.nextInt(10);
+        creator.add(i, lat, lon);
+        expectedCounts.merge(h3Core.geoToH3(lat, lon, resolution), 1, Integer::sum);
+      }
+      creator.seal();
+
+      System.out.println(
+          "Contents of IndexDir \n " + FileUtils.listFiles(indexDir, null, true).toString().replaceAll(",", "\n"));
+    }
+
+    File h3IndexFile = new File(indexDir, "geo_col.h3.index");
+    PinotDataBuffer h3IndexBuffer =
+        PinotDataBuffer.mapFile(h3IndexFile, true, 0, h3IndexFile.length(), ByteOrder.BIG_ENDIAN, "H3 index file");
+    try (H3IndexReader reader = new H3IndexReader(h3IndexBuffer)) {
+      for (Map.Entry<Long, Integer> entry : expectedCounts.entrySet()) {
+        long h3 = entry.getKey();
+        int expected = entry.getValue();
+        int actual = reader.getDocIds(h3).getCardinality();
+        String status = (actual == expected) ? "Matched" : "Failed";
+        System.out.printf("%s: expected: %d actual: %d for h3:%d \n", status, expected, actual, h3);
+      }
+    } finally {
+      h3IndexBuffer.close();
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java
index b9f5f53..ff24016 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java
@@ -45,7 +45,7 @@ public abstract class BaseImmutableDictionary implements Dictionary {
       _valueReader = new VarLengthBytesValueReaderWriter(dataBuffer);
     } else {
       Preconditions.checkState(dataBuffer.size() == length * numBytesPerValue,
-          "Buffer size mismatch: bufferSize = %s, numValues = %s, numByesPerValue = %s", dataBuffer.size(), length,
+          "Buffer size mismatch: bufferSize = %s, numValues = %s, numBytesPerValue = %s", dataBuffer.size(), length,
           numBytesPerValue);
       _valueReader = new FixedByteValueReaderWriter(dataBuffer);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java
new file mode 100644
index 0000000..96fa586
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java
@@ -0,0 +1,102 @@
+package org.apache.pinot.core.segment.index.readers.geospatial;
+
+import java.io.Closeable;
+import java.lang.ref.SoftReference;
+import org.apache.pinot.core.segment.index.readers.BitmapInvertedIndexReader;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.IntDictionary;
+import org.apache.pinot.core.segment.index.readers.LongDictionary;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class H3IndexReader implements Closeable {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(BitmapInvertedIndexReader.class);
+
+  private final PinotDataBuffer _bitmapBuffer;
+  private final PinotDataBuffer _offsetBuffer;
+  private final int _numBitmaps;
+  private final int _bitmapBufferSize;
+
+  private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
+
+  private Dictionary _dictionary;
+
+  /**
+   * Constructs an inverted index with the specified size.
+   * @param dataBuffer data buffer for the inverted index.
+   */
+  public H3IndexReader(PinotDataBuffer dataBuffer) {
+    int version = dataBuffer.getInt(0 * Integer.BYTES);
+    _numBitmaps = dataBuffer.getInt(1 * Integer.BYTES);
+
+    int headerSize = 2 * Integer.BYTES;
+    //read the dictionary
+    int dictionarySize = _numBitmaps * Long.BYTES;
+    int offsetsSize = _numBitmaps * Integer.BYTES;
+    PinotDataBuffer dictionaryBuffer = dataBuffer.view(headerSize, headerSize + dictionarySize);
+    _offsetBuffer = dataBuffer.view(headerSize + dictionarySize, headerSize + dictionarySize + offsetsSize);
+    _bitmapBuffer = dataBuffer.view(headerSize + dictionarySize + offsetsSize, dataBuffer.size());
+    _dictionary = new LongDictionary(dictionaryBuffer, _numBitmaps);
+    _bitmapBufferSize = (int) _bitmapBuffer.size();
+  }
+
+  public ImmutableRoaringBitmap getDocIds(long h3IndexId) {
+    SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null;
+    int dictId = _dictionary.indexOf(String.valueOf(h3IndexId));
+    // Return the bitmap if it's still on heap
+    if (_bitmaps != null) {
+      bitmapArrayReference = _bitmaps.get();
+      if (bitmapArrayReference != null) {
+        SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+        if (bitmapReference != null) {
+          ImmutableRoaringBitmap value = bitmapReference.get();
+          if (value != null) {
+            return value;
+          }
+        }
+      } else {
+        bitmapArrayReference = new SoftReference[_numBitmaps];
+        _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
+      }
+    } else {
+      bitmapArrayReference = new SoftReference[_numBitmaps];
+      _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
+    }
+    synchronized (this) {
+      ImmutableRoaringBitmap value;
+      if (bitmapArrayReference[dictId] == null || bitmapArrayReference[dictId].get() == null) {
+        value = buildRoaringBitmapForIndex(dictId);
+        bitmapArrayReference[dictId] = new SoftReference<ImmutableRoaringBitmap>(value);
+      } else {
+        value = bitmapArrayReference[dictId].get();
+      }
+      return value;
+    }
+  }
+
+  //todo: fix this
+  private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int index) {
+    int currentOffset = getOffset(index);
+    int bufferLength;
+    if (index == _numBitmaps - 1) {
+      bufferLength = _bitmapBufferSize - currentOffset;
+    } else {
+      bufferLength = getOffset(index + 1) - currentOffset;
+    }
+    return new ImmutableRoaringBitmap(_bitmapBuffer.toDirectByteBuffer(currentOffset, bufferLength));
+  }
+
+  private int getOffset(final int index) {
+    return _offsetBuffer.getInt(index * Integer.BYTES);
+  }
+
+  @Override
+  public void close() {
+    // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
+    // caller is responsible of closing the PinotDataBuffer.
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org