You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/05/19 00:08:09 UTC
carbondata git commit: [HOTFIX] [CARBONDATA-2480] Search mode RuntimeException: Error while resolving filter expression
Repository: carbondata
Updated Branches:
refs/heads/master 3087323a9 -> 784b22de8
[HOTFIX] [CARBONDATA-2480] Search mode RuntimeException: Error while resolving filter expression
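In search mode, invoke chooseFGDataMap in the Worker (SearchRequestHandler) instead of the Master, so the chosen FG DataMap is no longer passed in the SearchRequest.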
This closes #2309
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/784b22de
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/784b22de
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/784b22de
Branch: refs/heads/master
Commit: 784b22de89e80b43f34550a70511bbb83b0c72c7
Parents: 3087323
Author: xubo245 <60...@qq.com>
Authored: Wed May 9 20:11:01 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sat May 19 08:07:11 2018 +0800
----------------------------------------------------------------------
.../core/metadata/schema/table/CarbonTable.java | 4 ++--
.../lucene/LuceneCoarseGrainDataMapFactory.java | 2 +-
.../lucene/LuceneFineGrainDataMapFactory.java | 2 +-
.../dataload/TestBatchSortDataLoad.scala | 2 +-
.../detailquery/SearchModeTestCase.scala | 4 +++-
.../TestDataLoadingForPartitionTable.scala | 2 +-
.../store/worker/SearchRequestHandler.java | 21 ++++++++++++++++++--
.../scala/org/apache/spark/rpc/Master.scala | 11 +---------
.../org/apache/spark/search/Searcher.scala | 3 +--
9 files changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 9d648f5..8528d6f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -478,12 +478,12 @@ public class CarbonTable implements Serializable {
/**
* Return the segment path of the specified segmentId
*/
- public String getSemgentPath(String segmentId) {
+ public String getSegmentPath(String segmentId) {
return CarbonTablePath.getSegmentPath(getTablePath(), segmentId);
}
/**
- * @return storepath
+ * @return store path
*/
public String getTablePath() {
return tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
index dca5c90..b9c2ffa 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
/**
- * FG level of lucene DataMap
+ * CG level of lucene DataMap
*/
@InterfaceAudience.Internal
public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<CoarseGrainDataMap> {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index ec9283d..2d9618c 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
/**
- * CG level of lucene DataMap
+ * FG level of lucene DataMap
*/
@InterfaceAudience.Internal
public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<FineGrainDataMap> {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index 68a3058..f3e12d1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -193,7 +193,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
CarbonCommonConstants.DATABASE_DEFAULT_NAME,
tableName
)
- val segmentDir = carbonTable.getSemgentPath(segmentNo)
+ val segmentDir = carbonTable.getSegmentPath(segmentNo)
new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 2f3488e..2c94dab 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -19,7 +19,8 @@ package org.apache.carbondata.spark.testsuite.detailquery
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{CarbonSession, Row, SaveMode}
-import org.scalatest.{BeforeAndAfterAll, Ignore}
+import org.scalatest.BeforeAndAfterAll
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.util.DataGenerator
@@ -98,6 +99,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
test("aggregate query with datamap and fallback to SparkSQL") {
sql("create datamap preagg on table main using 'preaggregate' as select city, count(*) from main group by city ")
checkSearchAnswer("select city, count(*) from main group by city")
+ sql("drop datamap preagg on table main").show()
}
test("set search mode") {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index e5de8da..0eaaec5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -61,7 +61,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
- val segmentDir = carbonTable.getSemgentPath(segmentId)
+ val segmentDir = carbonTable.getSegmentPath(segmentId)
val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
override def accept(file: CarbonFile): Boolean = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 35acb17..445b292 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -26,6 +26,7 @@ import java.util.Objects;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapChooser;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -37,6 +38,7 @@ import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.model.QueryModelBuilder;
import org.apache.carbondata.core.util.CarbonTaskInfo;
@@ -81,6 +83,19 @@ public class SearchRequestHandler {
return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
}
+ private DataMapExprWrapper chooseFGDataMap(
+ CarbonTable table,
+ FilterResolverIntf filterInterface) {
+ DataMapChooser chooser = null;
+ try {
+ chooser = new DataMapChooser(table);
+ return chooser.chooseFGDataMap(filterInterface);
+ } catch (IOException e) {
+ LOG.audit(e.getMessage());
+ return null;
+ }
+ }
+
/**
* Builds {@link QueryModel} and read data from files
*/
@@ -102,10 +117,12 @@ public class SearchRequestHandler {
LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
+ DataMapExprWrapper fgDataMap = chooseFGDataMap(table,
+ queryModel.getFilterExpressionResolverTree());
// If there is DataMap selected in Master, prune the split by it
- if (request.dataMap() != null) {
- queryModel = prune(request.searchId(), table, queryModel, mbSplit, request.dataMap().get());
+ if (fgDataMap != null) {
+ queryModel = prune(request.searchId(), table, queryModel, mbSplit, fgDataMap);
}
// In search mode, reader will read multiple blocks by using a thread pool
http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index 2e9a532..26de74c 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -215,13 +215,12 @@ class Master(sparkConf: SparkConf) {
// prune data and get a mapping of worker hostname to list of blocks,
// then add these blocks to the SearchRequest and fire the RPC call
val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
- val fgDataMap = chooseFGDataMap(table, filter)
val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
// Build a SearchRequest
val split = new SerializableWritable[CarbonMultiBlockSplit](
new CarbonMultiBlockSplit(blocks, splitAddress))
val request =
- SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit, fgDataMap)
+ SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
// Find an Endpoind and send the request to it
// This RPC is non-blocking so that we do not need to wait before send to next worker
@@ -254,14 +253,6 @@ class Master(sparkConf: SparkConf) {
output.toArray
}
- private def chooseFGDataMap(
- table: CarbonTable,
- filter: Expression): Option[DataMapExprWrapper] = {
- val chooser = new DataMapChooser(table)
- val filterInterface = table.resolveFilter(filter)
- Option(chooser.chooseFGDataMap(filterInterface))
- }
-
/**
* Prune data by using CarbonInputFormat.getSplit
* Return a mapping of host address to list of block
http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
index 1532284..6fbea15 100644
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
@@ -60,8 +60,7 @@ case class SearchRequest(
tableInfo: TableInfo,
projectColumns: Array[String],
filterExpression: Expression,
- limit: Long,
- dataMap: Option[DataMapExprWrapper])
+ limit: Long)
// Search result sent from worker to master
case class SearchResult(