You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:42 UTC

[17/50] [abbrv] carbondata git commit: [CARBONDATA-2433][LUCENE]close the lucene index reader after every task and clean the resource and other functional issues

[CARBONDATA-2433][LUCENE]close the lucene index reader after every task and clean the resource and other functional issues

Problem:

The Lucene IndexReader opened during a query is never closed, which impacts performance and leads to memory issues.
Lucene does not index stop words such as "is" and "the", which may lead to wrong results for filter queries that use text match.
Solution:

Close the index reader once the task is completed.
Make the indexing of stop words in Lucene configurable.

This closes #2269


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/07a77fab
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/07a77fab
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/07a77fab

Branch: refs/heads/spark-2.3
Commit: 07a77fab7f24cffe72f42a38327479ea18a08c67
Parents: 77a1110
Author: akashrn5 <ak...@gmail.com>
Authored: Fri May 4 10:29:12 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue May 22 16:28:49 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  7 ++++++
 .../core/datamap/AbstractDataMapJob.java        |  2 +-
 .../datamap/DistributableDataMapFormat.java     | 18 ++++++++++++---
 .../carbondata/core/datamap/TableDataMap.java   | 18 ++++++++++++---
 .../carbondata/core/datamap/dev/DataMap.java    |  5 +++++
 .../blockletindex/BlockletDataMap.java          |  5 +++++
 .../datamap/bloom/BloomCoarseGrainDataMap.java  |  5 +++++
 .../datamap/examples/MinMaxIndexDataMap.java    |  5 +++++
 .../lucene/LuceneCoarseGrainDataMap.java        |  5 +++++
 .../datamap/lucene/LuceneDataMapWriter.java     | 20 ++++++++++++++---
 .../datamap/lucene/LuceneFineGrainDataMap.java  | 23 +++++++++++++++++---
 .../lucene/LuceneFineGrainDataMapSuite.scala    | 23 ++++++++++++++++++--
 .../testsuite/datamap/CGDataMapTestCase.scala   |  7 ++++++
 .../testsuite/datamap/FGDataMapTestCase.scala   |  7 ++++++
 .../carbondata/spark/rdd/SparkDataMapJob.scala  |  3 +++
 15 files changed, 138 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 5ba1fec..8ebce9e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1742,6 +1742,13 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL = "512";
 
+  // by default lucene will not store or create index for stop words like "is","the", if this
+  // property is set to true lucene will index for stop words also and gives result for the filter
+  // with stop words(example: TEXT_MATCH('description':'the'))
+  public static final String CARBON_LUCENE_INDEX_STOP_WORDS = "carbon.lucene.index.stop.words";
+
+  public static final String CARBON_LUCENE_INDEX_STOP_WORDS_DEFAULT = "false";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
index 7d1cb48..ed3ecc9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
@@ -30,4 +30,4 @@ public abstract class AbstractDataMapJob implements DataMapJob {
   @Override public void execute(CarbonTable carbonTable,
       FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 4200414..010c6c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -107,6 +108,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     return new RecordReader<Void, ExtendedBlocklet>() {
       private Iterator<ExtendedBlocklet> blockletIterator;
       private ExtendedBlocklet currBlocklet;
+      private List<DataMap> dataMaps;
 
       @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
           throws IOException, InterruptedException {
@@ -124,8 +126,11 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
           blockletIterator = Collections.emptyIterator();
           return;
         }
-        List<ExtendedBlocklet> blocklets = tableDataMap.prune(distributable.getDistributable(),
-            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
+        dataMaps = tableDataMap.getTableDataMaps(distributable.getDistributable());
+        List<ExtendedBlocklet> blocklets = tableDataMap
+            .prune(dataMaps,
+                distributable.getDistributable(),
+                dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
         for (ExtendedBlocklet blocklet : blocklets) {
           blocklet.setDataMapUniqueId(distributable.getUniqueId());
         }
@@ -137,6 +142,9 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
         boolean hasNext = blockletIterator.hasNext();
         if (hasNext) {
           currBlocklet = blockletIterator.next();
+        } else {
+          // close all resources when all the results are returned
+          close();
         }
         return hasNext;
       }
@@ -158,7 +166,11 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
       @Override
       public void close() throws IOException {
-
+        if (null != dataMaps) {
+          for (DataMap dataMap : dataMaps) {
+            dataMap.finish();
+          }
+        }
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index b8254d4..4ce0f6c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -139,6 +139,17 @@ public final class TableDataMap extends OperationEventListener {
   }
 
   /**
+   * This method returns all the datamaps corresponding to the distributable object
+   *
+   * @param distributable
+   * @return
+   * @throws IOException
+   */
+  public List<DataMap> getTableDataMaps(DataMapDistributable distributable) throws IOException {
+    return dataMapFactory.getDataMaps(distributable);
+  }
+
+  /**
    * This method is used from any machine after it is distributed. It takes the distributable object
    * to prune the filters.
    *
@@ -146,11 +157,10 @@ public final class TableDataMap extends OperationEventListener {
    * @param filterExp
    * @return
    */
-  public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
+  public List<ExtendedBlocklet> prune(List<DataMap> dataMaps, DataMapDistributable distributable,
       FilterResolverIntf filterExp, List<PartitionSpec> partitions) throws IOException {
     List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
     List<Blocklet> blocklets = new ArrayList<>();
-    List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable);
     for (DataMap dataMap : dataMaps) {
       blocklets.addAll(dataMap.prune(filterExp,
           segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment()),
@@ -192,7 +202,9 @@ public final class TableDataMap extends OperationEventListener {
    * Clears all datamap
    */
   public void clear() {
-    dataMapFactory.clear();
+    if (null != dataMapFactory) {
+      dataMapFactory.clear();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 9fbdd90..d846281 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -55,4 +55,9 @@ public interface DataMap<T extends Blocklet> {
    */
   void clear();
 
+  /**
+   * clears all the resources for datamaps
+   */
+  void finish();
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 6730ad5..6e43fbc 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -705,6 +705,11 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Serializable
     return prune(filterExp, this.segmentProperties);
   }
 
+  @Override
+  public void finish() {
+
+  }
+
   private boolean isCorrectUUID(String[] fileDetails, PartitionSpec spec) {
     boolean needToScan = false;
     if (spec.getUuid() != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index a5a141c..e9af0ff 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -205,4 +205,9 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
       return sb.toString();
     }
   }
+
+  @Override
+  public void finish() {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
index ac6358e..78868a9 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -170,4 +170,9 @@ public class MinMaxIndexDataMap extends CoarseGrainDataMap {
     readMinMaxDataMap = null;
   }
 
+  @Override
+  public void finish() {
+
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java
index 580f18b..77b5347 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java
@@ -230,4 +230,9 @@ public class LuceneCoarseGrainDataMap extends CoarseGrainDataMap {
   public void clear() {
 
   }
+
+  @Override
+  public void finish() {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 759b607..c7eb3d8 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -43,7 +43,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.CharArraySet;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
 import org.apache.lucene.codecs.lucene62.Lucene62Codec;
 import org.apache.lucene.document.Document;
@@ -85,6 +87,11 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   public static final String ROWID_NAME = "rowId";
 
+  private Codec speedCodec = new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED);
+
+  private Codec compressionCodec =
+      new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION);
+
   private Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache = new HashMap<>();
 
   private int cacheSize;
@@ -123,7 +130,14 @@ public class LuceneDataMapWriter extends DataMapWriter {
    */
   public void onBlockletStart(int blockletId) throws IOException {
     if (null == analyzer) {
-      analyzer = new StandardAnalyzer();
+      if (CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_LUCENE_INDEX_STOP_WORDS,
+              CarbonCommonConstants.CARBON_LUCENE_INDEX_STOP_WORDS_DEFAULT)
+          .equalsIgnoreCase("true")) {
+        analyzer = new StandardAnalyzer(CharArraySet.EMPTY_SET);
+      } else {
+        analyzer = new StandardAnalyzer();
+      }
     }
     // save index data into ram, write into disk after one page finished
     ramDir = new RAMDirectory();
@@ -162,10 +176,10 @@ public class LuceneDataMapWriter extends DataMapWriter {
         .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
             CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
         .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
-      indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
+      indexWriterConfig.setCodec(speedCodec);
     } else {
       indexWriterConfig
-          .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
+          .setCodec(compressionCodec);
     }
 
     indexWriter = new IndexWriter(indexDir, indexWriterConfig);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
index 742f8d0..b26ab53 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -92,6 +92,8 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
 
   private boolean storeBlockletWise;
 
+  private IndexReader indexReader;
+
   LuceneFineGrainDataMap(Analyzer analyzer, DataMapSchema schema) {
     this.analyzer = analyzer;
     writeCacheSize = LuceneDataMapFactoryBase.validateAndGetWriteCacheSize(schema);
@@ -148,7 +150,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
     // open this index path , use HDFS default configuration
     Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
 
-    IndexReader indexReader = DirectoryReader.open(indexDir);
+    this.indexReader = DirectoryReader.open(indexDir);
     if (indexReader == null) {
       throw new RuntimeException("failed to create index reader object");
     }
@@ -247,7 +249,6 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
       // take the min of total documents available in the reader and limit if set by the user
       maxDocs = Math.min(maxDocs, indexSearcher.getIndexReader().maxDoc());
       // execute index search
-      // initialize to null, else ScoreDoc objects will get accumulated in memory
       TopDocs result = null;
       // the number of documents to be queried in one search. It will always be minimum of
       // search result and maxDocs
@@ -423,4 +424,20 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
 
   }
 
-}
\ No newline at end of file
+  @Override
+  public void finish() {
+    if (null != indexReader) {
+      try {
+        int referenceCount = indexReader.getRefCount();
+        if (referenceCount > 0) {
+          indexReader.decRef();
+          if (null != indexSearcherMap) {
+            indexSearcherMap.clear();
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.error(e, "Ignoring the exception, Error while closing the lucene index reader");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 0e885de..638d24d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -117,7 +117,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test lucene fine grain data map") {
-    sql("drop datamap if exists dm on table datamap_test")
+//    sql("drop datamap if exists dm on table datamap_test")
     sql(
       s"""
          | CREATE DATAMAP dm ON TABLE datamap_test
@@ -799,6 +799,21 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap if exists dm_text on table datamap_test_table")
   }
 
+  test("test lucene indexing english stop words") {
+    sql("drop table if exists table_stop")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_LUCENE_INDEX_STOP_WORDS, "false")
+    sql("create table table_stop(suggestion string,goal string) stored by 'carbondata'")
+    sql(
+      "create datamap stop_dm on table table_stop using 'lucene' DMPROPERTIES('index_columns'='suggestion')")
+    sql("insert into table_stop select 'The is the stop word','abcde'")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_LUCENE_INDEX_STOP_WORDS, "true")
+    sql("insert into table_stop select 'The is one more stop word','defg'")
+    assert(
+      sql("select * from table_stop where text_match('suggestion:*is*')").collect().length == 1)
+  }
+
   override protected def afterAll(): Unit = {
     LuceneFineGrainDataMapSuite.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")
@@ -813,11 +828,15 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS datamap_test5")
     sql("DROP TABLE IF EXISTS datamap_test7")
     sql("DROP TABLE IF EXISTS datamap_main")
+    sql("DROP TABLE IF EXISTS table_stop")
     sql("use default")
     sql("drop database if exists lucene cascade")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
-          CarbonProperties.getStorePath)
+        CarbonProperties.getStorePath)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_LUCENE_INDEX_STOP_WORDS,
+        CarbonCommonConstants.CARBON_LUCENE_INDEX_STOP_WORDS_DEFAULT)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index d8fc46f..848acde 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -234,6 +234,13 @@ class CGDataMap extends CoarseGrainDataMap {
   }
 
   override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+
+  /**
+   * clears all the resources for datamaps
+   */
+  override def finish() = {
+    ???
+  }
 }
 
 class CGDataMapWriter(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 535a112..e2642ff 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -259,6 +259,13 @@ class FGDataMap extends FineGrainDataMap {
   }
 
   override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+
+  /**
+   * clears all the resources for datamaps
+   */
+  override def finish() = {
+
+  }
 }
 
 class FGDataMapWriter(carbonTable: CarbonTable,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/07a77fab/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 6ee566c..43ee31b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -71,6 +71,9 @@ class DataMapPruneRDD(sc: SparkContext,
     val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit
     val reader = dataMapFormat.createRecordReader(inputSplit, attemptContext)
     reader.initialize(inputSplit, attemptContext)
+    context.addTaskCompletionListener(_ => {
+      reader.close()
+    })
     val iter = new Iterator[ExtendedBlocklet] {
 
       private var havePair = false