You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/12/11 20:23:10 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6216: Json indexing

Jackie-Jiang commented on a change in pull request #6216:
URL: https://github.com/apache/incubator-pinot/pull/6216#discussion_r541249316



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/JSONIndexCreator.java
##########
@@ -0,0 +1,658 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.inv;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.wnameless.json.flattener.JsonFlattener;
+import com.google.common.io.Files;
+import com.google.common.primitives.UnsignedBytes;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.apache.pinot.core.io.util.VarLengthBytesValueReaderWriter;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.filter.JSONMatchFilterOperator;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.predicate.EqPredicate;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.segment.index.readers.JSONIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION;
+
+
+public class JSONIndexCreator implements Closeable {
+
+  // Separator used to join the key and value to create the posting list key (i.e. "key|value")
+  public static String POSTING_LIST_KEY_SEPARATOR = "|";
+  // add() calls flush() every time the number of added root documents reaches a multiple of this value
+  static int FLUSH_THRESHOLD = 50_000;
+  // NOTE(review): presumably the index format version stored in the output header -- confirm where it is written
+  static int VERSION = 1;
+  // Intermediate file written through flattenedDocId2RootDocIdWriter and read back as ints in seal()
+  private final File flattenedDocId2RootDocIdMappingFile;
+  // Intermediate file written through postingListWriter and handed to createInvertedIndex() in seal()
+  private final File postingListFile;
+  // The following intermediate files are created (as File handles) in the constructor under indexDir;
+  // NOTE(review): where they are populated is outside this excerpt -- confirm
+  private File dictionaryheaderFile;
+  private File dictionaryOffsetFile;
+  private File dictionaryFile;
+  private File invertedIndexOffsetFile;
+  private File invertedIndexFile;
+  // Final output file: <column name> + JSON_INDEX_FILE_EXTENSION under indexDir
+  private File outputIndexFile;
+
+  // Number of add() calls so far; incremented once per call, used as the root document id
+  private int docId = 0;
+  // Number of flattened records seen so far; incremented once per map returned by unnestJson()
+  private int numFlatennedDocId = 0;
+  // NOTE(review): usage not visible in this excerpt -- presumably counts flushed chunks, confirm
+  int chunkId = 0;
+
+  private DataOutputStream postingListWriter;
+  private DataOutputStream flattenedDocId2RootDocIdWriter;
+
+  // Sorted map from posting list key (either "key" or "key|value") to the flattened doc ids containing it
+  Map<String, List<Integer>> postingListMap = new TreeMap<>();
+  // One entry appended by add() per flattened record (the flattened-docId to root-docId mapping buffer)
+  List<Integer> flattenedDocIdList = new ArrayList<>();
+  // Seeded in the constructor with the initial size of postingListWriter; passed to createInvertedIndex() in seal()
+  List<Integer> postingListChunkOffsets = new ArrayList<>();
+  // Passed to createInvertedIndex() in seal(); NOTE(review): presumably one length per flushed chunk -- confirm
+  List<Integer> chunkLengths = new ArrayList<>();
+  private FieldSpec fieldSpec;
+
+  /**
+   * Creates a JSON index creator for one column and opens the intermediate buffer files it writes to.
+   *
+   * <p>All intermediate files and the final index file are placed inside {@code indexDir} and are prefixed with
+   * the column name taken from {@code fieldSpec}.
+   *
+   * @param indexDir directory in which the intermediate files and the final index file are created
+   * @param fieldSpec spec of the column being indexed; its name is used as the file name prefix
+   * @throws IOException if one of the intermediate files cannot be opened for writing
+   */
+  public JSONIndexCreator(File indexDir, FieldSpec fieldSpec)
+      throws IOException {
+    this.fieldSpec = fieldSpec;
+
+    String name = fieldSpec.getName();
+    // Resolve against indexDir (parent, child) like every other file below. Concatenating the directory and the
+    // name as strings drops the path separator and creates the file next to the index directory instead of in it.
+    postingListFile = new File(indexDir, name + "_postingList.buf");
+    postingListWriter = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(postingListFile)));
+    postingListChunkOffsets.add(postingListWriter.size());
+
+    dictionaryheaderFile = new File(indexDir, name + "_dictionaryHeader.buf");
+    dictionaryOffsetFile = new File(indexDir, name + "_dictionaryOffset.buf");
+    dictionaryFile = new File(indexDir, name + "_dictionary.buf");
+    invertedIndexOffsetFile = new File(indexDir, name + "_invertedIndexOffset.buf");
+    invertedIndexFile = new File(indexDir, name + "_invertedIndex.buf");
+    flattenedDocId2RootDocIdMappingFile = new File(indexDir, name + "_flattenedDocId.buf");
+    try {
+      flattenedDocId2RootDocIdWriter =
+          new DataOutputStream(new BufferedOutputStream(new FileOutputStream(flattenedDocId2RootDocIdMappingFile)));
+    } catch (IOException e) {
+      // The posting list stream is already open; close it so a failed construction does not leak the handle
+      try {
+        postingListWriter.close();
+      } catch (IOException closeException) {
+        e.addSuppressed(closeException);
+      }
+      throw e;
+    }
+
+    //output file
+    outputIndexFile = new File(indexDir, name + JSON_INDEX_FILE_EXTENSION);
+  }
+
+  /**
+   * Indexes one single-value JSON document.
+   *
+   * <p>The document is parsed and un-nested into flattened records. For every field of every flattened record,
+   * the flattened doc id is appended to the posting list of the key and to the posting list of
+   * {@code key + POSTING_LIST_KEY_SEPARATOR + value}. Each flattened record is also mapped back to the current
+   * root doc id. Buffered data is flushed whenever the root doc count reaches a multiple of
+   * {@code FLUSH_THRESHOLD}.
+   *
+   * @param data serialized JSON document
+   * @throws IOException if the document cannot be parsed or flushing fails
+   */
+  public void add(byte[] data)
+      throws IOException {
+    JsonNode rootNode = new ObjectMapper().readTree(data);
+
+    for (Map<String, String> flattenedRecord : unnestJson(rootNode)) {
+      for (Map.Entry<String, String> field : flattenedRecord.entrySet()) {
+        String path = field.getKey();
+
+        // posting list of the key alone
+        postingListMap.computeIfAbsent(path, k -> new ArrayList<>()).add(numFlatennedDocId);
+
+        // posting list of the key joined with its value
+        String pathAndValue = path + POSTING_LIST_KEY_SEPARATOR + field.getValue();
+        postingListMap.computeIfAbsent(pathAndValue, k -> new ArrayList<>()).add(numFlatennedDocId);
+      }
+
+      // remember which root document this flattened record came from
+      flattenedDocIdList.add(docId);
+      numFlatennedDocId++;
+    }
+
+    // one more root document has been consumed; flush on every FLUSH_THRESHOLD-th document
+    if (++docId % FLUSH_THRESHOLD == 0) {
+      flush();
+    }
+  }
+
+  /**
+   * Indexes one multi-value entry: every JSON value in the array belongs to the same root document.
+   *
+   * <p>Each of the first {@code length} values is parsed and un-nested into flattened records. For every field of
+   * every flattened record, the flattened doc id is appended to the posting list of the key and to the posting
+   * list of {@code key + POSTING_LIST_KEY_SEPARATOR + value}. Every flattened record is mapped back to the root
+   * doc id of this call. Buffered data is flushed whenever the root doc count reaches a multiple of
+   * {@code FLUSH_THRESHOLD}.
+   *
+   * @param dataArray serialized JSON values of the entry; only the first {@code length} elements are read
+   * @param length number of valid elements in {@code dataArray}
+   * @throws IOException if a value cannot be parsed or flushing fails
+   */
+  public void add(byte[][] dataArray, int length)
+      throws IOException {
+
+    // One mapper is enough for all values of the entry
+    ObjectMapper objectMapper = new ObjectMapper();
+    for (int i = 0; i < length; i++) {
+      JsonNode jsonNode = objectMapper.readTree(dataArray[i]);
+      List<Map<String, String>> flattenedMapList = unnestJson(jsonNode);
+      for (Map<String, String> map : flattenedMapList) {
+        for (Map.Entry<String, String> entry : map.entrySet()) {
+          //handle key posting list
+          String key = entry.getKey();
+          postingListMap.computeIfAbsent(key, k -> new ArrayList<>()).add(numFlatennedDocId);
+
+          //handle keyvalue posting list
+          String keyValue = key + POSTING_LIST_KEY_SEPARATOR + entry.getValue();
+          postingListMap.computeIfAbsent(keyValue, k -> new ArrayList<>()).add(numFlatennedDocId);
+        }
+        //flattenedDocId2RootDocIdMapping: the stored value must be the root doc id (as in the single-value add),
+        //not the flattened doc id itself, otherwise every flattened record maps to itself
+        flattenedDocIdList.add(docId);
+
+        numFlatennedDocId++;
+      }
+    }
+    docId++;
+
+    //flush data
+    if (docId % FLUSH_THRESHOLD == 0) {
+      flush();
+    }
+  }
+
+  public void seal()
+      throws IOException {
+
+    flush();
+
+    flattenedDocId2RootDocIdWriter.close();
+    postingListWriter.close();
+
+    //key posting list merging
+    System.out.println("InvertedIndex");
+    System.out.println("=================");
+
+    int maxKeyLength = createInvertedIndex(postingListFile, postingListChunkOffsets, chunkLengths);
+    System.out.println("=================");
+
+    int flattenedDocid = 0;
+    DataInputStream flattenedDocId2RootDocIdReader =
+        new DataInputStream(new BufferedInputStream(new FileInputStream(flattenedDocId2RootDocIdMappingFile)));
+    int[] rootDocIdArray = new int[numFlatennedDocId];
+    while (flattenedDocid < numFlatennedDocId) {
+      rootDocIdArray[flattenedDocid++] = flattenedDocId2RootDocIdReader.readInt();
+    }
+    System.out.println("FlattenedDocId  to RootDocId Mapping = ");
+    System.out.println(Arrays.toString(rootDocIdArray));
+
+    //PUT all contents into one file
+
+    //header
+    // version + maxDictionaryLength + [store the offsets + length for each one (dictionary offset file, dictionaryFile, index offset file, index file, flattened docId to rootDocId file)]
+    long headerSize = 2 * Integer.BYTES + 6 * 2 * Long.BYTES;

Review comment:
       As long as the version is included in the header, we can always evolve the format. The reader factory can check the version and create the proper reader for it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org