You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/02 08:01:28 UTC
[07/50] [abbrv] carbondata git commit: [CARBONDATA-2099] Refactor query scan process to improve readability
http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index f51ced3..6a401d8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -34,20 +34,16 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
/**
* Executor class for executing the query on the selected segments to be merged.
@@ -70,6 +66,9 @@ public class CarbonCompactionExecutor {
*/
private boolean restructuredBlockExists;
+ // converter for UTF8String and decimal conversion
+ private DataTypeConverter dataTypeConverter;
+
/**
* Constructor
*
@@ -82,13 +81,14 @@ public class CarbonCompactionExecutor {
public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
SegmentProperties segmentProperties, CarbonTable carbonTable,
Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
- boolean restructuredBlockExists) {
+ boolean restructuredBlockExists, DataTypeConverter dataTypeConverter) {
this.segmentMapping = segmentMapping;
this.destinationSegProperties = segmentProperties;
this.carbonTable = carbonTable;
this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
this.restructuredBlockExists = restructuredBlockExists;
- queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ this.queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ this.dataTypeConverter = dataTypeConverter;
}
/**
@@ -100,7 +100,9 @@ public class CarbonCompactionExecutor {
List<RawResultIterator> resultList =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<TableBlockInfo> list = null;
- queryModel = prepareQueryModel(list);
+ queryModel = carbonTable.createQueryModelWithProjectAllColumns(dataTypeConverter);
+ queryModel.setReadPageByPage(enablePageLevelReaderForCompaction());
+ queryModel.setForcedDetailRawQuery(true);
// iterate each seg ID
for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
String segmentId = taskMap.getKey();
@@ -156,7 +158,7 @@ public class CarbonCompactionExecutor {
* @param blockList
* @return
*/
- private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+ private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList)
throws QueryExecutionException, IOException {
queryModel.setTableBlockInfos(blockList);
QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
@@ -195,48 +197,6 @@ public class CarbonCompactionExecutor {
}
/**
- * Preparing of the query model.
- *
- * @param blockList
- * @return
- */
- private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
- QueryModel model = new QueryModel();
- model.setTableBlockInfos(blockList);
- model.setForcedDetailRawQuery(true);
- model.setFilterExpressionResolverTree(null);
- model.setConverter(DataTypeUtil.getDataTypeConverter());
- model.setReadPageByPage(enablePageLevelReaderForCompaction());
-
- List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getTableName());
- for (CarbonDimension dim : dimensions) {
- // check if dimension is deleted
- QueryDimension queryDimension = new QueryDimension(dim.getColName());
- queryDimension.setDimension(dim);
- dims.add(queryDimension);
- }
- model.setQueryDimension(dims);
-
- List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getTableName());
- for (CarbonMeasure carbonMeasure : measures) {
- // check if measure is deleted
- QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
- queryMeasure.setMeasure(carbonMeasure);
- msrs.add(queryMeasure);
- }
- model.setQueryMeasures(msrs);
- model.setQueryId(System.nanoTime() + "");
- model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
- model.setTable(carbonTable);
- return model;
- }
-
- /**
* Whether to enable page level reader for compaction or not.
*/
private boolean enablePageLevelReaderForCompaction() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
index 79e9e5a..b6f12a5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
import org.apache.carbondata.processing.partition.DataPartitioner;
import org.apache.carbondata.processing.partition.Partition;
@@ -46,9 +45,8 @@ public final class QueryPartitionHelper {
/**
* Get partitions applicable for query based on filters applied in query
*/
- public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
- String tableUniqueName =
- CarbonTable.buildUniqueName(queryPlan.getDatabaseName(), queryPlan.getTableName());
+ public List<Partition> getPartitionsForQuery(String databaseName, String tableName) {
+ String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName);
DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
index 36e022b..01db4f6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
@@ -18,7 +18,6 @@
package org.apache.carbondata.processing.partition.spliter;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -26,19 +25,14 @@ import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.RowBatch;
import org.apache.carbondata.core.util.CarbonUtil;
public abstract class AbstractCarbonQueryExecutor {
@@ -47,8 +41,8 @@ public abstract class AbstractCarbonQueryExecutor {
LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName());
protected CarbonTable carbonTable;
protected QueryModel queryModel;
- protected QueryExecutor queryExecutor;
- protected Map<String, TaskBlockInfo> segmentMapping;
+ private QueryExecutor queryExecutor;
+ Map<String, TaskBlockInfo> segmentMapping;
/**
* get executor and execute the query model.
@@ -56,7 +50,7 @@ public abstract class AbstractCarbonQueryExecutor {
* @param blockList
* @return
*/
- protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+ CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList)
throws QueryExecutionException, IOException {
queryModel.setTableBlockInfos(blockList);
this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
@@ -64,46 +58,6 @@ public abstract class AbstractCarbonQueryExecutor {
}
/**
- * Preparing of the query model.
- *
- * @param blockList
- * @return
- */
- protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
- QueryModel model = new QueryModel();
- model.setTableBlockInfos(blockList);
- model.setForcedDetailRawQuery(true);
- model.setFilterExpressionResolverTree(null);
-
- List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getTableName());
- for (CarbonDimension dim : dimensions) {
- // check if dimension is deleted
- QueryDimension queryDimension = new QueryDimension(dim.getColName());
- queryDimension.setDimension(dim);
- dims.add(queryDimension);
- }
- model.setQueryDimension(dims);
-
- List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getTableName());
- for (CarbonMeasure carbonMeasure : measures) {
- // check if measure is deleted
- QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
- queryMeasure.setMeasure(carbonMeasure);
- msrs.add(queryMeasure);
- }
- model.setQueryMeasures(msrs);
- model.setQueryId(System.nanoTime() + "");
- model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
- model.setTable(carbonTable);
- return model;
- }
-
- /**
* Below method will be used
* for cleanup
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
index 6afec0b..b18207d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
/**
* Used to read carbon blocks when add/split partition
@@ -48,7 +49,8 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId)
throws QueryExecutionException, IOException {
List<TableBlockInfo> list = null;
- queryModel = prepareQueryModel(list);
+ queryModel = carbonTable.createQueryModelWithProjectAllColumns(new DataTypeConverterImpl());
+ queryModel.setForcedDetailRawQuery(true);
List<PartitionSpliterRawResultIterator> resultList
= new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
TaskBlockInfo taskBlockInfo = segmentMapping.get(segmentId);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
index ec91472..4abdf3c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
@@ -24,7 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryProjection;
import org.apache.carbondata.processing.partition.Partition;
import org.apache.carbondata.processing.partition.impl.DefaultLoadBalancer;
import org.apache.carbondata.processing.partition.impl.PartitionMultiFileImpl;
@@ -46,7 +46,7 @@ public class CarbonQueryUtil {
* It creates the one split for each region server.
*/
public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
- CarbonQueryPlan queryPlan) {
+ QueryProjection queryPlan) {
//Just create splits depends on locations of region servers
List<Partition> allPartitions = null;
@@ -55,7 +55,7 @@ public class CarbonQueryUtil {
QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
} else {
allPartitions =
- QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
+ QueryPartitionHelper.getInstance().getPartitionsForQuery(databaseName, tableName);
}
TableSplit[] splits = new TableSplit[allPartitions.size()];
for (int i = 0; i < splits.length; i++) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 5e0fce5..578f0cd 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -157,7 +157,7 @@ class StreamHandoffRDD[K, V](
CarbonTableInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
val format = new CarbonTableInputFormat[Array[Object]]()
- val model = format.getQueryModel(inputSplit, attemptContext)
+ val model = format.createQueryModel(inputSplit, attemptContext)
val inputFormat = new CarbonStreamInputFormat
val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
.asInstanceOf[CarbonStreamRecordReader]