You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/09/28 06:29:01 UTC

carbondata git commit: [CARBONDATA-1504] Fixed refresh of segments in datamap for update and partition

Repository: carbondata
Updated Branches:
  refs/heads/master c4f312fa1 -> eb771f5ee


[CARBONDATA-1504] Fixed refresh of segments in datamap for update and partition

Currently the datamap scans the complete segment on every query execution to get all the carbonindex files. This is slow when the data volume or the number of files is large.
So this PR caches the content of each segment and refreshes it when an update happens or the store changes.

This closes #1377


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eb771f5e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eb771f5e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eb771f5e

Branch: refs/heads/master
Commit: eb771f5ee6511288e3c53281f3fac02d6f69c66a
Parents: c4f312f
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Thu Sep 21 19:03:06 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Sep 28 14:28:44 2017 +0800

----------------------------------------------------------------------
 .../core/datamap/DataMapStoreManager.java       |  89 +-
 .../blockletindex/BlockletDataMap.java          |  14 +-
 .../blockletindex/BlockletDataMapFactory.java   |   9 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  23 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |   4 +-
 .../iud/UpdateCarbonTableTestCase.scala         | 804 +++++++++----------
 .../apache/spark/sql/hive/CarbonMetastore.scala |   6 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  11 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |   2 +-
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |   2 +-
 10 files changed, 540 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb771f5e/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 9974920..f19e733 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.core.datamap;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -25,6 +26,9 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 
 /**
  * It maintains all the DataMaps in it.
@@ -38,6 +42,8 @@ public final class DataMapStoreManager {
    */
   private Map<String, List<TableDataMap>> allDataMaps = new ConcurrentHashMap<>();
 
+  private Map<String, TableSegmentRefresher> segmentRefreshMap = new ConcurrentHashMap<>();
+
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(DataMapStoreManager.class.getName());
 
@@ -87,6 +93,8 @@ public final class DataMapStoreManager {
   public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
       String factoryClassName, String dataMapName) {
     String table = identifier.uniqueName();
+    // Just update the segmentRefreshMap with the table if not added.
+    getTableSegmentRefresher(identifier);
     List<TableDataMap> tableDataMaps = allDataMaps.get(table);
     if (tableDataMaps == null) {
       tableDataMaps = new ArrayList<>();
@@ -124,22 +132,22 @@ public final class DataMapStoreManager {
   }
 
   /**
-   * Clear the datamap/datamaps of a mentioned datamap name and table from memory
-   * @param identifier
-   * @param dataMapName
+   * Clear the datamap/datamaps of a table from memory
+   * @param identifier Table identifier
    */
-  public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
+  public void clearDataMap(AbsoluteTableIdentifier identifier) {
     List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName());
+    segmentRefreshMap.remove(identifier.uniqueName());
     if (tableDataMaps != null) {
       int i = 0;
       for (TableDataMap tableDataMap: tableDataMaps) {
-        if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) {
+        if (tableDataMap != null) {
           tableDataMap.clear();
-          tableDataMaps.remove(i);
           break;
         }
         i++;
       }
+      allDataMaps.remove(identifier.uniqueName());
     }
   }
 
@@ -151,4 +159,73 @@ public final class DataMapStoreManager {
     return instance;
   }
 
+  /**
+   * Get the TableSegmentRefresher for the table. If it does not exist, create one and return it.
+   */
+  public TableSegmentRefresher getTableSegmentRefresher(AbsoluteTableIdentifier identifier) {
+    String uniqueName = identifier.uniqueName();
+    if (segmentRefreshMap.get(uniqueName) == null) {
+      segmentRefreshMap.put(uniqueName, new TableSegmentRefresher(identifier));
+    }
+    return segmentRefreshMap.get(uniqueName);
+  }
+
+  /**
+   * Keep track of the segment refresh time.
+   */
+  public static class TableSegmentRefresher {
+
+    // This map stores the latest segment refresh time. So in case of update/delete we check the
+    // time against this map.
+    private Map<String, Long> segmentRefreshTime = new HashMap<>();
+
+    // This map keeps the manual refresh entries from users. It is mainly used for partition
+    // altering.
+    private Map<String, Boolean> manualSegmentRefresh = new HashMap<>();
+
+    public TableSegmentRefresher(AbsoluteTableIdentifier identifier) {
+      SegmentUpdateStatusManager statusManager = new SegmentUpdateStatusManager(identifier);
+      SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails();
+      for (SegmentUpdateDetails updateDetails : updateStatusDetails) {
+        UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
+        segmentRefreshTime.put(updateVO.getSegmentId(), updateVO.getCreatedOrUpdatedTimeStamp());
+      }
+    }
+
+    public boolean isRefreshNeeded(String segmentId, SegmentUpdateStatusManager statusManager) {
+      UpdateVO updateVO = statusManager.getInvalidTimestampRange(segmentId);
+      if (segmentRefreshTime.get(segmentId) == null) {
+        segmentRefreshTime.put(segmentId, updateVO.getCreatedOrUpdatedTimeStamp());
+        return true;
+      }
+      if (manualSegmentRefresh.get(segmentId) != null && manualSegmentRefresh.get(segmentId)) {
+        manualSegmentRefresh.put(segmentId, false);
+        return true;
+      }
+      Long updateTimestamp = updateVO.getLatestUpdateTimestamp();
+      boolean isRefresh =
+          updateTimestamp != null && (updateTimestamp > segmentRefreshTime.get(segmentId));
+      if (isRefresh) {
+        segmentRefreshTime.remove(segmentId);
+      }
+      return isRefresh;
+    }
+
+    public void refreshSegments(List<String> segmentIds) {
+      for (String segmentId: segmentIds) {
+        manualSegmentRefresh.put(segmentId, true);
+      }
+    }
+
+    public boolean isRefreshNeeded(String segmentId) {
+      if (manualSegmentRefresh.get(segmentId) != null && manualSegmentRefresh.get(segmentId)) {
+        manualSegmentRefresh.put(segmentId, false);
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb771f5e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index bd7a1d3..57211fd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -429,9 +429,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   @Override
   public void clear() {
-    unsafeMemoryDMStore.freeMemory();
-    unsafeMemoryDMStore = null;
-    segmentProperties = null;
+    if (unsafeMemoryDMStore != null) {
+      unsafeMemoryDMStore.freeMemory();
+      unsafeMemoryDMStore = null;
+      segmentProperties = null;
+    }
   }
 
   @Override
@@ -446,7 +448,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   @Override
   public long getMemorySize() {
-    return unsafeMemoryDMStore.getMemoryUsed();
+    if (unsafeMemoryDMStore != null) {
+      return unsafeMemoryDMStore.getMemoryUsed();
+    } else {
+      return 0;
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb771f5e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index d6ddfa6..d734d81 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -77,6 +77,7 @@ public class BlockletDataMapFactory implements DataMapFactory {
         tableBlockIndexUniqueIdentifiers.add(
             new TableBlockIndexUniqueIdentifier(identifier, segmentId, listFiles[i].getName()));
       }
+      segmentMap.put(segmentId, tableBlockIndexUniqueIdentifiers);
     }
 
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
@@ -120,15 +121,17 @@ public class BlockletDataMapFactory implements DataMapFactory {
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
         DataMap dataMap = cache.getIfPresent(blockIndex);
-        dataMap.clear();
-        cache.invalidate(blockIndex);
+        if (dataMap != null) {
+          cache.invalidate(blockIndex);
+          dataMap.clear();
+        }
       }
     }
   }
 
   @Override
   public void clear() {
-    for (String segmentId: segmentMap.keySet()) {
+    for (String segmentId: segmentMap.keySet().toArray(new String[segmentMap.size()])) {
       clear(segmentId);
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb771f5e/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 9076233..55d22d1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -272,16 +272,15 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
     // get all valid segments and set them into the configuration
+    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
     if (validSegments.size() == 0) {
       SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
       SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
           segmentStatusManager.getValidAndInvalidSegments();
-      SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
       validSegments = segments.getValidSegments();
       if (validSegments.size() == 0) {
         return new ArrayList<>(0);
       }
-
       // remove entry in the segment index if there are invalid segments
       invalidSegments.addAll(segments.getInvalidSegments());
       for (String invalidSegmentId : invalidSegments) {
@@ -292,6 +291,26 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       }
     }
 
+    // Clean the updated segments from memory if the update happens on segments
+    List<String> toBeCleanedSegments = new ArrayList<>();
+    for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager
+        .getUpdateStatusDetails()) {
+      boolean refreshNeeded =
+          DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+              .isRefreshNeeded(segmentUpdateDetail.getSegmentName(), updateStatusManager);
+      if (refreshNeeded) {
+        toBeCleanedSegments.add(segmentUpdateDetail.getSegmentName());
+      }
+    }
+    // Clean segments if refresh is needed
+    for (String segment : validSegments) {
+      if (DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+          .isRefreshNeeded(segment)) {
+        toBeCleanedSegments.add(segment);
+      }
+    }
+    blockletMap.clear(toBeCleanedSegments);
+
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
     CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb771f5e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index e8ec6ad..0b5141e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -41,9 +41,9 @@ class C2DataMapFactory() extends DataMapFactory {
 
   override def fireEvent(event: ChangeEvent[_]): Unit = ???
 
-  override def clear(segmentId: String): Unit = ???
+  override def clear(segmentId: String): Unit = {}
 
-  override def clear(): Unit = ???
+  override def clear(): Unit = {}
 
   override def getDataMap(distributable: DataMapDistributable): DataMap = ???
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb771f5e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 4814183..db289d9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -45,408 +45,408 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
 
-//  test("test update operation with 0 rows updation.") {
-//    sql("""drop table if exists iud.zerorows""").show
-//    sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows""")
-//    sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show()
-//    sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 'xxx'""").show()
-//    checkAnswer(
-//      sql("""select c1,c2,c3,c5 from iud.zerorows"""),
-//      Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
-//    )
-//    sql("""drop table iud.zerorows""").show
-//
-//
-//  }
-//
-//
-//  test("update carbon table[select from source table with where and exist]") {
-//    sql("""drop table if exists iud.dest11""").show
-//    sql("""create table iud.dest11 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest11""")
-//    sql("""update iud.dest11 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
-//    checkAnswer(
-//      sql("""select c3,c5 from iud.dest11"""),
-//      Seq(Row("cc","ccc"), Row("dd","ddd"),Row("ee","eee"), Row("MGM","Disco"),Row("RGK","Music"))
-//    )
-//    sql("""drop table iud.dest11""").show
-//  }
-//
-//  test("update carbon table[using destination table columns with where and exist]") {
-//    sql("""drop table if exists iud.dest22""")
-//    sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest22""")
-//    checkAnswer(
-//      sql("""select c2 from iud.dest22 where c1='a'"""),
-//      Seq(Row(1))
-//    )
-//    sql("""update dest22 d  set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show()
-//    checkAnswer(
-//      sql("""select c2 from iud.dest22 where c1='a'"""),
-//      Seq(Row(2))
-//    )
-//    sql("""drop table if exists iud.dest22""")
-//  }
-//
-//  test("update carbon table without alias in set columns") {
-//    sql("""drop table if exists iud.dest33""")
-//    sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""")
-//    sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55  from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
-//    checkAnswer(
-//      sql("""select c3,c5 from iud.dest33 where c1='a'"""),
-//      Seq(Row("MGM","Disco"))
-//    )
-//    sql("""drop table if exists iud.dest33""")
-//  }
-//
-//  test("update carbon table without alias in set columns with mulitple loads") {
-//    sql("""drop table if exists iud.dest33""")
-//    sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""")
-//    sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55  from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
-//    checkAnswer(
-//      sql("""select c3,c5 from iud.dest33 where c1='a'"""),
-//      Seq(Row("MGM","Disco"),Row("MGM","Disco"))
-//    )
-//    sql("""drop table if exists iud.dest33""")
-//  }
-//
-//  test("update carbon table with optimized parallelism for segment") {
-//    sql("""drop table if exists iud.dest_opt_segment_parallelism""")
-//    sql(
-//      """create table iud.dest_opt_segment_parallelism (c1 string,c2 int,c3 string,c5 string)
-//        | STORED BY 'org.apache.carbondata.format'""".stripMargin)
-//    sql(
-//      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv'
-//         | INTO table iud.dest_opt_segment_parallelism""".stripMargin)
-//    sql(
-//      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv'
-//         | INTO table iud.dest_opt_segment_parallelism""".stripMargin)
-//    CarbonProperties.getInstance().addProperty(
-//      CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM, "3")
-//    sql(
-//      """update iud.dest_opt_segment_parallelism d
-//        | set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11)
-//        | where d.c1 = 'a'""".stripMargin).show()
-//    checkAnswer(
-//      sql("""select c3,c5 from iud.dest_opt_segment_parallelism where c1='a'"""),
-//      Seq(Row("MGM","Disco"),Row("MGM","Disco"))
-//    )
-//    sql("""drop table if exists iud.dest_opt_segment_parallelism""")
-//  }
-//
-//  test("update carbon table without alias in set three columns") {
-//    sql("""drop table if exists iud.dest44""")
-//    sql("""create table iud.dest44 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest44""")
-//    sql("""update iud.dest44 d set (c1,c3,c5 ) = (select s.c11, s.c33 ,s.c55  from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
-//    checkAnswer(
-//      sql("""select c1,c3,c5 from iud.dest44 where c1='a'"""),
-//      Seq(Row("a","MGM","Disco"))
-//    )
-//    sql("""drop table if exists iud.dest44""")
-//  }
-//
-//  test("update carbon table[single column select from source with where and exist]") {
-//    sql("""drop table if exists iud.dest55""")
-//    sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest55""")
-//    sql("""update iud.dest55 d set (c3)  = (select s.c33 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
-//    checkAnswer(
-//      sql("""select c1,c3 from iud.dest55 """),
-//      Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
-//    )
-//    sql("""drop table if exists iud.dest55""")
-//  }
-//
-//  test("update carbon table[single column SELECT from source with where and exist]") {
-//    sql("""drop table if exists iud.dest55""")
-//    sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest55""")
-//    sql("""update iud.dest55 d set (c3)  = (SELECT s.c33 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
-//    checkAnswer(
-//      sql("""select c1,c3 from iud.dest55 """),
-//      Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
-//    )
-//    sql("""drop table if exists iud.dest55""")
-//  }
-//
-//  test("update carbon table[using destination table columns without where clause]") {
-//    sql("""drop table if exists iud.dest66""")
-//    sql("""create table iud.dest66 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest66""")
-//    sql("""update iud.dest66 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"))""").show()
-//    checkAnswer(
-//      sql("""select c2,c5 from iud.dest66 """),
-//      Seq(Row(2,"aaaz"),Row(3,"bbbz"),Row(4,"cccz"),Row(5,"dddz"),Row(6,"eeez"))
-//    )
-//    sql("""drop table if exists iud.dest66""")
-//  }
-//
-//  test("update carbon table[using destination table columns with where clause]") {
-//    sql("""drop table if exists iud.dest77""")
-//    sql("""create table iud.dest77 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest77""")
-//    sql("""update iud.dest77 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z")) where d.c3 = 'dd'""").show()
-//    checkAnswer(
-//      sql("""select c2,c5 from iud.dest77 where c3 = 'dd'"""),
-//      Seq(Row(5,"dddz"))
-//    )
-//    sql("""drop table if exists iud.dest77""")
-//  }
-//
-//  test("update carbon table[using destination table( no alias) columns without where clause]") {
-//    sql("""drop table if exists iud.dest88""")
-//    sql("""create table iud.dest88 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest88""")
-//    sql("""update iud.dest88  set (c2, c5 ) = (c2 + 1, concat(c5 , "y" ))""").show()
-//    checkAnswer(
-//      sql("""select c2,c5 from iud.dest88 """),
-//      Seq(Row(2,"aaay"),Row(3,"bbby"),Row(4,"cccy"),Row(5,"dddy"),Row(6,"eeey"))
-//    )
-//    sql("""drop table if exists iud.dest88""")
-//  }
-//
-//  test("update carbon table[using destination table columns with hard coded value ]") {
-//    sql("""drop table if exists iud.dest99""")
-//    sql("""create table iud.dest99 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest99""")
-//    sql("""update iud.dest99 d set (c2, c5 ) = (c2 + 1, "xyx")""").show()
-//    checkAnswer(
-//      sql("""select c2,c5 from iud.dest99 """),
-//      Seq(Row(2,"xyx"),Row(3,"xyx"),Row(4,"xyx"),Row(5,"xyx"),Row(6,"xyx"))
-//    )
-//    sql("""drop table if exists iud.dest99""")
-//  }
-//
-//  test("update carbon tableusing destination table columns with hard coded value and where condition]") {
-//    sql("""drop table if exists iud.dest110""")
-//    sql("""create table iud.dest110 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest110""")
-//    sql("""update iud.dest110 d set (c2, c5 ) = (c2 + 1, "xyx") where d.c1 = 'e'""").show()
-//    checkAnswer(
-//      sql("""select c2,c5 from iud.dest110 where c1 = 'e' """),
-//      Seq(Row(6,"xyx"))
-//    )
-//    sql("""drop table iud.dest110""")
-//  }
-//
-//  test("update carbon table[using source  table columns with where and exist and no destination table condition]") {
-//    sql("""drop table if exists iud.dest120""")
-//    sql("""create table iud.dest120 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest120""")
-//    sql("""update iud.dest120 d  set (c3, c5 ) = (select s.c33 ,s.c55  from iud.source2 s where d.c1 = s.c11)""").show()
-//    checkAnswer(
-//      sql("""select c3,c5 from iud.dest120 """),
-//      Seq(Row("MGM","Disco"),Row("RGK","Music"),Row("cc","ccc"),Row("dd","ddd"),Row("ee","eee"))
-//    )
-//    sql("""drop table iud.dest120""")
-//  }
-//
-//  test("update carbon table[using destination table where and exist]") {
-//    sql("""drop table if exists iud.dest130""")
-//    sql("""create table iud.dest130 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest130""")
-//    sql("""update iud.dest130 dd  set (c2, c5 ) = (c2 + 1, "xyx")  where dd.c1 = 'a'""").show()
-//    checkAnswer(
-//      sql("""select c2,c5 from iud.dest130 where c1 = 'a' """),
-//      Seq(Row(2,"xyx"))
-//    )
-//    sql("""drop table iud.dest130""")
-//  }
-//
-//  test("update carbon table[using destination table (concat) where and exist]") {
-//    sql("""drop table if exists iud.dest140""")
-//    sql("""create table iud.dest140 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest140""")
-//    sql("""update iud.dest140 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"))  where d.c1 = 'a'""").show()
-//    checkAnswer(
-//      sql("""select c2,c5 from iud.dest140 where c1 = 'a'"""),
-//      Seq(Row(2,"aaaz"))
-//    )
-//    sql("""drop table iud.dest140""")
-//  }
-//
-//  test("update carbon table[using destination table (concat) with  where") {
-//    sql("""drop table if exists iud.dest150""")
-//    sql("""create table iud.dest150 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest150""")
-//    sql("""update iud.dest150 d set (c5) = (concat(c5 , "z"))  where d.c1 = 'b'""").show()
-//    checkAnswer(
-//      sql("""select c5 from iud.dest150 where c1 = 'b' """),
-//      Seq(Row("bbbz"))
-//    )
-//    sql("""drop table iud.dest150""")
-//  }
-//
-//  test("update table with data for datatype mismatch with column ") {
-//    sql("""update iud.update_01 set (imei) = ('skt') where level = 'aaa'""")
-//    checkAnswer(
-//      sql("""select * from iud.update_01 where imei = 'skt'"""),
-//      Seq()
-//    )
-//  }
-//
-//  test("update carbon table-error[more columns in source table not allowed") {
-//    val exception = intercept[Exception] {
-//      sql("""update iud.dest d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"), "abc")""").show()
-//    }
-//    assertResult("Number of source and destination columns are not matching")(exception.getMessage)
-//  }
-//
-//  test("update carbon table-error[no set columns") {
-//    intercept[Exception] {
-//      sql("""update iud.dest d set () = ()""").show()
-//    }
-//  }
-//
-//  test("update carbon table-error[no set columns with updated column") {
-//    intercept[Exception] {
-//      sql("""update iud.dest d set  = (c1+1)""").show()
-//    }
-//  }
-//  test("update carbon table-error[one set column with two updated column") {
-//    intercept[Exception] {
-//      sql("""update iud.dest  set c2 = (c2 + 1, concat(c5 , "z") )""").show()
-//    }
-//  }
-//
-//  test("""update carbon [special characters  in value- test parsing logic ]""") {
-//    sql("""drop table if exists iud.dest160""")
-//    sql("""create table iud.dest160 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest160""")
-//    sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show()
-//    sql("""update iud.dest160 set(c1) =  ('abd$asjdh$adasj$l;sdf$*)$*)(&^')""").show()
-//    sql("""update iud.dest160 set(c1) =("\\")""").show()
-//    sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show()
-//    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'a\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
-//    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
-//    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
-//    sql("""update iud.dest160 d set (c3,c5)      =     (select s.c33,'a\\a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
-//    sql("""update iud.dest160 d set (c3,c5) =(select s.c33,'a\'a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
-//    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a\'a\"' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
-//    sql("""drop table iud.dest160""")
-//  }
-//
-//  test("""update carbon [sub query, between and existing in outer condition.(Customer query ) ]""") {
-//    sql("""drop table if exists iud.dest170""")
-//    sql("""create table iud.dest170 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest170""")
-//    sql("""update iud.dest170 d set (c3)=(select s.c33 from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
-//    checkAnswer(
-//      sql("""select c3 from  iud.dest170 as d where d.c2 between 1 and 3"""),
-//      Seq(Row("MGM"), Row("RGK"), Row("cc"))
-//    )
-//    sql("""drop table iud.dest170""")
-//  }
-//
-//  test("""update carbon [self join select query ]""") {
-//    sql("""drop table if exists iud.dest171""")
-//    sql("""create table iud.dest171 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest171""")
-//    sql("""update iud.dest171 d set (c3)=(select concat(s.c3 , "z") from iud.dest171 s where d.c2 = s.c2)""").show
-//    sql("""drop table if exists iud.dest172""")
-//    sql("""create table iud.dest172 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest172""")
-//    sql("""update iud.dest172 d set (c3)=( concat(c3 , "z"))""").show
-//    checkAnswer(
-//      sql("""select c3 from  iud.dest171"""),
-//      sql("""select c3 from  iud.dest172""")
-//    )
-//    sql("""drop table iud.dest171""")
-//    sql("""drop table iud.dest172""")
-//  }
-//
-//  test("update carbon table-error[closing bracket missed") {
-//    intercept[Exception] {
-//      sql("""update iud.dest d set (c2) = (194""").show()
-//    }
-//  }
-//
-//  test("update carbon table-error[starting bracket missed") {
-//    intercept[Exception] {
-//      sql("""update iud.dest d set (c2) = 194)""").show()
-//    }
-//  }
-//
-//  test("update carbon table-error[missing starting and closing bracket") {
-//    intercept[Exception] {
-//      sql("""update iud.dest d set (c2) = 194""").show()
-//    }
-//  }
-//
-//  test("test create table with column name as tupleID"){
-//    intercept[Exception] {
-//      sql("CREATE table carbontable (empno int, tupleID String, " +
-//          "designation String, doj Timestamp, workgroupcategory int, " +
-//          "workgroupcategoryname String, deptno int, deptname String, projectcode int, " +
-//          "projectjoindate Timestamp, projectenddate Timestamp, attendance int, " +
-//          "utilization int,salary int) STORED BY 'org.apache.carbondata.format' " +
-//          "TBLPROPERTIES('DICTIONARY_INCLUDE'='empno,workgroupcategory,deptno,projectcode'," +
-//          "'DICTIONARY_EXCLUDE'='empname')")
-//    }
-//  }
-//
-//  test("test show segment after updating data : JIRA-1411,JIRA-1414") {
-//    sql("""drop table if exists iud.show_segment""").show
-//    sql("""create table iud.show_segment (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
-//    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.show_segment""")
-//    val before_update = sql("""show segments for table iud.show_segment""").toDF()
-//    sql("""update iud.show_segment d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
-//    val after_update = sql("""show segments for table iud.show_segment""").toDF()
-//    checkAnswer(
-//      before_update,
-//      after_update
-//    )
-//    sql("""drop table if exists iud.show_segment""").show
-//  }
-//
-//  test("Failure of update operation due to bad record with proper error message") {
-//    try {
-//      CarbonProperties.getInstance()
-//        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
-//      val errorMessage = intercept[Exception] {
-//        sql("drop table if exists update_with_bad_record")
-//        sql("create table update_with_bad_record(item int, name String) stored by 'carbondata'")
-//        sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/IUD/bad_record.csv' into table " +
-//            s"update_with_bad_record")
-//        sql("update update_with_bad_record set (item)=(3.45)").show()
-//        sql("drop table if exists update_with_bad_record")
-//      }
-//      assert(errorMessage.getMessage.contains("Data load failed due to bad record"))
-//    } finally {
-//      CarbonProperties.getInstance()
-//        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
-//    }
-//  }
-//
-//  test("More records after update operation ") {
-//    sql("DROP TABLE IF EXISTS default.carbon1")
-//    import sqlContext.implicits._
-//    val df = sqlContext.sparkContext.parallelize(1 to 36000)
-//      .map(x => (x+"a", "b", x))
-//      .toDF("c1", "c2", "c3")
-//    df.write
-//      .format("carbondata")
-//      .option("tableName", "carbon1")
-//      .option("tempCSV", "true")
-//      .option("compress", "true")
-//      .mode(SaveMode.Overwrite)
-//      .save()
-//
-//    checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000)))
-//
-//    sql("update default.carbon1 set (c1)=('test123') where c1='9999a'").show()
-//
-//    checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000)))
-//
-//    checkAnswer(sql("select * from default.carbon1 where c1 = 'test123'"), Row("test123","b",9999))
-//
-//    sql("DROP TABLE IF EXISTS default.carbon1")
-//  }
+  test("test update operation with 0 rows updation.") {
+    sql("""drop table if exists iud.zerorows""").show
+    sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows""")
+    sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show()
+    sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 'xxx'""").show()
+    checkAnswer(
+      sql("""select c1,c2,c3,c5 from iud.zerorows"""),
+      Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
+    )
+    sql("""drop table iud.zerorows""").show
+
+
+  }
+
+
+  test("update carbon table[select from source table with where and exist]") {
+    sql("""drop table if exists iud.dest11""").show
+    sql("""create table iud.dest11 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest11""")
+    sql("""update iud.dest11 d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
+    checkAnswer(
+      sql("""select c3,c5 from iud.dest11"""),
+      Seq(Row("cc","ccc"), Row("dd","ddd"),Row("ee","eee"), Row("MGM","Disco"),Row("RGK","Music"))
+    )
+    sql("""drop table iud.dest11""").show
+  }
+
+  test("update carbon table[using destination table columns with where and exist]") {
+    sql("""drop table if exists iud.dest22""")
+    sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest22""")
+    checkAnswer(
+      sql("""select c2 from iud.dest22 where c1='a'"""),
+      Seq(Row(1))
+    )
+    sql("""update dest22 d  set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show()
+    checkAnswer(
+      sql("""select c2 from iud.dest22 where c1='a'"""),
+      Seq(Row(2))
+    )
+    sql("""drop table if exists iud.dest22""")
+  }
+
+  test("update carbon table without alias in set columns") {
+    sql("""drop table if exists iud.dest33""")
+    sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""")
+    sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55  from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
+    checkAnswer(
+      sql("""select c3,c5 from iud.dest33 where c1='a'"""),
+      Seq(Row("MGM","Disco"))
+    )
+    sql("""drop table if exists iud.dest33""")
+  }
+
+  test("update carbon table without alias in set columns with mulitple loads") {
+    sql("""drop table if exists iud.dest33""")
+    sql("""create table iud.dest33 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest33""")
+    sql("""update iud.dest33 d set (c3,c5 ) = (select s.c33 ,s.c55  from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
+    checkAnswer(
+      sql("""select c3,c5 from iud.dest33 where c1='a'"""),
+      Seq(Row("MGM","Disco"),Row("MGM","Disco"))
+    )
+    sql("""drop table if exists iud.dest33""")
+  }
+
+  test("update carbon table with optimized parallelism for segment") {
+    sql("""drop table if exists iud.dest_opt_segment_parallelism""")
+    sql(
+      """create table iud.dest_opt_segment_parallelism (c1 string,c2 int,c3 string,c5 string)
+        | STORED BY 'org.apache.carbondata.format'""".stripMargin)
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv'
+         | INTO table iud.dest_opt_segment_parallelism""".stripMargin)
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv'
+         | INTO table iud.dest_opt_segment_parallelism""".stripMargin)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM, "3")
+    sql(
+      """update iud.dest_opt_segment_parallelism d
+        | set (c3,c5 ) = (select s.c33 ,s.c55 from iud.source2 s where d.c1 = s.c11)
+        | where d.c1 = 'a'""".stripMargin).show()
+    checkAnswer(
+      sql("""select c3,c5 from iud.dest_opt_segment_parallelism where c1='a'"""),
+      Seq(Row("MGM","Disco"),Row("MGM","Disco"))
+    )
+    sql("""drop table if exists iud.dest_opt_segment_parallelism""")
+  }
+
+  test("update carbon table without alias in set three columns") {
+    sql("""drop table if exists iud.dest44""")
+    sql("""create table iud.dest44 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest44""")
+    sql("""update iud.dest44 d set (c1,c3,c5 ) = (select s.c11, s.c33 ,s.c55  from iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
+    checkAnswer(
+      sql("""select c1,c3,c5 from iud.dest44 where c1='a'"""),
+      Seq(Row("a","MGM","Disco"))
+    )
+    sql("""drop table if exists iud.dest44""")
+  }
+
+  test("update carbon table[single column select from source with where and exist]") {
+    sql("""drop table if exists iud.dest55""")
+    sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest55""")
+    sql("""update iud.dest55 d set (c3)  = (select s.c33 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
+    checkAnswer(
+      sql("""select c1,c3 from iud.dest55 """),
+      Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
+    )
+    sql("""drop table if exists iud.dest55""")
+  }
+
+  test("update carbon table[single column SELECT from source with where and exist]") {
+    sql("""drop table if exists iud.dest55""")
+    sql("""create table iud.dest55 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest55""")
+    sql("""update iud.dest55 d set (c3)  = (SELECT s.c33 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
+    checkAnswer(
+      sql("""select c1,c3 from iud.dest55 """),
+      Seq(Row("a","MGM"),Row("b","RGK"),Row("c","cc"),Row("d","dd"),Row("e","ee"))
+    )
+    sql("""drop table if exists iud.dest55""")
+  }
+
+  test("update carbon table[using destination table columns without where clause]") {
+    sql("""drop table if exists iud.dest66""")
+    sql("""create table iud.dest66 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest66""")
+    sql("""update iud.dest66 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"))""").show()
+    checkAnswer(
+      sql("""select c2,c5 from iud.dest66 """),
+      Seq(Row(2,"aaaz"),Row(3,"bbbz"),Row(4,"cccz"),Row(5,"dddz"),Row(6,"eeez"))
+    )
+    sql("""drop table if exists iud.dest66""")
+  }
+
+  test("update carbon table[using destination table columns with where clause]") {
+    sql("""drop table if exists iud.dest77""")
+    sql("""create table iud.dest77 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest77""")
+    sql("""update iud.dest77 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z")) where d.c3 = 'dd'""").show()
+    checkAnswer(
+      sql("""select c2,c5 from iud.dest77 where c3 = 'dd'"""),
+      Seq(Row(5,"dddz"))
+    )
+    sql("""drop table if exists iud.dest77""")
+  }
+
+  test("update carbon table[using destination table( no alias) columns without where clause]") {
+    sql("""drop table if exists iud.dest88""")
+    sql("""create table iud.dest88 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest88""")
+    sql("""update iud.dest88  set (c2, c5 ) = (c2 + 1, concat(c5 , "y" ))""").show()
+    checkAnswer(
+      sql("""select c2,c5 from iud.dest88 """),
+      Seq(Row(2,"aaay"),Row(3,"bbby"),Row(4,"cccy"),Row(5,"dddy"),Row(6,"eeey"))
+    )
+    sql("""drop table if exists iud.dest88""")
+  }
+
+  test("update carbon table[using destination table columns with hard coded value ]") {
+    sql("""drop table if exists iud.dest99""")
+    sql("""create table iud.dest99 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest99""")
+    sql("""update iud.dest99 d set (c2, c5 ) = (c2 + 1, "xyx")""").show()
+    checkAnswer(
+      sql("""select c2,c5 from iud.dest99 """),
+      Seq(Row(2,"xyx"),Row(3,"xyx"),Row(4,"xyx"),Row(5,"xyx"),Row(6,"xyx"))
+    )
+    sql("""drop table if exists iud.dest99""")
+  }
+
+  test("update carbon tableusing destination table columns with hard coded value and where condition]") {
+    sql("""drop table if exists iud.dest110""")
+    sql("""create table iud.dest110 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest110""")
+    sql("""update iud.dest110 d set (c2, c5 ) = (c2 + 1, "xyx") where d.c1 = 'e'""").show()
+    checkAnswer(
+      sql("""select c2,c5 from iud.dest110 where c1 = 'e' """),
+      Seq(Row(6,"xyx"))
+    )
+    sql("""drop table iud.dest110""")
+  }
+
+  test("update carbon table[using source  table columns with where and exist and no destination table condition]") {
+    sql("""drop table if exists iud.dest120""")
+    sql("""create table iud.dest120 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest120""")
+    sql("""update iud.dest120 d  set (c3, c5 ) = (select s.c33 ,s.c55  from iud.source2 s where d.c1 = s.c11)""").show()
+    checkAnswer(
+      sql("""select c3,c5 from iud.dest120 """),
+      Seq(Row("MGM","Disco"),Row("RGK","Music"),Row("cc","ccc"),Row("dd","ddd"),Row("ee","eee"))
+    )
+    sql("""drop table iud.dest120""")
+  }
+
+  test("update carbon table[using destination table where and exist]") {
+    sql("""drop table if exists iud.dest130""")
+    sql("""create table iud.dest130 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest130""")
+    sql("""update iud.dest130 dd  set (c2, c5 ) = (c2 + 1, "xyx")  where dd.c1 = 'a'""").show()
+    checkAnswer(
+      sql("""select c2,c5 from iud.dest130 where c1 = 'a' """),
+      Seq(Row(2,"xyx"))
+    )
+    sql("""drop table iud.dest130""")
+  }
+
+  test("update carbon table[using destination table (concat) where and exist]") {
+    sql("""drop table if exists iud.dest140""")
+    sql("""create table iud.dest140 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest140""")
+    sql("""update iud.dest140 d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"))  where d.c1 = 'a'""").show()
+    checkAnswer(
+      sql("""select c2,c5 from iud.dest140 where c1 = 'a'"""),
+      Seq(Row(2,"aaaz"))
+    )
+    sql("""drop table iud.dest140""")
+  }
+
+  test("update carbon table[using destination table (concat) with  where") {
+    sql("""drop table if exists iud.dest150""")
+    sql("""create table iud.dest150 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest150""")
+    sql("""update iud.dest150 d set (c5) = (concat(c5 , "z"))  where d.c1 = 'b'""").show()
+    checkAnswer(
+      sql("""select c5 from iud.dest150 where c1 = 'b' """),
+      Seq(Row("bbbz"))
+    )
+    sql("""drop table iud.dest150""")
+  }
+
+  test("update table with data for datatype mismatch with column ") {
+    sql("""update iud.update_01 set (imei) = ('skt') where level = 'aaa'""")
+    checkAnswer(
+      sql("""select * from iud.update_01 where imei = 'skt'"""),
+      Seq()
+    )
+  }
+
+  test("update carbon table-error[more columns in source table not allowed") {
+    val exception = intercept[Exception] {
+      sql("""update iud.dest d set (c2, c5 ) = (c2 + 1, concat(c5 , "z"), "abc")""").show()
+    }
+    assertResult("Number of source and destination columns are not matching")(exception.getMessage)
+  }
+
+  test("update carbon table-error[no set columns") {
+    intercept[Exception] {
+      sql("""update iud.dest d set () = ()""").show()
+    }
+  }
+
+  test("update carbon table-error[no set columns with updated column") {
+    intercept[Exception] {
+      sql("""update iud.dest d set  = (c1+1)""").show()
+    }
+  }
+  test("update carbon table-error[one set column with two updated column") {
+    intercept[Exception] {
+      sql("""update iud.dest  set c2 = (c2 + 1, concat(c5 , "z") )""").show()
+    }
+  }
+
+  test("""update carbon [special characters  in value- test parsing logic ]""") {
+    sql("""drop table if exists iud.dest160""")
+    sql("""create table iud.dest160 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest160""")
+    sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show()
+    sql("""update iud.dest160 set(c1) =  ('abd$asjdh$adasj$l;sdf$*)$*)(&^')""").show()
+    sql("""update iud.dest160 set(c1) =("\\")""").show()
+    sql("""update iud.dest160 set(c1) = ("ab\')$*)(&^)")""").show()
+    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'a\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
+    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
+    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
+    sql("""update iud.dest160 d set (c3,c5)      =     (select s.c33,'a\\a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
+    sql("""update iud.dest160 d set (c3,c5) =(select s.c33,'a\'a\\' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
+    sql("""update iud.dest160 d set (c3,c5)=(select s.c33,'\\a\'a\"' from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
+    sql("""drop table iud.dest160""")
+  }
+
+  test("""update carbon [sub query, between and existing in outer condition.(Customer query ) ]""") {
+    sql("""drop table if exists iud.dest170""")
+    sql("""create table iud.dest170 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest170""")
+    sql("""update iud.dest170 d set (c3)=(select s.c33 from iud.source2 s where d.c1 = s.c11 and d.c2 = s.c22) where  d.c2 between 1 and 3""").show()
+    checkAnswer(
+      sql("""select c3 from  iud.dest170 as d where d.c2 between 1 and 3"""),
+      Seq(Row("MGM"), Row("RGK"), Row("cc"))
+    )
+    sql("""drop table iud.dest170""")
+  }
+
+  test("""update carbon [self join select query ]""") {
+    sql("""drop table if exists iud.dest171""")
+    sql("""create table iud.dest171 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest171""")
+    sql("""update iud.dest171 d set (c3)=(select concat(s.c3 , "z") from iud.dest171 s where d.c2 = s.c2)""").show
+    sql("""drop table if exists iud.dest172""")
+    sql("""create table iud.dest172 (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest172""")
+    sql("""update iud.dest172 d set (c3)=( concat(c3 , "z"))""").show
+    checkAnswer(
+      sql("""select c3 from  iud.dest171"""),
+      sql("""select c3 from  iud.dest172""")
+    )
+    sql("""drop table iud.dest171""")
+    sql("""drop table iud.dest172""")
+  }
+
+  test("update carbon table-error[closing bracket missed") {
+    intercept[Exception] {
+      sql("""update iud.dest d set (c2) = (194""").show()
+    }
+  }
+
+  test("update carbon table-error[starting bracket missed") {
+    intercept[Exception] {
+      sql("""update iud.dest d set (c2) = 194)""").show()
+    }
+  }
+
+  test("update carbon table-error[missing starting and closing bracket") {
+    intercept[Exception] {
+      sql("""update iud.dest d set (c2) = 194""").show()
+    }
+  }
+
+  test("test create table with column name as tupleID"){
+    intercept[Exception] {
+      sql("CREATE table carbontable (empno int, tupleID String, " +
+          "designation String, doj Timestamp, workgroupcategory int, " +
+          "workgroupcategoryname String, deptno int, deptname String, projectcode int, " +
+          "projectjoindate Timestamp, projectenddate Timestamp, attendance int, " +
+          "utilization int,salary int) STORED BY 'org.apache.carbondata.format' " +
+          "TBLPROPERTIES('DICTIONARY_INCLUDE'='empno,workgroupcategory,deptno,projectcode'," +
+          "'DICTIONARY_EXCLUDE'='empname')")
+    }
+  }
+
+  test("test show segment after updating data : JIRA-1411,JIRA-1414") {
+    sql("""drop table if exists iud.show_segment""").show
+    sql("""create table iud.show_segment (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.show_segment""")
+    val before_update = sql("""show segments for table iud.show_segment""").toDF()
+    sql("""update iud.show_segment d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
+    val after_update = sql("""show segments for table iud.show_segment""").toDF()
+    checkAnswer(
+      before_update,
+      after_update
+    )
+    sql("""drop table if exists iud.show_segment""").show
+  }
+
+  test("Failure of update operation due to bad record with proper error message") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
+      val errorMessage = intercept[Exception] {
+        sql("drop table if exists update_with_bad_record")
+        sql("create table update_with_bad_record(item int, name String) stored by 'carbondata'")
+        sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/IUD/bad_record.csv' into table " +
+            s"update_with_bad_record")
+        sql("update update_with_bad_record set (item)=(3.45)").show()
+        sql("drop table if exists update_with_bad_record")
+      }
+      assert(errorMessage.getMessage.contains("Data load failed due to bad record"))
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
+    }
+  }
+
+  test("More records after update operation ") {
+    sql("DROP TABLE IF EXISTS default.carbon1")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 36000)
+      .map(x => (x+"a", "b", x))
+      .toDF("c1", "c2", "c3")
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon1")
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000)))
+
+    sql("update default.carbon1 set (c1)=('test123') where c1='9999a'").show()
+
+    checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000)))
+
+    checkAnswer(sql("select * from default.carbon1 where c1 = 'test123'"), Row("test123","b",9999))
+
+    sql("DROP TABLE IF EXISTS default.carbon1")
+  }
 
   test("""CARBONDATA-1445 carbon.update.persist.enable=false it will fail to update data""") {
     CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb771f5e/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 7790f59..4d5d39a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -35,10 +35,10 @@ import org.apache.spark.sql.types._
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
-import org.apache.carbondata.core.locks.ZookeeperInit
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.reader.ThriftReader
@@ -383,6 +383,8 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sqlContext)
       // discard cached table info in cachedDataSourceTables
       sqlContext.catalog.refreshTable(tableIdentifier)
+      DataMapStoreManager.getInstance().
+        clearDataMap(AbsoluteTableIdentifier.from(storePath, dbName, tableName))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb771f5e/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3d67a79..5335fe2 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -41,10 +41,11 @@ import org.apache.spark.util.SparkUtil
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -438,6 +439,10 @@ object CarbonDataRDDFactory {
       threadArray.foreach {
         thread => thread.join()
       }
+      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      refresher.refreshSegments(validSegments.asJava)
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception when split partition: ${ e.getMessage }")
@@ -479,6 +484,10 @@ object CarbonDataRDDFactory {
       for (thread <- threadArray) {
         thread.join()
       }
+      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
+        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      refresher.refreshSegments(validSegments.asJava)
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb771f5e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 4797884..75ad4ae 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -426,7 +426,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       // discard cached table info in cachedDataSourceTables
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-      DataMapStoreManager.getInstance().clearDataMap(identifier, "blocklet")
+      DataMapStoreManager.getInstance().clearDataMap(identifier)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb771f5e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index eb80b82..76241a6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -83,7 +83,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-    DataMapStoreManager.getInstance().clearDataMap(identifier, "blocklet")
+    DataMapStoreManager.getInstance().clearDataMap(identifier)
   }
 
   override def checkSchemasModifiedTimeAndReloadTables(storePath: String) {