You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2016/09/20 22:16:46 UTC
[1/2] incubator-carbondata git commit: fixed limit query issue
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 0391d4223 -> b04a579d5
fixed limit query issue
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5a3d5bb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5a3d5bb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5a3d5bb8
Branch: refs/heads/master
Commit: 5a3d5bb84c9f7689428f60d776e90523f6582319
Parents: 0391d42
Author: kumarvishal <ku...@gmail.com>
Authored: Tue Sep 20 19:19:31 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Wed Sep 21 03:43:21 2016 +0530
----------------------------------------------------------------------
.../carbondata/scan/executor/QueryExecutor.java | 7 ++++
.../scan/executor/QueryExecutorFactory.java | 3 +-
.../executor/impl/AbstractQueryExecutor.java | 14 ++++++++
.../scan/executor/impl/DetailQueryExecutor.java | 3 +-
.../executor/impl/QueryExecutorProperties.java | 5 +++
.../carbondata/scan/model/QueryModel.java | 4 +--
.../AbstractDetailQueryResultIterator.java | 30 +++++++---------
.../iterator/DetailQueryResultIterator.java | 15 +++-----
.../carbondata/hadoop/CarbonRecordReader.java | 14 ++++++--
.../spark/merger/CarbonCompactionExecutor.java | 36 +++++++++++++-------
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 11 ++++--
.../carbondata/spark/rdd/CarbonScanRDD.scala | 8 +++--
12 files changed, 97 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
index 53bf8ca..1f67c84 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
@@ -37,4 +37,11 @@ public interface QueryExecutor<E> {
* @throws QueryExecutionException if any failure while executing the query
*/
CarbonIterator<E> execute(QueryModel queryModel) throws QueryExecutionException;
+
+ /**
+ * Below method will be used for cleanup
+ *
+ * @throws QueryExecutionException
+ */
+ void finish() throws QueryExecutionException;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
index ab75231..7ed6e60 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
@@ -19,7 +19,6 @@
package org.apache.carbondata.scan.executor;
import org.apache.carbondata.scan.executor.impl.DetailQueryExecutor;
-import org.apache.carbondata.scan.model.QueryModel;
/**
* Factory class to get the query executor from RDD
@@ -27,7 +26,7 @@ import org.apache.carbondata.scan.model.QueryModel;
*/
public class QueryExecutorFactory {
- public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
+ public static QueryExecutor getQueryExecutor() {
return new DetailQueryExecutor();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
index c31824f..b015e49 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
@@ -19,6 +19,7 @@
package org.apache.carbondata.scan.executor.impl;
import java.util.*;
+import java.util.concurrent.Executors;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -85,6 +86,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
queryModel.getQueryId());
LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier()
.getCarbonTableIdentifier().getTableName());
+ // add executor service for query execution
+ queryProperties.executorService = Executors.newFixedThreadPool(1);
// Initializing statistics list to record the query statistics
// creating copy on write to handle concurrent scenario
queryProperties.queryStatisticsRecorder = new QueryStatisticsRecorder(queryModel.getQueryId());
@@ -426,4 +429,15 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
.toPrimitive(parentBlockIndexList.toArray(new Integer[parentBlockIndexList.size()]));
}
+ /**
+ * Below method will be used to finish the execution
+ *
+ * @throws QueryExecutionException
+ */
+ @Override public void finish() throws QueryExecutionException {
+ if (null != queryProperties.executorService) {
+ queryProperties.executorService.shutdownNow();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
index 716cdc7..f2f4b58 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
@@ -36,7 +36,8 @@ public class DetailQueryExecutor extends AbstractQueryExecutor {
@Override public CarbonIterator<Object[]> execute(QueryModel queryModel)
throws QueryExecutionException {
List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
- return new DetailQueryResultIterator(blockExecutionInfoList, queryModel);
+ return new DetailQueryResultIterator(blockExecutionInfoList, queryModel,
+ queryProperties.executorService);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
index 2f21a96..7663738 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.scan.executor.impl;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
@@ -83,6 +84,10 @@ public class QueryExecutorProperties {
*/
public QueryStatisticsRecorder queryStatisticsRecorder;
/**
+ * executor service to execute the query
+ */
+ public ExecutorService executorService;
+ /**
* list of blocks in which query will be executed
*/
protected List<AbstractIndex> dataBlocks;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
index 1c819a2..10cbd25 100644
--- a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
@@ -199,8 +199,8 @@ public class QueryModel implements Serializable {
}
- public static void processFilterExpression(
- Expression filterExpression, List<CarbonDimension> dimensions, List<CarbonMeasure> measures) {
+ public static void processFilterExpression(Expression filterExpression,
+ List<CarbonDimension> dimensions, List<CarbonMeasure> measures) {
if (null != filterExpression) {
if (null != filterExpression.getChildren() && filterExpression.getChildren().size() == 0) {
if (filterExpression instanceof ConditionalExpression) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 02505e8..22a4412 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -19,6 +19,7 @@
package org.apache.carbondata.scan.result.iterator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
@@ -51,41 +52,36 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
private static final LogService LOGGER =
LogServiceFactory.getLogService(AbstractDetailQueryResultIterator.class.getName());
+ protected ExecutorService execService;
/**
* execution info of the block
*/
protected List<BlockExecutionInfo> blockExecutionInfos;
-
- /**
- * number of cores which can be used
- */
- private int batchSize;
-
/**
* file reader which will be used to execute the query
*/
protected FileHolder fileReader;
-
protected AbstractDataBlockIterator dataBlockIterator;
-
protected boolean nextBatch = false;
-
/**
* total time scan the blocks
*/
protected long totalScanTime;
-
/**
* is the statistic recorded
*/
protected boolean isStatisticsRecorded;
-
/**
- * QueryStatisticsRecorder
+ * QueryStatisticsRecorder
*/
protected QueryStatisticsRecorder recorder;
+ /**
+ * number of cores which can be used
+ */
+ private int batchSize;
- public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel) {
+ public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
+ ExecutorService execService) {
String batchSizeString =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE);
if (null != batchSizeString) {
@@ -102,16 +98,17 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
this.blockExecutionInfos = infos;
this.fileReader = FileFactory.getFileHolder(
FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getStorePath()));
+ this.execService = execService;
intialiseInfos();
}
private void intialiseInfos() {
- totalScanTime=System.currentTimeMillis();
+ totalScanTime = System.currentTimeMillis();
for (BlockExecutionInfo blockInfo : blockExecutionInfos) {
DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize());
DataRefNode startDataBlock = finder
.findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
- while(startDataBlock.nodeNumber()!= blockInfo.getStartBlockletIndex()) {
+ while (startDataBlock.nodeNumber() != blockInfo.getStartBlockletIndex()) {
startDataBlock = startDataBlock.getNextDataRefNode();
}
@@ -154,7 +151,7 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
}
private DataBlockIteratorImpl getDataBlockIterator() {
- if(blockExecutionInfos.size() > 0) {
+ if (blockExecutionInfos.size() > 0) {
BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
blockExecutionInfos.remove(executionInfo);
return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize);
@@ -162,5 +159,4 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
return null;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
index 0013c0a..a958195 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
@@ -21,9 +21,7 @@ package org.apache.carbondata.scan.result.iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
@@ -37,14 +35,12 @@ import org.apache.carbondata.scan.result.BatchResult;
*/
public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator {
- private ExecutorService execService = Executors.newFixedThreadPool(1);
-
- private Future<BatchResult> future;
-
private final Object lock = new Object();
+ private Future<BatchResult> future;
- public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel) {
- super(infos, queryModel);
+ public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
+ ExecutorService execService) {
+ super(infos, queryModel, execService);
}
@Override public BatchResult next() {
@@ -60,13 +56,10 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator
nextBatch = true;
future = execute();
} else {
- execService.shutdown();
- execService.awaitTermination(1, TimeUnit.HOURS);
fileReader.finish();
}
totalScanTime += System.currentTimeMillis() - startTime;
} catch (Exception ex) {
- execService.shutdown();
fileReader.finish();
throw new RuntimeException(ex);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index fd0a438..443922c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.scan.executor.QueryExecutor;
import org.apache.carbondata.scan.executor.QueryExecutorFactory;
import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.scan.model.QueryModel;
@@ -50,9 +51,12 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
private CarbonIterator<Object[]> carbonIterator;
+ private QueryExecutor queryExecutor;
+
public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) {
this.queryModel = queryModel;
this.readSupport = readSupport;
+ this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
}
@Override public void initialize(InputSplit split, TaskAttemptContext context)
@@ -69,9 +73,8 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
readSupport
.intialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
try {
- carbonIterator = new ChunkRowIterator(
- (CarbonIterator<BatchResult>) QueryExecutorFactory.getQueryExecutor(queryModel)
- .execute(queryModel));
+ carbonIterator =
+ new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
} catch (QueryExecutionException e) {
throw new InterruptedException(e.getMessage());
}
@@ -105,5 +108,10 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
}
// close read support
readSupport.close();
+ try {
+ queryExecutor.finish();
+ } catch (QueryExecutionException e) {
+ throw new IOException(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
index ac6b697..6a2c839 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
@@ -52,22 +52,21 @@ import org.apache.carbondata.scan.result.iterator.RawResultIterator;
*/
public class CarbonCompactionExecutor {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping;
- private QueryExecutor queryExecutor;
private final SegmentProperties destinationSegProperties;
private final String databaseName;
private final String factTableName;
private final Map<String, TaskBlockInfo> segmentMapping;
private final String storePath;
+ private QueryExecutor queryExecutor;
private CarbonTable carbonTable;
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
-
private QueryModel queryModel;
/**
* Constructor
+ *
* @param segmentMapping
* @param segmentProperties
* @param databaseName
@@ -97,6 +96,7 @@ public class CarbonCompactionExecutor {
/**
* For processing of the table blocks.
+ *
* @return List of Carbon iterators
*/
public List<RawResultIterator> processTableBlocks() throws QueryExecutionException {
@@ -113,10 +113,8 @@ public class CarbonCompactionExecutor {
int[] colCardinality = listMetadata.get(0).getSegmentInfo().getColumnCardinality();
- SegmentProperties sourceSegProperties = new SegmentProperties(
- listMetadata.get(0).getColumnInTable(),
- colCardinality
- );
+ SegmentProperties sourceSegProperties =
+ new SegmentProperties(listMetadata.get(0).getColumnInTable(), colCardinality);
// for each segment get taskblock info
TaskBlockInfo taskBlockInfo = taskMap.getValue();
@@ -128,7 +126,7 @@ public class CarbonCompactionExecutor {
Collections.sort(list);
LOGGER.info("for task -" + task + "-block size is -" + list.size());
queryModel.setTableBlockInfos(list);
- resultList.add(new RawResultIterator( executeBlockList(list),sourceSegProperties,
+ resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
destinationSegProperties));
}
@@ -147,7 +145,7 @@ public class CarbonCompactionExecutor {
throws QueryExecutionException {
queryModel.setTableBlockInfos(blockList);
- this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+ this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
CarbonIterator<BatchResult> iter = null;
try {
iter = queryExecutor.execute(queryModel);
@@ -160,10 +158,23 @@ public class CarbonCompactionExecutor {
}
/**
+ * Below method will be used
+ * for cleanup
+ */
+ public void finish() {
+ try {
+ queryExecutor.finish();
+ } catch (QueryExecutionException e) {
+ LOGGER.error(e, "Problem while finish: ");
+ }
+ clearDictionaryFromQueryModel();
+ }
+
+ /**
* This method will clear the dictionary access count after its usage is complete so
* that column can be deleted form LRU cache whenever memory reaches threshold
*/
- public void clearDictionaryFromQueryModel() {
+ private void clearDictionaryFromQueryModel() {
if (null != queryModel) {
Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
if (null != columnToDictionaryMapping) {
@@ -176,6 +187,7 @@ public class CarbonCompactionExecutor {
/**
* Preparing of the query model.
+ *
* @param blockList
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f7f6949..1f016ba 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -98,6 +98,7 @@ class CarbonMergerRDD[K, V](
LOGGER.info("Temp storeLocation taken is " + storeLocation)
var mergeStatus = false
var mergeNumber = ""
+ var exec: CarbonCompactionExecutor = null
try {
var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
@@ -121,7 +122,7 @@ class CarbonMergerRDD[K, V](
carbonLoadModel.setStorePath(hdfsStoreLocation)
- val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
+ exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
factTableName, hdfsStoreLocation, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
dataFileMetadataSegMapping
)
@@ -132,7 +133,9 @@ class CarbonMergerRDD[K, V](
result2 = exec.processTableBlocks()
} catch {
case e: Throwable =>
- exec.clearDictionaryFromQueryModel
+ if (null != exec) {
+ exec.finish
+ }
LOGGER.error(e)
if (null != e.getMessage) {
sys.error("Exception occurred in query execution :: " + e.getMessage)
@@ -140,7 +143,6 @@ class CarbonMergerRDD[K, V](
sys.error("Exception occurred in query execution.Please check logs.")
}
}
-
mergeNumber = mergedLoadName
.substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
@@ -184,6 +186,9 @@ class CarbonMergerRDD[K, V](
case e: Exception =>
LOGGER.error(e)
}
+ if (null != exec) {
+ exec.finish
+ }
}
var finished = false
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e676687..e8915c4 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryS
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.scan.executor.QueryExecutor
import org.apache.carbondata.scan.executor.QueryExecutorFactory
import org.apache.carbondata.scan.expression.Expression
import org.apache.carbondata.scan.model.QueryModel
@@ -47,6 +48,8 @@ import org.apache.carbondata.spark.RawValue
import org.apache.carbondata.spark.load.CarbonLoaderUtil
import org.apache.carbondata.spark.util.QueryPlanUtil
+
+
class CarbonSparkPartition(rddId: Int, val idx: Int,
val locations: Array[String],
val tableBlockInfos: util.List[TableBlockInfo])
@@ -186,10 +189,12 @@ class CarbonScanRDD[V: ClassTag](
val iter = new Iterator[V] {
var rowIterator: CarbonIterator[Array[Any]] = _
var queryStartTime: Long = 0
+ val queryExecutor = QueryExecutorFactory.getQueryExecutor()
try {
context.addTaskCompletionListener(context => {
clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
logStatistics()
+ queryExecutor.finish
})
val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
@@ -197,7 +202,6 @@ class CarbonScanRDD[V: ClassTag](
// fill table block info
queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
queryStartTime = System.currentTimeMillis
-
val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
logInfo("*************************" + carbonPropertiesFilePath)
if (null == carbonPropertiesFilePath) {
@@ -206,7 +210,7 @@ class CarbonScanRDD[V: ClassTag](
}
// execute query
rowIterator = new ChunkRowIterator(
- QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
+ queryExecutor.execute(queryModel).
asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
}
[2/2] incubator-carbondata git commit: [CARBONDATA-262] Fixed limit
query memory and thread leak issue. This closes #182
Posted by gv...@apache.org.
[CARBONDATA-262] Fixed limit query memory and thread leak issue. This closes #182
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b04a579d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b04a579d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b04a579d
Branch: refs/heads/master
Commit: b04a579d5def6213808cc9eef103f7208b843a2b
Parents: 0391d42 5a3d5bb
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Wed Sep 21 03:46:29 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Wed Sep 21 03:46:29 2016 +0530
----------------------------------------------------------------------
.../carbondata/scan/executor/QueryExecutor.java | 7 ++++
.../scan/executor/QueryExecutorFactory.java | 3 +-
.../executor/impl/AbstractQueryExecutor.java | 14 ++++++++
.../scan/executor/impl/DetailQueryExecutor.java | 3 +-
.../executor/impl/QueryExecutorProperties.java | 5 +++
.../carbondata/scan/model/QueryModel.java | 4 +--
.../AbstractDetailQueryResultIterator.java | 30 +++++++---------
.../iterator/DetailQueryResultIterator.java | 15 +++-----
.../carbondata/hadoop/CarbonRecordReader.java | 14 ++++++--
.../spark/merger/CarbonCompactionExecutor.java | 36 +++++++++++++-------
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 11 ++++--
.../carbondata/spark/rdd/CarbonScanRDD.scala | 8 +++--
12 files changed, 97 insertions(+), 53 deletions(-)
----------------------------------------------------------------------