Posted to commits@carbondata.apache.org by gv...@apache.org on 2016/09/20 22:16:46 UTC

[1/2] incubator-carbondata git commit: fixed limit query issue

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 0391d4223 -> b04a579d5


fixed limit query issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5a3d5bb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5a3d5bb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5a3d5bb8

Branch: refs/heads/master
Commit: 5a3d5bb84c9f7689428f60d776e90523f6582319
Parents: 0391d42
Author: kumarvishal <ku...@gmail.com>
Authored: Tue Sep 20 19:19:31 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Wed Sep 21 03:43:21 2016 +0530

----------------------------------------------------------------------
 .../carbondata/scan/executor/QueryExecutor.java |  7 ++++
 .../scan/executor/QueryExecutorFactory.java     |  3 +-
 .../executor/impl/AbstractQueryExecutor.java    | 14 ++++++++
 .../scan/executor/impl/DetailQueryExecutor.java |  3 +-
 .../executor/impl/QueryExecutorProperties.java  |  5 +++
 .../carbondata/scan/model/QueryModel.java       |  4 +--
 .../AbstractDetailQueryResultIterator.java      | 30 +++++++---------
 .../iterator/DetailQueryResultIterator.java     | 15 +++-----
 .../carbondata/hadoop/CarbonRecordReader.java   | 14 ++++++--
 .../spark/merger/CarbonCompactionExecutor.java  | 36 +++++++++++++-------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 11 ++++--
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  8 +++--
 12 files changed, 97 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
index 53bf8ca..1f67c84 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutor.java
@@ -37,4 +37,11 @@ public interface QueryExecutor<E> {
    * @throws QueryExecutionException if any failure while executing the query
    */
   CarbonIterator<E> execute(QueryModel queryModel) throws QueryExecutionException;
+
+  /**
+   * Below method will be used for cleanup
+   *
+   * @throws QueryExecutionException
+   */
+  void finish() throws QueryExecutionException;
 }
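
Note: the new finish() method changes the executor contract. The caller that obtains a QueryExecutor is now responsible for releasing it, even when the result iterator is abandoned early, which is exactly what a limit query does. Below is a minimal sketch of that caller-side lifecycle, assuming only the interfaces shown in this diff; it is not compilable on its own, and buildQueryModel() and process() are hypothetical application helpers.

  // Sketch only: caller-side lifecycle implied by QueryExecutor.finish().
  // buildQueryModel() and process() are hypothetical application helpers.
  void runWithLimit() throws QueryExecutionException {
    QueryExecutor<Object[]> executor = QueryExecutorFactory.getQueryExecutor();
    try {
      QueryModel queryModel = buildQueryModel();          // hypothetical
      CarbonIterator<Object[]> rows = executor.execute(queryModel);
      int remaining = 10;                                 // LIMIT-style early stop
      while (rows.hasNext() && remaining-- > 0) {
        process(rows.next());                             // hypothetical consumer
      }
    } finally {
      executor.finish();                                  // releases the executor service
    }
  }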

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
index ab75231..7ed6e60 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
@@ -19,7 +19,6 @@
 package org.apache.carbondata.scan.executor;
 
 import org.apache.carbondata.scan.executor.impl.DetailQueryExecutor;
-import org.apache.carbondata.scan.model.QueryModel;
 
 /**
  * Factory class to get the query executor from RDD
@@ -27,7 +26,7 @@ import org.apache.carbondata.scan.model.QueryModel;
  */
 public class QueryExecutorFactory {
 
-  public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
+  public static QueryExecutor getQueryExecutor() {
     return new DetailQueryExecutor();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
index c31824f..b015e49 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.carbondata.scan.executor.impl;
 
 import java.util.*;
+import java.util.concurrent.Executors;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -85,6 +86,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         queryModel.getQueryId());
     LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier()
         .getCarbonTableIdentifier().getTableName());
+    // add executor service for query execution
+    queryProperties.executorService = Executors.newFixedThreadPool(1);
     // Initializing statistics list to record the query statistics
     // creating copy on write to handle concurrent scenario
     queryProperties.queryStatisticsRecorder = new QueryStatisticsRecorder(queryModel.getQueryId());
@@ -426,4 +429,15 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         .toPrimitive(parentBlockIndexList.toArray(new Integer[parentBlockIndexList.size()]));
   }
 
+  /**
+   * Below method will be used to finish the execution
+   *
+   * @throws QueryExecutionException
+   */
+  @Override public void finish() throws QueryExecutionException {
+    if (null != queryProperties.executorService) {
+      queryProperties.executorService.shutdownNow();
+    }
+  }
+
 }
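
Note: the hunk above creates the single-thread pool during query initialization and tears it down in finish() via shutdownNow(). The point of shutdownNow() is that it interrupts a worker thread that is still blocked in an in-flight task, so the thread, and anything the queued task still references, can be reclaimed even if the query result was never fully consumed. The following is a generic, self-contained illustration of that mechanism; it is not CarbonData code.

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  public class ShutdownNowDemo {
    public static void main(String[] args) throws InterruptedException {
      ExecutorService pool = Executors.newFixedThreadPool(1);
      pool.submit(() -> {
        try {
          Thread.sleep(60_000);                 // stand-in for a long block scan
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();   // shutdownNow() lands here
        }
      });
      Thread.sleep(100);                        // let the task start
      pool.shutdownNow();                       // interrupts the worker; the JVM can now exit
    }
  }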

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
index 716cdc7..f2f4b58 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/DetailQueryExecutor.java
@@ -36,7 +36,8 @@ public class DetailQueryExecutor extends AbstractQueryExecutor {
   @Override public CarbonIterator<Object[]> execute(QueryModel queryModel)
       throws QueryExecutionException {
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
-    return new DetailQueryResultIterator(blockExecutionInfoList, queryModel);
+    return new DetailQueryResultIterator(blockExecutionInfoList, queryModel,
+        queryProperties.executorService);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
index 2f21a96..7663738 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.scan.executor.impl;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
@@ -83,6 +84,10 @@ public class QueryExecutorProperties {
    */
   public QueryStatisticsRecorder queryStatisticsRecorder;
   /**
+   * executor service to execute the query
+   */
+  public ExecutorService executorService;
+  /**
    * list of blocks in which query will be executed
    */
   protected List<AbstractIndex> dataBlocks;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
index 1c819a2..10cbd25 100644
--- a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
@@ -199,8 +199,8 @@ public class QueryModel implements Serializable {
 
   }
 
-  public static void processFilterExpression(
-      Expression filterExpression, List<CarbonDimension> dimensions, List<CarbonMeasure> measures) {
+  public static void processFilterExpression(Expression filterExpression,
+      List<CarbonDimension> dimensions, List<CarbonMeasure> measures) {
     if (null != filterExpression) {
       if (null != filterExpression.getChildren() && filterExpression.getChildren().size() == 0) {
         if (filterExpression instanceof ConditionalExpression) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 02505e8..22a4412 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -19,6 +19,7 @@
 package org.apache.carbondata.scan.result.iterator;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
@@ -51,41 +52,36 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(AbstractDetailQueryResultIterator.class.getName());
 
+  protected ExecutorService execService;
   /**
    * execution info of the block
    */
   protected List<BlockExecutionInfo> blockExecutionInfos;
-
-  /**
-   * number of cores which can be used
-   */
-  private int batchSize;
-
   /**
    * file reader which will be used to execute the query
    */
   protected FileHolder fileReader;
-
   protected AbstractDataBlockIterator dataBlockIterator;
-
   protected boolean nextBatch = false;
-
   /**
    * total time scan the blocks
    */
   protected long totalScanTime;
-
   /**
    * is the statistic recorded
    */
   protected boolean isStatisticsRecorded;
-
   /**
-   *  QueryStatisticsRecorder
+   * QueryStatisticsRecorder
    */
   protected QueryStatisticsRecorder recorder;
+  /**
+   * number of cores which can be used
+   */
+  private int batchSize;
 
-  public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel) {
+  public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
+      ExecutorService execService) {
     String batchSizeString =
         CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE);
     if (null != batchSizeString) {
@@ -102,16 +98,17 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
     this.blockExecutionInfos = infos;
     this.fileReader = FileFactory.getFileHolder(
         FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getStorePath()));
+    this.execService = execService;
     intialiseInfos();
   }
 
   private void intialiseInfos() {
-    totalScanTime=System.currentTimeMillis();
+    totalScanTime = System.currentTimeMillis();
     for (BlockExecutionInfo blockInfo : blockExecutionInfos) {
       DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize());
       DataRefNode startDataBlock = finder
           .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
-      while(startDataBlock.nodeNumber()!= blockInfo.getStartBlockletIndex()) {
+      while (startDataBlock.nodeNumber() != blockInfo.getStartBlockletIndex()) {
         startDataBlock = startDataBlock.getNextDataRefNode();
       }
 
@@ -154,7 +151,7 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
   }
 
   private DataBlockIteratorImpl getDataBlockIterator() {
-    if(blockExecutionInfos.size() > 0) {
+    if (blockExecutionInfos.size() > 0) {
       BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
       blockExecutionInfos.remove(executionInfo);
       return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize);
@@ -162,5 +159,4 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
     return null;
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
index 0013c0a..a958195 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/DetailQueryResultIterator.java
@@ -21,9 +21,7 @@ package org.apache.carbondata.scan.result.iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
@@ -37,14 +35,12 @@ import org.apache.carbondata.scan.result.BatchResult;
  */
 public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator {
 
-  private ExecutorService execService = Executors.newFixedThreadPool(1);
-
-  private Future<BatchResult> future;
-
   private final Object lock = new Object();
+  private Future<BatchResult> future;
 
-  public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel) {
-    super(infos, queryModel);
+  public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
+      ExecutorService execService) {
+    super(infos, queryModel, execService);
   }
 
   @Override public BatchResult next() {
@@ -60,13 +56,10 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator
         nextBatch = true;
         future = execute();
       } else {
-        execService.shutdown();
-        execService.awaitTermination(1, TimeUnit.HOURS);
         fileReader.finish();
       }
       totalScanTime += System.currentTimeMillis() - startTime;
     } catch (Exception ex) {
-      execService.shutdown();
       fileReader.finish();
       throw new RuntimeException(ex);
     }
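
Note: before this change, DetailQueryResultIterator created its own single-thread pool and only shut it down when iteration ran to completion or hit an exception, so a limit query that stopped consuming batches early left the pool and its prefetch task alive. With the pool now passed in from the executor, the prefetch pattern itself is unchanged: each next() hands back the batch computed in the background and submits the following one. Below is a simplified, self-contained analog of that submit-ahead pattern; it is not the actual class.

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;

  public class PrefetchIteratorDemo {
    public static void main(String[] args) throws Exception {
      ExecutorService execService = Executors.newFixedThreadPool(1);  // owner-provided pool
      Future<String> future = execService.submit(() -> "batch-0");    // prefetch first batch
      for (int i = 1; i <= 3; i++) {
        String batch = future.get();                                  // wait for current batch
        final int n = i;
        future = execService.submit(() -> "batch-" + n);              // prefetch the next one
        System.out.println(batch);
      }
      execService.shutdownNow();                                      // owner cleans up, as finish() now does
    }
  }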

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index fd0a438..443922c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.scan.executor.QueryExecutor;
 import org.apache.carbondata.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.scan.model.QueryModel;
@@ -50,9 +51,12 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
 
   private CarbonIterator<Object[]> carbonIterator;
 
+  private QueryExecutor queryExecutor;
+
   public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) {
     this.queryModel = queryModel;
     this.readSupport = readSupport;
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
   }
 
   @Override public void initialize(InputSplit split, TaskAttemptContext context)
@@ -69,9 +73,8 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
     readSupport
         .intialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
     try {
-      carbonIterator = new ChunkRowIterator(
-          (CarbonIterator<BatchResult>) QueryExecutorFactory.getQueryExecutor(queryModel)
-              .execute(queryModel));
+      carbonIterator =
+          new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
     } catch (QueryExecutionException e) {
       throw new InterruptedException(e.getMessage());
     }
@@ -105,5 +108,10 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
     }
     // close read support
     readSupport.close();
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      throw new IOException(e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
index ac6b697..6a2c839 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/CarbonCompactionExecutor.java
@@ -52,22 +52,21 @@ import org.apache.carbondata.scan.result.iterator.RawResultIterator;
  */
 public class CarbonCompactionExecutor {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
   private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping;
-  private QueryExecutor queryExecutor;
   private final SegmentProperties destinationSegProperties;
   private final String databaseName;
   private final String factTableName;
   private final Map<String, TaskBlockInfo> segmentMapping;
   private final String storePath;
+  private QueryExecutor queryExecutor;
   private CarbonTable carbonTable;
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
-
   private QueryModel queryModel;
 
   /**
    * Constructor
+   *
    * @param segmentMapping
    * @param segmentProperties
    * @param databaseName
@@ -97,6 +96,7 @@ public class CarbonCompactionExecutor {
 
   /**
    * For processing of the table blocks.
+   *
    * @return List of Carbon iterators
    */
   public List<RawResultIterator> processTableBlocks() throws QueryExecutionException {
@@ -113,10 +113,8 @@ public class CarbonCompactionExecutor {
 
       int[] colCardinality = listMetadata.get(0).getSegmentInfo().getColumnCardinality();
 
-      SegmentProperties sourceSegProperties = new SegmentProperties(
-          listMetadata.get(0).getColumnInTable(),
-          colCardinality
-      );
+      SegmentProperties sourceSegProperties =
+          new SegmentProperties(listMetadata.get(0).getColumnInTable(), colCardinality);
 
       // for each segment get taskblock info
       TaskBlockInfo taskBlockInfo = taskMap.getValue();
@@ -128,7 +126,7 @@ public class CarbonCompactionExecutor {
         Collections.sort(list);
         LOGGER.info("for task -" + task + "-block size is -" + list.size());
         queryModel.setTableBlockInfos(list);
-        resultList.add(new RawResultIterator( executeBlockList(list),sourceSegProperties,
+        resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
             destinationSegProperties));
 
       }
@@ -147,7 +145,7 @@ public class CarbonCompactionExecutor {
       throws QueryExecutionException {
 
     queryModel.setTableBlockInfos(blockList);
-    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
     CarbonIterator<BatchResult> iter = null;
     try {
       iter = queryExecutor.execute(queryModel);
@@ -160,10 +158,23 @@ public class CarbonCompactionExecutor {
   }
 
   /**
+   * Below method will be used
+   * for cleanup
+   */
+  public void finish() {
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      LOGGER.error(e, "Problem while finish: ");
+    }
+    clearDictionaryFromQueryModel();
+  }
+
+  /**
    * This method will clear the dictionary access count after its usage is complete so
    * that column can be deleted form LRU cache whenever memory reaches threshold
    */
-  public void clearDictionaryFromQueryModel() {
+  private void clearDictionaryFromQueryModel() {
     if (null != queryModel) {
       Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
       if (null != columnToDictionaryMapping) {
@@ -176,6 +187,7 @@ public class CarbonCompactionExecutor {
 
   /**
    * Preparing of the query model.
+   *
    * @param blockList
    * @return
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f7f6949..1f016ba 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -98,6 +98,7 @@ class CarbonMergerRDD[K, V](
       LOGGER.info("Temp storeLocation taken is " + storeLocation)
       var mergeStatus = false
       var mergeNumber = ""
+      var exec: CarbonCompactionExecutor = null
       try {
         var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
         val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
@@ -121,7 +122,7 @@ class CarbonMergerRDD[K, V](
 
         carbonLoadModel.setStorePath(hdfsStoreLocation)
 
-        val exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
+          exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
           factTableName, hdfsStoreLocation, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
           dataFileMetadataSegMapping
         )
@@ -132,7 +133,9 @@ class CarbonMergerRDD[K, V](
           result2 = exec.processTableBlocks()
         } catch {
           case e: Throwable =>
-            exec.clearDictionaryFromQueryModel
+            if (null != exec) {
+              exec.finish
+            }
             LOGGER.error(e)
             if (null != e.getMessage) {
               sys.error("Exception occurred in query execution :: " + e.getMessage)
@@ -140,7 +143,6 @@ class CarbonMergerRDD[K, V](
               sys.error("Exception occurred in query execution.Please check logs.")
             }
         }
-
         mergeNumber = mergedLoadName
           .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
             CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
@@ -184,6 +186,9 @@ class CarbonMergerRDD[K, V](
           case e: Exception =>
             LOGGER.error(e)
         }
+       if (null != exec) {
+         exec.finish
+       }
       }
 
       var finished = false

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5a3d5bb8/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e676687..e8915c4 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryS
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.scan.executor.QueryExecutor
 import org.apache.carbondata.scan.executor.QueryExecutorFactory
 import org.apache.carbondata.scan.expression.Expression
 import org.apache.carbondata.scan.model.QueryModel
@@ -47,6 +48,8 @@ import org.apache.carbondata.spark.RawValue
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.util.QueryPlanUtil
 
+
+
 class CarbonSparkPartition(rddId: Int, val idx: Int,
     val locations: Array[String],
     val tableBlockInfos: util.List[TableBlockInfo])
@@ -186,10 +189,12 @@ class CarbonScanRDD[V: ClassTag](
     val iter = new Iterator[V] {
       var rowIterator: CarbonIterator[Array[Any]] = _
       var queryStartTime: Long = 0
+      val queryExecutor = QueryExecutorFactory.getQueryExecutor()
       try {
         context.addTaskCompletionListener(context => {
           clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
           logStatistics()
+          queryExecutor.finish
         })
         val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
         if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
@@ -197,7 +202,6 @@ class CarbonScanRDD[V: ClassTag](
           // fill table block info
           queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
           queryStartTime = System.currentTimeMillis
-
           val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
           logInfo("*************************" + carbonPropertiesFilePath)
           if (null == carbonPropertiesFilePath) {
@@ -206,7 +210,7 @@ class CarbonScanRDD[V: ClassTag](
           }
           // execute query
           rowIterator = new ChunkRowIterator(
-            QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
+            queryExecutor.execute(queryModel).
               asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
 
         }


[2/2] incubator-carbondata git commit: [CARBONDATA-262] Fixed limit query memory and thread leak issue. This closes #182

Posted by gv...@apache.org.
[CARBONDATA-262] Fixed limit query memory and thread leak issue. This closes #182


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b04a579d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b04a579d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b04a579d

Branch: refs/heads/master
Commit: b04a579d5def6213808cc9eef103f7208b843a2b
Parents: 0391d42 5a3d5bb
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Wed Sep 21 03:46:29 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Wed Sep 21 03:46:29 2016 +0530

----------------------------------------------------------------------
 .../carbondata/scan/executor/QueryExecutor.java |  7 ++++
 .../scan/executor/QueryExecutorFactory.java     |  3 +-
 .../executor/impl/AbstractQueryExecutor.java    | 14 ++++++++
 .../scan/executor/impl/DetailQueryExecutor.java |  3 +-
 .../executor/impl/QueryExecutorProperties.java  |  5 +++
 .../carbondata/scan/model/QueryModel.java       |  4 +--
 .../AbstractDetailQueryResultIterator.java      | 30 +++++++---------
 .../iterator/DetailQueryResultIterator.java     | 15 +++-----
 .../carbondata/hadoop/CarbonRecordReader.java   | 14 ++++++--
 .../spark/merger/CarbonCompactionExecutor.java  | 36 +++++++++++++-------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 11 ++++--
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  8 +++--
 12 files changed, 97 insertions(+), 53 deletions(-)
----------------------------------------------------------------------