You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/02/17 16:17:03 UTC
[carbondata] branch master updated:
[CARBONDATA-4117][CARBONDATA-4123] cg index and bloom index query issue
with Index server
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 3f1db97 [CARBONDATA-4117][CARBONDATA-4123] cg index and bloom index query issue with Index server
3f1db97 is described below
commit 3f1db97df626ebced8df01f596f4d12f41e90344
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Fri Feb 5 20:30:47 2021 +0530
[CARBONDATA-4117][CARBONDATA-4123] cg index and bloom index query issue with Index server
Why is this PR needed?
1. A CG index query with the Index server fails with an NPE. While initializing the index model,
a parsing error is thrown when trying to uncompress with Snappy.
2. A Bloom index query with the Index server gives incorrect results when splits have more than one blocklet.
Blocklet-level details are not serialized for the index server because the cache is treated as block-level cache.
What changes were proposed in this PR?
1. Set the segment and schema details on the BlockletIndexInputSplit object. While writing the
min/max object, write the byte size instead of the buffer position.
2. Create a BlockletIndex when the bloom filter is used, so that isBlockCache
is set to false in the createBlocklet step.
This closes #4089
---
.../java/org/apache/carbondata/core/index/IndexUtil.java | 1 +
.../carbondata/core/indexstore/ExtendedBlocklet.java | 9 ++++++++-
.../spark/testsuite/index/CGIndexTestCase.scala | 15 +++++++++++----
3 files changed, 20 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
index 56ee810..c663b78 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
@@ -257,6 +257,7 @@ public class IndexUtil {
for (ExtendedBlocklet blocklet : prunedBlocklet) {
blocklet.getDetailInfo();
blocklet.setIndexUniqueId(wrapper.getUniqueId());
+ blocklet.setCgIndexPresent(true);
}
extendedBlocklets.addAll(prunedBlocklet);
}
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index 0d2ba79..900d7c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -48,6 +48,8 @@ public class ExtendedBlocklet extends Blocklet {
private String segmentNo;
+ private boolean isCgIndexPresent = false;
+
public ExtendedBlocklet() {
}
@@ -191,7 +193,8 @@ public class ExtendedBlocklet extends Blocklet {
DataOutputStream dos = new DataOutputStream(ebos);
inputSplit.setFilePath(null);
inputSplit.setBucketId(null);
- if (inputSplit.isBlockCache()) {
+ // serialize detail info when it is blocklet cache or cgIndex is present.
+ if (inputSplit.isBlockCache() && !isCgIndexPresent) {
inputSplit.updateFooterOffset();
inputSplit.updateBlockLength();
inputSplit.setWriteDetailInfo(false);
@@ -247,4 +250,8 @@ public class ExtendedBlocklet extends Blocklet {
new CarbonInputSplit(serializeLen, in, getFilePath(), locations, getBlockletId());
}
}
+
+ public void setCgIndexPresent(boolean cgIndexPresent) {
+ isCgIndexPresent = cgIndexPresent;
+ }
}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
index a94d7ca..2bf46aa 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
@@ -18,6 +18,7 @@
package org.apache.carbondata.spark.testsuite.index
import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.util.UUID
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -38,6 +39,7 @@ import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.index.{IndexInputSplit, IndexMeta, Segment}
import org.apache.carbondata.core.index.dev.{IndexBuilder, IndexModel, IndexWriter}
import org.apache.carbondata.core.index.dev.cgindex.{CoarseGrainIndex, CoarseGrainIndexFactory}
+import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper
import org.apache.carbondata.core.indexstore.Blocklet
import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexInputSplit
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -48,6 +50,7 @@ import org.apache.carbondata.core.scan.filter.executer.FilterExecutor
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.Event
import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest
@@ -112,8 +115,12 @@ class CGIndexFactory(
val files = file.listFiles()
files.map { f =>
- val d: IndexInputSplit = new BlockletIndexInputSplit(f.getCanonicalPath)
- d
+ val d: BlockletIndexInputSplit = new BlockletIndexInputSplit(f.getCanonicalPath)
+ d.setSegment(segment)
+ d.setIndexSchema(getIndexSchema)
+ d.setSegmentPath(CarbonTablePath.getSegmentPath(identifier.getTablePath,
+ segment.getSegmentNo))
+ new IndexInputSplitWrapper(UUID.randomUUID.toString, d).getDistributable
}.toList.asJava
}
@@ -351,8 +358,8 @@ class CGIndexWriter(
outStream.writeObject(maxMin)
outStream.close()
val bytes = compressor.compressByte(out.getBytes)
- stream.write(bytes.array(), 0, bytes.position())
- stream.writeInt(bytes.position())
+ stream.write(bytes.array(), 0, bytes.limit())
+ stream.writeInt(bytes.limit())
stream.close()
}