You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/02/17 16:17:03 UTC

[carbondata] branch master updated: [CARBONDATA-4117][CARBONDATA-4123] cg index and bloom index query issue with Index server

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f1db97  [CARBONDATA-4117][CARBONDATA-4123] cg index and bloom index query issue with Index server
3f1db97 is described below

commit 3f1db97df626ebced8df01f596f4d12f41e90344
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Fri Feb 5 20:30:47 2021 +0530

    [CARBONDATA-4117][CARBONDATA-4123] cg index and bloom index query issue with Index server
    
    Why is this PR needed?
    1. The cg index query test with Index server fails with an NPE. While initializing the index
    model, a parsing error is thrown when trying to uncompress the data with snappy.
    2. Bloom index query with Index server gives incorrect results when splits have more than one
    blocklet. Blocklet-level details are not serialized for the index server because it is treated
    as a block-level cache.
    
    What changes were proposed in this PR?
    1. Set the segment and schema details on the BlockletIndexInputSplit object. When writing the
    minmax object, write the byte size (the buffer's limit) instead of its position.
    2. Create a BlockletIndex when a bloom filter is used, so that isBlockCache is set to false
    in the createBlocklet step.
    
    This closes #4089
---
 .../java/org/apache/carbondata/core/index/IndexUtil.java  |  1 +
 .../carbondata/core/indexstore/ExtendedBlocklet.java      |  9 ++++++++-
 .../spark/testsuite/index/CGIndexTestCase.scala           | 15 +++++++++++----
 3 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
index 56ee810..c663b78 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
@@ -257,6 +257,7 @@ public class IndexUtil {
         for (ExtendedBlocklet blocklet : prunedBlocklet) {
           blocklet.getDetailInfo();
           blocklet.setIndexUniqueId(wrapper.getUniqueId());
+          blocklet.setCgIndexPresent(true);
         }
         extendedBlocklets.addAll(prunedBlocklet);
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index 0d2ba79..900d7c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -48,6 +48,8 @@ public class ExtendedBlocklet extends Blocklet {
 
   private String segmentNo;
 
+  private boolean isCgIndexPresent = false;
+
   public ExtendedBlocklet() {
 
   }
@@ -191,7 +193,8 @@ public class ExtendedBlocklet extends Blocklet {
         DataOutputStream dos = new DataOutputStream(ebos);
         inputSplit.setFilePath(null);
         inputSplit.setBucketId(null);
-        if (inputSplit.isBlockCache()) {
+        // serialize detail info when it is blocklet cache or cgIndex is present.
+        if (inputSplit.isBlockCache() && !isCgIndexPresent) {
           inputSplit.updateFooterOffset();
           inputSplit.updateBlockLength();
           inputSplit.setWriteDetailInfo(false);
@@ -247,4 +250,8 @@ public class ExtendedBlocklet extends Blocklet {
           new CarbonInputSplit(serializeLen, in, getFilePath(), locations, getBlockletId());
     }
   }
+
+  public void setCgIndexPresent(boolean cgIndexPresent) {
+    isCgIndexPresent = cgIndexPresent;
+  }
 }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
index a94d7ca..2bf46aa 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark.testsuite.index
 
 import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -38,6 +39,7 @@ import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.index.{IndexInputSplit, IndexMeta, Segment}
 import org.apache.carbondata.core.index.dev.{IndexBuilder, IndexModel, IndexWriter}
 import org.apache.carbondata.core.index.dev.cgindex.{CoarseGrainIndex, CoarseGrainIndexFactory}
+import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper
 import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexInputSplit
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -48,6 +50,7 @@ import org.apache.carbondata.core.scan.filter.executer.FilterExecutor
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.Event
 import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
 
@@ -112,8 +115,12 @@ class CGIndexFactory(
 
     val files = file.listFiles()
     files.map { f =>
-      val d: IndexInputSplit = new BlockletIndexInputSplit(f.getCanonicalPath)
-      d
+      val d: BlockletIndexInputSplit = new BlockletIndexInputSplit(f.getCanonicalPath)
+      d.setSegment(segment)
+      d.setIndexSchema(getIndexSchema)
+      d.setSegmentPath(CarbonTablePath.getSegmentPath(identifier.getTablePath,
+        segment.getSegmentNo))
+      new IndexInputSplitWrapper(UUID.randomUUID.toString, d).getDistributable
     }.toList.asJava
   }
 
@@ -351,8 +358,8 @@ class CGIndexWriter(
     outStream.writeObject(maxMin)
     outStream.close()
     val bytes = compressor.compressByte(out.getBytes)
-    stream.write(bytes.array(), 0, bytes.position())
-    stream.writeInt(bytes.position())
+    stream.write(bytes.array(), 0, bytes.limit())
+    stream.writeInt(bytes.limit())
     stream.close()
   }