You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/05/19 00:08:09 UTC

carbondata git commit: [HOTFIX] [CARBONDATA-2480] Search mode RuntimeException: Error while resolving filter expression

Repository: carbondata
Updated Branches:
  refs/heads/master 3087323a9 -> 784b22de8


[HOTFIX] [CARBONDATA-2480] Search mode RuntimeException: Error while resolving filter expression

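In search mode, invoke chooseFGDataMap in the Worker (SearchRequestHandler) instead of in the Master, so the chosen DataMap is no longer sent inside SearchRequest.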

This closes #2309


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/784b22de
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/784b22de
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/784b22de

Branch: refs/heads/master
Commit: 784b22de89e80b43f34550a70511bbb83b0c72c7
Parents: 3087323
Author: xubo245 <60...@qq.com>
Authored: Wed May 9 20:11:01 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sat May 19 08:07:11 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |  4 ++--
 .../lucene/LuceneCoarseGrainDataMapFactory.java |  2 +-
 .../lucene/LuceneFineGrainDataMapFactory.java   |  2 +-
 .../dataload/TestBatchSortDataLoad.scala        |  2 +-
 .../detailquery/SearchModeTestCase.scala        |  4 +++-
 .../TestDataLoadingForPartitionTable.scala      |  2 +-
 .../store/worker/SearchRequestHandler.java      | 21 ++++++++++++++++++--
 .../scala/org/apache/spark/rpc/Master.scala     | 11 +---------
 .../org/apache/spark/search/Searcher.scala      |  3 +--
 9 files changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 9d648f5..8528d6f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -478,12 +478,12 @@ public class CarbonTable implements Serializable {
   /**
    * Return the segment path of the specified segmentId
    */
-  public String getSemgentPath(String segmentId) {
+  public String getSegmentPath(String segmentId) {
     return CarbonTablePath.getSegmentPath(getTablePath(), segmentId);
   }
 
   /**
-   * @return storepath
+   * @return store path
    */
   public String getTablePath() {
     return tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
index dca5c90..b9c2ffa 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 /**
- * FG level of lucene DataMap
+ * CG level of lucene DataMap
  */
 @InterfaceAudience.Internal
 public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<CoarseGrainDataMap> {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index ec9283d..2d9618c 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 
 /**
- * CG level of lucene DataMap
+ * FG level of lucene DataMap
  */
 @InterfaceAudience.Internal
 public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<FineGrainDataMap> {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index 68a3058..f3e12d1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -193,7 +193,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.DATABASE_DEFAULT_NAME,
       tableName
     )
-    val segmentDir = carbonTable.getSemgentPath(segmentNo)
+    val segmentDir = carbonTable.getSegmentPath(segmentNo)
     new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 2f3488e..2c94dab 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -19,7 +19,8 @@ package org.apache.carbondata.spark.testsuite.detailquery
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{CarbonSession, Row, SaveMode}
-import org.scalatest.{BeforeAndAfterAll, Ignore}
+import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.util.DataGenerator
@@ -98,6 +99,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
   test("aggregate query with datamap and fallback to SparkSQL") {
     sql("create datamap preagg on table main using 'preaggregate' as select city, count(*) from main group by city ")
     checkSearchAnswer("select city, count(*) from main group by city")
+    sql("drop datamap preagg on table main").show()
   }
 
   test("set search mode") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index e5de8da..0eaaec5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -61,7 +61,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
 
   def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-    val segmentDir = carbonTable.getSemgentPath(segmentId)
+    val segmentDir = carbonTable.getSegmentPath(segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 35acb17..445b292 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapChooser;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -37,6 +38,7 @@ import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
 import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
 import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.util.CarbonTaskInfo;
@@ -81,6 +83,19 @@ public class SearchRequestHandler {
     return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
   }
 
+  private DataMapExprWrapper chooseFGDataMap(
+          CarbonTable table,
+          FilterResolverIntf filterInterface) {
+    DataMapChooser chooser = null;
+    try {
+      chooser = new DataMapChooser(table);
+      return chooser.chooseFGDataMap(filterInterface);
+    } catch (IOException e) {
+      LOG.audit(e.getMessage());
+      return null;
+    }
+  }
+
   /**
    * Builds {@link QueryModel} and read data from files
    */
@@ -102,10 +117,12 @@ public class SearchRequestHandler {
 
     LOG.info(String.format("[SearchId:%d] %s, number of block: %d",
         request.searchId(), queryModel.toString(), mbSplit.getAllSplits().size()));
+    DataMapExprWrapper fgDataMap = chooseFGDataMap(table,
+            queryModel.getFilterExpressionResolverTree());
 
     // If there is DataMap selected in Master, prune the split by it
-    if (request.dataMap() != null) {
-      queryModel = prune(request.searchId(), table, queryModel, mbSplit, request.dataMap().get());
+    if (fgDataMap != null) {
+      queryModel = prune(request.searchId(), table, queryModel, mbSplit, fgDataMap);
     }
 
     // In search mode, reader will read multiple blocks by using a thread pool

http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index 2e9a532..26de74c 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -215,13 +215,12 @@ class Master(sparkConf: SparkConf) {
     // prune data and get a mapping of worker hostname to list of blocks,
     // then add these blocks to the SearchRequest and fire the RPC call
     val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
-    val fgDataMap = chooseFGDataMap(table, filter)
     val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) =>
       // Build a SearchRequest
       val split = new SerializableWritable[CarbonMultiBlockSplit](
         new CarbonMultiBlockSplit(blocks, splitAddress))
       val request =
-        SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit, fgDataMap)
+        SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
 
       // Find an Endpoind and send the request to it
       // This RPC is non-blocking so that we do not need to wait before send to next worker
@@ -254,14 +253,6 @@ class Master(sparkConf: SparkConf) {
     output.toArray
   }
 
-  private def chooseFGDataMap(
-      table: CarbonTable,
-      filter: Expression): Option[DataMapExprWrapper] = {
-    val chooser = new DataMapChooser(table)
-    val filterInterface = table.resolveFilter(filter)
-    Option(chooser.chooseFGDataMap(filterInterface))
-  }
-
   /**
    * Prune data by using CarbonInputFormat.getSplit
    * Return a mapping of host address to list of block

http://git-wip-us.apache.org/repos/asf/carbondata/blob/784b22de/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
index 1532284..6fbea15 100644
--- a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
+++ b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
@@ -60,8 +60,7 @@ case class SearchRequest(
     tableInfo: TableInfo,
     projectColumns: Array[String],
     filterExpression: Expression,
-    limit: Long,
-    dataMap: Option[DataMapExprWrapper])
+    limit: Long)
 
 // Search result sent from worker to master
 case class SearchResult(