You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:41:02 UTC
[37/50] [abbrv] carbondata git commit: [CARBONDATA-2389] Search mode
support FG datamap
[CARBONDATA-2389] Search mode support FG datamap
Search mode support FG datamap
This closes #2290
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b3384593
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b3384593
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b3384593
Branch: refs/heads/spark-2.3
Commit: b3384593640bd941054d37ddc364181785b994d2
Parents: 74770aa
Author: xubo245 <60...@qq.com>
Authored: Wed May 9 21:20:59 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed May 30 23:53:55 2018 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 12 +
.../core/datamap/DataMapStoreManager.java | 20 +-
.../apache/carbondata/core/datamap/Segment.java | 2 +-
.../datamap/dev/expr/AndDataMapExprWrapper.java | 16 +
.../datamap/dev/expr/DataMapExprWrapper.java | 13 +
.../dev/expr/DataMapExprWrapperImpl.java | 8 +
.../datamap/dev/expr/OrDataMapExprWrapper.java | 13 +
.../LatestFilesReadCommittedScope.java | 43 ++-
.../core/readcommitter/ReadCommittedScope.java | 2 +-
.../TableStatusReadCommittedScope.java | 2 +-
.../lucene/LuceneDataMapFactoryBase.java | 4 +-
.../examples/LuceneDataMapExample.scala | 2 -
.../carbondata/hadoop/CarbonRecordReader.java | 8 +-
.../hadoop/api/CarbonInputFormat.java | 6 +-
.../lucene/LuceneFineGrainDataMapSuite.scala | 1 +
...eneFineGrainDataMapWithSearchModeSuite.scala | 328 +++++++++++++++++++
.../detailquery/SearchModeTestCase.scala | 27 ++
.../execution/command/CarbonHiveCommands.scala | 4 +-
.../spark/sql/optimizer/CarbonFilters.scala | 2 +
.../store/worker/SearchRequestHandler.java | 37 ++-
.../scala/org/apache/spark/rpc/Master.scala | 13 +-
21 files changed, 521 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8ebce9e..08aa704 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1658,6 +1658,18 @@ public final class CarbonCommonConstants {
public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false";
/**
+ * It is the timeout threshold of a carbon search query
+ */
+ @CarbonProperty
+ @InterfaceStability.Unstable
+ public static final String CARBON_SEARCH_QUERY_TIMEOUT = "carbon.search.query.timeout";
+
+ /**
+ * Default value is 10 seconds
+ */
+ public static final String CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = "10s";
+
+ /**
* The size of thread pool used for reading files in Work for search mode. By default,
* it is number of cores in Worker
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 0fcf4cd..96d2b1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -96,13 +96,19 @@ public final class DataMapStoreManager {
String dbName = carbonTable.getDatabaseName();
String tableName = carbonTable.getTableName();
String dmName = dataMap.getDataMapSchema().getDataMapName();
- boolean isDmVisible = sessionInfo.getSessionParams().getProperty(
- String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE,
- dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true");
- if (!isDmVisible) {
- LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s",
- dmName, dbName, tableName));
- dataMapIterator.remove();
+ // TODO: need to support getting the visible status of a datamap without sessionInfo in the future
+ if (sessionInfo != null) {
+ boolean isDmVisible = sessionInfo.getSessionParams().getProperty(
+ String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE,
+ dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true");
+ if (!isDmVisible) {
+ LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s",
+ dmName, dbName, tableName));
+ dataMapIterator.remove();
+ }
+ } else {
+ String message = "Carbon session info is null";
+ LOGGER.info(message);
}
}
return allDataMaps;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 85c7176..7b63b84 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -115,7 +115,7 @@ public class Segment implements Serializable {
public SegmentRefreshInfo getSegmentRefreshInfo(UpdateVO updateVo)
throws IOException {
- return readCommittedScope.getCommitedSegmentRefreshInfo(this, updateVo);
+ return readCommittedScope.getCommittedSegmentRefreshInfo(this, updateVo);
}
public String getSegmentNo() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
index 1de16bc..ec674de 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -59,6 +60,21 @@ public class AndDataMapExprWrapper implements DataMapExprWrapper {
return andBlocklets;
}
+ @Override
+ public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
+ List<PartitionSpec> partitionsToPrune)
+ throws IOException {
+ List<ExtendedBlocklet> leftPrune = left.prune(distributable, partitionsToPrune);
+ List<ExtendedBlocklet> rightPrune = right.prune(distributable, partitionsToPrune);
+ List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
+ for (ExtendedBlocklet blocklet : leftPrune) {
+ if (rightPrune.contains(blocklet)) {
+ andBlocklets.add(blocklet);
+ }
+ }
+ return andBlocklets;
+ }
+
@Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
throws IOException {
List<ExtendedBlocklet> leftPrune = left.pruneBlocklets(blocklets);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
index 5a04529..901cfc7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -41,6 +42,18 @@ public interface DataMapExprWrapper extends Serializable {
throws IOException;
/**
+ * prune blocklets according to the distributable
+ *
+ * @param distributable distributable
+ * @param partitionsToPrune partitions to prune
+ * @return the pruned ExtendedBlocklet list
+ * @throws IOException
+ */
+ List<ExtendedBlocklet> prune(DataMapDistributable distributable,
+ List<PartitionSpec> partitionsToPrune)
+ throws IOException;
+
+ /**
* It is used in case on distributable datamap. First using job it gets all blockets from all
* related datamaps. These blocklets are passed to this method to apply expression.
* @param blocklets
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
index 38f2336..6537976 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
@@ -52,6 +53,13 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper {
return dataMap.prune(segments, expression, partitionsToPrune);
}
+ public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
+ List<PartitionSpec> partitionsToPrune)
+ throws IOException {
+ List<DataMap> dataMaps = dataMap.getTableDataMaps(distributable);
+ return dataMap.prune(dataMaps, distributable, expression, partitionsToPrune);
+ }
+
@Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
throws IOException {
List<ExtendedBlocklet> blockletList = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
index 4988903..bb98535 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -58,6 +59,18 @@ public class OrDataMapExprWrapper implements DataMapExprWrapper {
return new ArrayList<>(andBlocklets);
}
+ @Override
+ public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
+ List<PartitionSpec> partitionsToPrune)
+ throws IOException {
+ List<ExtendedBlocklet> leftPrune = left.prune(distributable, partitionsToPrune);
+ List<ExtendedBlocklet> rightPrune = right.prune(distributable, partitionsToPrune);
+ Set<ExtendedBlocklet> andBlocklets = new HashSet<>();
+ andBlocklets.addAll(leftPrune);
+ andBlocklets.addAll(rightPrune);
+ return new ArrayList<>(andBlocklets);
+ }
+
@Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
throws IOException {
List<ExtendedBlocklet> leftPrune = left.pruneBlocklets(blocklets);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 14bba65..6a1234e 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -17,10 +17,7 @@
package org.apache.carbondata.core.readcommitter;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -43,11 +40,20 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
public class LatestFilesReadCommittedScope implements ReadCommittedScope {
private String carbonFilePath;
+ private String segmentId;
private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
private LoadMetadataDetails[] loadMetadataDetails;
- public LatestFilesReadCommittedScope(String path) {
+ /**
+ * a new constructor of this class
+ *
+ * @param path carbon file path
+ * @param segmentId segment id
+ */
+ public LatestFilesReadCommittedScope(String path, String segmentId) {
+ Objects.requireNonNull(path);
this.carbonFilePath = path;
+ this.segmentId = segmentId;
try {
takeCarbonIndexFileSnapShot();
} catch (IOException ex) {
@@ -55,6 +61,15 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
}
}
+ /**
+ * a new constructor with path
+ *
+ * @param path carbon file path
+ */
+ public LatestFilesReadCommittedScope(String path) {
+ this(path, null);
+ }
+
private void prepareLoadMetadata() {
int loadCount = 0;
Map<String, List<String>> snapshotMap =
@@ -101,13 +116,16 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
segName = segment.getSegmentFileName();
}
List<String> index = snapShot.get(segName);
+ if (null == index) {
+ index = new LinkedList<>();
+ }
for (String indexPath : index) {
indexFileStore.put(indexPath, null);
}
return indexFileStore;
}
- @Override public SegmentRefreshInfo getCommitedSegmentRefreshInfo(
+ @Override public SegmentRefreshInfo getCommittedSegmentRefreshInfo(
Segment segment, UpdateVO updateVo) throws IOException {
Map<String, SegmentRefreshInfo> snapShot =
readCommittedIndexFileSnapShot.getSegmentTimestampUpdaterMap();
@@ -140,9 +158,10 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
// Read the current file Path get the list of indexes from the path.
CarbonFile file = FileFactory.getCarbonFile(carbonFilePath);
CarbonFile[] files = file.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile file) {
+ @Override
+ public boolean accept(CarbonFile file) {
return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
- .endsWith(CarbonTablePath.CARBON_DATA_EXT);
+ .endsWith(CarbonTablePath.CARBON_DATA_EXT) || file.getName().endsWith("Fact");
}
});
if (files.length == 0) {
@@ -152,8 +171,14 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
}
Map<String, List<String>> indexFileStore = new HashMap<>();
Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>();
+ CarbonFile[] carbonIndexFiles = null;
if (file.isDirectory()) {
- CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath);
+ if (segmentId == null) {
+ carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath);
+ } else {
+ String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId);
+ carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+ }
for (int i = 0; i < carbonIndexFiles.length; i++) {
// TODO. If Required to support merge index, then this code has to be modified.
// TODO. Nested File Paths.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
index 6ff4b89..d177a00 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -45,7 +45,7 @@ public interface ReadCommittedScope extends Serializable {
*/
public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException ;
- public SegmentRefreshInfo getCommitedSegmentRefreshInfo(
+ public SegmentRefreshInfo getCommittedSegmentRefreshInfo(
Segment segment, UpdateVO updateVo) throws IOException;
public void takeCarbonIndexFileSnapShot() throws IOException;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 91ebd41..1f61aab 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -79,7 +79,7 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
return indexFiles;
}
- public SegmentRefreshInfo getCommitedSegmentRefreshInfo(Segment segment, UpdateVO updateVo)
+ public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, UpdateVO updateVo)
throws IOException {
SegmentRefreshInfo segmentRefreshInfo;
if (updateVo != null) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index fab0565..1da8edd 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandExcept
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.Segment;
@@ -235,7 +236,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
}
for (CarbonFile indexDir : indexDirs) {
// Filter out the tasks which are filtered through CG datamap.
- if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
+ if (getDataMapLevel() != DataMapLevel.FG &&
+ !segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
continue;
}
DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala
index efe2a63..fe94f54 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala
@@ -61,8 +61,6 @@ object LuceneDataMapExample {
| DMProperties('INDEX_COLUMNS'='id , name')
""".stripMargin)
- spark.sql("refresh datamap dm ON TABLE personTable")
-
// 1. Compare the performance:
def time(code: => Unit): Double = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index cad20fc..da84c00 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -78,8 +78,12 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
} else {
throw new RuntimeException("unsupported input split type: " + inputSplit);
}
- List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
- queryModel.setTableBlockInfos(tableBlockInfoList);
+ // It should use the existing tableBlockInfos if tableBlockInfos of queryModel is not empty,
+ // otherwise the pruning done before this method would be of no use
+ if (queryModel.getTableBlockInfos().isEmpty()) {
+ List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+ queryModel.setTableBlockInfos(tableBlockInfoList);
+ }
readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
try {
carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index cf51162..05c70f8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -372,7 +372,7 @@ m filterExpression
List<ExtendedBlocklet> prunedBlocklets =
getPrunedBlocklets(job, carbonTable, resolver, segmentIds);
- List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
+ List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
int partitionIndex = 0;
List<Integer> partitionIdList = new ArrayList<>();
if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
@@ -401,7 +401,7 @@ m filterExpression
if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
if (inputSplit != null) {
- resultFilterredBlocks.add(inputSplit);
+ resultFilteredBlocks.add(inputSplit);
}
}
}
@@ -409,7 +409,7 @@ m filterExpression
statistic
.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
- return resultFilterredBlocks;
+ return resultFilteredBlocks;
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 638d24d..f64a349 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -438,6 +438,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
.contains("Unsupported alter operation on hive table"))
sql("drop datamap if exists dm2 on table datamap_test_table")
}
+
test("test Clean Files and check Lucene DataMap") {
sql("DROP TABLE IF EXISTS datamap_test_table")
sql(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
new file mode 100644
index 0000000..0ceead8
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene
+
+import java.io.{File, PrintWriter}
+
+import scala.util.Random
+
+import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+
+/**
+ * Test lucene fine grain datamap with search mode
+ */
+class LuceneFineGrainDataMapWithSearchModeSuite extends QueryTest with BeforeAndAfterAll {
+
+ val file2 = resourcesPath + "/datamap_input.csv"
+
+ override protected def beforeAll(): Unit = {
+ // n should be about 5000000 if the size is reset to the default 1024 — TODO confirm intended value
+ val n = 500000
+ sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
+ CarbonProperties
+ .getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s")
+ LuceneFineGrainDataMapSuite.createFile(file2, n)
+ sql("create database if not exists lucene")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
+ CarbonEnv.getDatabaseLocation("lucene", sqlContext.sparkSession))
+ sql("use lucene")
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql(
+ """
+ | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ }
+
+ test("test lucene fine grain data map with search mode") {
+
+ sqlContext.sparkSession.sparkContext.setLogLevel("WARN")
+ sql(
+ s"""
+ | CREATE DATAMAP dm ON TABLE datamap_test
+ | USING 'lucene'
+ | DMProperties('INDEX_COLUMNS'='Name')
+ """.stripMargin)
+
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+ checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"),
+ sql(s"select * from datamap_test where name='n10'"))
+
+ sql("drop datamap dm on table datamap_test")
+ }
+
+ // TODO: optimize performance
+ ignore("test lucene fine grain data map with TEXT_MATCH 'AND' Filter") {
+ sql("drop datamap if exists dm on table datamap_test")
+ sql(
+ s"""
+ | CREATE DATAMAP dm ON TABLE datamap_test
+ | USING 'lucene'
+ | DMProperties('INDEX_COLUMNS'='name')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+ checkAnswer(
+ sql("SELECT count(*) FROM datamap_test WHERE TEXT_MATCH('name:n2*') " +
+ "AND age=28 and id=200149"),
+ sql("SELECT count(*) FROM datamap_test WHERE name like 'n2%' " +
+ "AND age=28 and id=200149"))
+ sql("drop datamap if exists dm on table datamap_test")
+ }
+
+ // TODO: optimize performance
+ ignore("test lucene fine grain data map with TEXT_MATCH 'AND' and 'OR' Filter ") {
+ sql("drop datamap if exists dm on table datamap_test")
+ sql(
+ s"""
+ | CREATE DATAMAP dm ON TABLE datamap_test
+ | USING 'lucene'
+ | DMProperties('INDEX_COLUMNS'='name , city')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+ checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n1*') OR TEXT_MATCH ('city:c01*') " +
+ "AND TEXT_MATCH('city:C02*')"),
+ sql("select * from datamap_test where name like 'n1%' OR city like 'c01%' and city like" +
+ " 'c02%'"))
+ sql("drop datamap if exists dm on table datamap_test")
+ }
+
+ test("test lucene fine grain data map with compaction-Major ") {
+ sql("DROP TABLE IF EXISTS datamap_test_table")
+ sql(
+ """
+ | CREATE TABLE datamap_test_table(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ sql(
+ s"""
+ | CREATE DATAMAP dm ON TABLE datamap_test_table
+ | USING 'lucene'
+ | DMProperties('INDEX_COLUMNS'='name , city')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')")
+ checkAnswer(sql("SELECT * FROM datamap_test_table WHERE TEXT_MATCH('name:n10')"),
+ sql("select * from datamap_test_table where name='n10'"))
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')")
+ sql("alter table datamap_test_table compact 'major'")
+ checkAnswer(sql("SELECT COUNT(*) FROM datamap_test_table WHERE TEXT_MATCH('name:n10')"),
+ sql("select COUNT(*) from datamap_test_table where name='n10'"))
+ sql("drop datamap if exists dm on table datamap_test_table")
+ sql("DROP TABLE IF EXISTS datamap_test_table")
+ }
+
+ test("test lucene fine grain datamap rebuild") {
+ sql("DROP TABLE IF EXISTS datamap_test5")
+ sql(
+ """
+ | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ sql(
+ s"""
+ | CREATE DATAMAP dm ON TABLE datamap_test5
+ | USING 'lucene'
+ | WITH DEFERRED REBUILD
+ | DMProperties('INDEX_COLUMNS'='city')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+ val map = DataMapStatusManager.readDataMapStatusMap()
+ assert(!map.get("dm").isEnabled)
+ sql("REBUILD DATAMAP dm ON TABLE datamap_test5")
+ checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
+ sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+ sql("DROP TABLE IF EXISTS datamap_test5")
+ }
+
+ test("test lucene fine grain datamap rebuild with table block size") {
+ sql("DROP TABLE IF EXISTS datamap_test5")
+ sql(
+ """
+ | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'TABLE_BLOCKSIZE'='1')
+ """.stripMargin)
+ sql(
+ s"""
+ | CREATE DATAMAP dm ON TABLE datamap_test5
+ | USING 'lucene'
+ | DMProperties('INDEX_COLUMNS'='Name , cIty')
+ """.stripMargin)
+
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+
+ checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c00')"),
+ sql(s"SELECT * FROM datamap_test5 WHERE city='c00'"))
+ checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
+ sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+ checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c0100085')"),
+ sql(s"SELECT * FROM datamap_test5 WHERE city='c0100085'"))
+ checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c09560')"),
+ sql(s"SELECT * FROM datamap_test5 WHERE city='c09560'"))
+ sql("DROP TABLE IF EXISTS datamap_test5")
+ }
+
+ test("test lucene fine grain multiple data map on table") {
+ sql("DROP TABLE IF EXISTS datamap_test5")
+ sql(
+ """
+ | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ sql(
+ s"""
+ | CREATE DATAMAP dm2 ON TABLE datamap_test5
+ | USING 'lucene'
+ | DMProperties('INDEX_COLUMNS'='city')
+ """.stripMargin)
+ sql(
+ s"""
+ | CREATE DATAMAP dm1 ON TABLE datamap_test5
+ | USING 'lucene'
+ | DMProperties('INDEX_COLUMNS'='Name')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+ checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('name:n10')"),
+ sql(s"select * from datamap_test5 where name='n10'"))
+ checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
+ sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+ sql("DROP TABLE IF EXISTS datamap_test5")
+ }
+
+ // TODO: support it in the future
+ ignore("test lucene datamap and validate the visible and invisible status of datamap ") {
+ val tableName = "datamap_test2"
+ val dataMapName1 = "ggdatamap1";
+ sql(s"DROP TABLE IF EXISTS $tableName")
+ sql(
+ s"""
+ | CREATE TABLE $tableName(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+ """.stripMargin)
+ // register datamap writer
+ sql(
+ s"""
+ | CREATE DATAMAP ggdatamap1 ON TABLE $tableName
+ | USING 'lucene'
+ | DMPROPERTIES('index_columns'='name')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE $tableName OPTIONS('header'='false')")
+
+ val df1 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE TEXT_MATCH('name:n502670')").collect()
+ sql(s"SELECT * FROM $tableName WHERE TEXT_MATCH('name:n502670')").show()
+ println(df1(0).getString(0))
+ assertResult(
+ s"""== CarbonData Profiler ==
+ |Table Scan on datamap_test2
+ | - total blocklets: 1
+ | - filter: TEXT_MATCH('name:n502670')
+ | - pruned by Main DataMap
+ | - skipped blocklets: 0
+ | - pruned by FG DataMap
+ | - name: ggdatamap1
+ | - provider: lucene
+ | - skipped blocklets: 1
+ |""".stripMargin)(df1(0).getString(0))
+
+ sql(s"set ${CarbonCommonConstants.CARBON_DATAMAP_VISIBLE}default.$tableName.$dataMapName1 = false")
+
+ val df2 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE name='n502670'").collect()
+ println(df2(0).getString(0))
+ assertResult(
+ s"""== CarbonData Profiler ==
+ |Table Scan on $tableName
+ | - total blocklets: 1
+ | - filter: (name <> null and name = n502670)
+ | - pruned by Main DataMap
+ | - skipped blocklets: 0
+ |""".stripMargin)(df2(0).getString(0))
+
+ checkAnswer(sql(s"SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'"),
+ sql(s"SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'"))
+ sql(s"DROP TABLE IF EXISTS $tableName")
+ }
+
+ ignore("test lucene fine grain datamap rebuild with table block size, rebuild") {
+ sql("DROP TABLE IF EXISTS datamap_test5")
+ sql(
+ """
+ | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'TABLE_BLOCKSIZE'='1')
+ """.stripMargin)
+ sql(
+ s"""
+ | CREATE DATAMAP dm ON TABLE datamap_test5
+ | USING 'lucene'
+ | WITH DEFERRED REBUILD
+ | DMProperties('INDEX_COLUMNS'='Name , cIty')
+ """.stripMargin)
+
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+ sql("REBUILD DATAMAP dm ON TABLE datamap_test5")
+
+ sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode()
+ sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')").show()
+ sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
+ sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')").show()
+ checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
+ sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+ sql("DROP TABLE IF EXISTS datamap_test5")
+ }
+
+ override protected def afterAll(): Unit = {
+ LuceneFineGrainDataMapSuite.deleteFile(file2)
+ sql("DROP TABLE IF EXISTS datamap_test")
+ sql("DROP TABLE IF EXISTS datamap_test5")
+ sql("use default")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
+ CarbonProperties.getStorePath)
+ sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode()
+ }
+
+ def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
+ val write = new PrintWriter(new File(fileName))
+ for (i <- start until (start + line)) {
+ write.println(i + "," + "n" + i + "," + "c0" + i + "," + Random.nextInt(80))
+ }
+ write.close()
+ }
+
+ def deleteFile(fileName: String): Unit = {
+ val file = new File(fileName)
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 2c94dab..d278fc5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -109,4 +109,31 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
sql("set carbon.search.enabled = false")
assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
}
+
+ test("test lucene datamap with search mode") {
+ sql("DROP DATAMAP IF EXISTS dm ON TABLE main")
+ sql("CREATE DATAMAP dm ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='id') ")
+ checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"),
+ sql(s"SELECT * FROM main WHERE id='100000'"))
+ sql("DROP DATAMAP if exists dm ON TABLE main")
+ }
+
+ test("test lucene datamap with search mode 2") {
+ sql("drop datamap if exists dm3 ON TABLE main")
+ sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city') ")
+ checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"),
+ sql("SELECT * FROM main WHERE city='city6'"))
+ sql("DROP DATAMAP if exists dm3 ON TABLE main")
+ }
+
+ test("test lucene datamap with search mode, two column") {
+ sql("drop datamap if exists dm3 ON TABLE main")
+ sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city , id') ")
+ checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"),
+ sql("SELECT * FROM main WHERE city='city6'"))
+ checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"),
+ sql(s"SELECT * FROM main WHERE id='100000'"))
+ sql("DROP DATAMAP if exists dm3 ON TABLE main")
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 29dcec9..186e39e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -68,10 +68,10 @@ case class CarbonSetCommand(command: SetCommand)
override val output: Seq[Attribute] = command.output
override def run(sparkSession: SparkSession): Seq[Row] = {
- val sessionParms = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
+ val sessionParams = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
command.kv match {
case Some((key, Some(value))) =>
- CarbonSetCommand.validateAndSetValue(sessionParms, key, value)
+ CarbonSetCommand.validateAndSetValue(sessionParams, key, value)
// handle search mode start/stop for ThriftServer usage
if (key.equalsIgnoreCase(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE)) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 07a444f..c052cd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -382,6 +382,8 @@ object CarbonFilters {
CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
new AndExpression(l, r)
case StringTrim(child) => transformExpression(child)
+ case s: ScalaUDF =>
+ new MatchExpression(s.children.head.toString())
case _ =>
new SparkUnknownExpression(expr.transform {
case AttributeReference(name, dataType, _, _) =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 9727352..f6406c7 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.store.worker;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -27,7 +28,9 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.row.CarbonRow;
@@ -112,6 +115,8 @@ public class SearchRequestHandler {
queryModel.setVectorReader(false);
CarbonMultiBlockSplit mbSplit = request.split().value();
+ List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits());
+ queryModel.setTableBlockInfos(list);
long limit = request.limit();
long rowCount = 0;
@@ -158,22 +163,38 @@ public class SearchRequestHandler {
CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
Objects.requireNonNull(datamap);
List<Segment> segments = new LinkedList<>();
+ HashMap<String, Integer> uniqueSegments = new HashMap<>();
for (CarbonInputSplit split : mbSplit.getAllSplits()) {
- segments.add(
- Segment.toSegment(split.getSegmentId(),
- new LatestFilesReadCommittedScope(table.getTablePath())));
+ String segmentId = split.getSegmentId();
+ if (uniqueSegments.get(segmentId) == null) {
+ segments.add(Segment.toSegment(
+ segmentId,
+ new LatestFilesReadCommittedScope(table.getTablePath(), segmentId)));
+ uniqueSegments.put(segmentId, 1);
+ } else {
+ uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
+ }
+ }
+
+ List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
+ List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>();
+ for (int i = 0; i < distributables.size(); i++) {
+ DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable();
+ prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
}
- List<ExtendedBlocklet> prunnedBlocklets = datamap.prune(segments, null);
- List<String> pathToRead = new LinkedList<>();
- for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) {
- pathToRead.add(prunnedBlocklet.getPath());
+ HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
+ for (ExtendedBlocklet prunedBlocklet : prunnedBlocklets) {
+ pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet);
}
List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
List<TableBlockInfo> blockToRead = new LinkedList<>();
for (TableBlockInfo block : blocks) {
- if (pathToRead.contains(block.getFilePath())) {
+ if (pathToRead.keySet().contains(block.getFilePath())) {
+ // If this is not set, the FineGrainBlocklet object cannot be created in
+ // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData
+ block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
blockToRead.add(block);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3384593/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index 26de74c..f48f5e4 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -38,8 +38,7 @@ import org.apache.spark.util.ThreadUtils
import org.apache.carbondata.common.annotations.InterfaceAudience
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.DataMapChooser
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.block.Distributable
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -232,10 +231,14 @@ class Master(sparkConf: SparkConf) {
// if we have enough data already, we do not need to collect more result
if (rowCount < globalLimit) {
- // wait for worker for 10s
- ThreadUtils.awaitResult(future, Duration.apply("10s"))
+ // wait for worker
+ val timeout = CarbonProperties
+ .getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT,
+ CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT)
+ ThreadUtils.awaitResult(future, Duration.apply(timeout))
LOG.info(s"[SearchId:$queryId] receive search response from worker " +
- s"${worker.address}:${worker.port}")
+ s"${worker.address}:${worker.port}")
try {
future.value match {
case Some(response: Try[SearchResult]) =>