Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/28 14:21:39 UTC

carbondata git commit: [CARBONDATA-2549] Bloom remove guava cache and use CarbonCache

Repository: carbondata
Updated Branches:
  refs/heads/master a0350e100 -> 047c502b2


[CARBONDATA-2549] Bloom remove guava cache and use CarbonCache

Currently the bloom cache is implemented using a guava cache, while carbon has its own LRU cache interfaces through which the complete system controls caching centrally instead of feature by feature. So replace the guava cache with the carbon LRU cache.

This closes #2327
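
For orientation, a hedged sketch (not part of the commit; checked-exception handling elided) of how a feature now obtains its cache, using the names that appear in the diff below:

  import org.apache.carbondata.core.cache.Cache;
  import org.apache.carbondata.core.cache.CacheProvider;
  import org.apache.carbondata.core.cache.CacheType;

  // Each feature registers a named cache; CacheProvider instantiates the given
  // Cache implementation reflectively and backs it with the shared
  // CarbonLRUCache, so one system-wide LRU budget governs eviction instead of
  // a private, feature-local guava cache.
  Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache =
      CacheProvider.getInstance()
          .createCache(new CacheType("bloom_cache"), BloomDataMapCache.class.getName());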


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/047c502b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/047c502b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/047c502b

Branch: refs/heads/master
Commit: 047c502b270ddca45c3b00c2d685899eec075a70
Parents: a0350e1
Author: ravipesala <ra...@gmail.com>
Authored: Mon May 21 10:08:38 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Jun 28 22:21:22 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/cache/CacheProvider.java    |  31 ++-
 .../apache/carbondata/core/cache/CacheType.java |   2 +-
 datamap/bloom/pom.xml                           |  14 --
 .../datamap/bloom/BloomCacheKeyValue.java       | 108 +++++++++
 .../datamap/bloom/BloomCoarseGrainDataMap.java  |  42 ++--
 .../bloom/BloomCoarseGrainDataMapFactory.java   |  55 ++++-
 .../carbondata/datamap/bloom/BloomDMModel.java  |  73 -------
 .../datamap/bloom/BloomDataMapCache.java        | 219 +++++--------------
 .../datamap/bloom/BloomDataMapModel.java        |  45 ++++
 .../datamap/bloom/BloomDataMapWriter.java       |   6 +-
 .../hadoop/util/bloom/CarbonBloomFilter.java    |  63 +++++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   5 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |   6 +-
 13 files changed, 372 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
index d29c087..0ee4f25 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.cache;
 
+import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -92,7 +93,7 @@ public class CacheProvider {
       synchronized (lock) {
         if (!dictionaryCacheAlreadyExists(cacheType)) {
           if (null == carbonLRUCache) {
-            createLRULevelCacheInstance(cacheType);
+            createLRULevelCacheInstance();
           }
           createDictionaryCacheForGivenType(cacheType);
         }
@@ -102,6 +103,31 @@ public class CacheProvider {
   }
 
   /**
+   * This method will check if a cache already exists for the given cache type and
+   * create and register one if it is not present in the map
+   */
+  public <K, V> Cache<K, V> createCache(CacheType cacheType, String cacheClassName)
+      throws Exception {
+    //check if lru cache is null, if null create one
+    //check if cache is null for given cache type, if null create one
+    if (!dictionaryCacheAlreadyExists(cacheType)) {
+      synchronized (lock) {
+        if (!dictionaryCacheAlreadyExists(cacheType)) {
+          if (null == carbonLRUCache) {
+            createLRULevelCacheInstance();
+          }
+          Class<?> clazz = Class.forName(cacheClassName);
+          Constructor<?> constructor = clazz.getConstructors()[0];
+          constructor.setAccessible(true);
+          Cache cacheObject = (Cache) constructor.newInstance(carbonLRUCache);
+          cacheTypeToCacheMap.put(cacheType, cacheObject);
+        }
+      }
+    }
+    return cacheTypeToCacheMap.get(cacheType);
+  }
+
+  /**
    * This method will create the cache for given cache type
    *
    * @param cacheType       type of cache
@@ -126,9 +152,8 @@ public class CacheProvider {
   /**
    * This method will create the lru cache instance based on the given type
    *
-   * @param cacheType
    */
-  private void createLRULevelCacheInstance(CacheType cacheType) {
+  private void createLRULevelCacheInstance() {
     boolean isDriver = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false"));
     if (isDriver) {

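Worth noting: because createCache() picks getConstructors()[0] and passes the shared CarbonLRUCache as the only argument, a pluggable cache class is implicitly required to expose a single public constructor of this shape (MyFeatureCache is a hypothetical name, for illustration only):

  // Hypothetical: the one public constructor that createCache() discovers
  // reflectively and invokes with the process-wide CarbonLRUCache.
  public MyFeatureCache(CarbonLRUCache lruCache) {
    this.lruCache = lruCache;
  }

BloomDataMapCache below follows exactly this shape.
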
http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
index ab51ff2..9cc2320 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/CacheType.java
@@ -69,7 +69,7 @@ public class CacheType<K, V> {
   /**
    * @param cacheName
    */
-  private CacheType(String cacheName) {
+  public CacheType(String cacheName) {
     this.cacheName = cacheName;
   }
 

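With the constructor now public, modules outside carbondata-core can mint their own cache types rather than being limited to the predefined ones; the bloom datamap factory below does exactly this:

  CacheType bloomCacheType = new CacheType("bloom_cache");
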
http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/datamap/bloom/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/bloom/pom.xml b/datamap/bloom/pom.xml
index f13d477..f9ebae2 100644
--- a/datamap/bloom/pom.xml
+++ b/datamap/bloom/pom.xml
@@ -22,20 +22,6 @@
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-core</artifactId>
       <version>${project.version}</version>
-      <exclusions>
-        <!--ignore guava 11.0.2 introduced by hadoop-->
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <!--note: guava 14.0.1 is omitted during assembly.
-    The compile scope here is for building and running test-->
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>14.0.1</version>
     </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
new file mode 100644
index 0000000..29e94d8
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCacheKeyValue.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.core.cache.Cacheable;
+
+import org.apache.hadoop.util.bloom.CarbonBloomFilter;
+
+/**
+ * Key and value classes for the bloom filter entries kept in cache.
+ */
+public class BloomCacheKeyValue {
+
+  public static class CacheKey implements Serializable {
+
+    private static final long serialVersionUID = -1478238084352505372L;
+    private String shardPath;
+    private String indexColumn;
+
+    CacheKey(String shardPath, String indexColumn) {
+      this.shardPath = shardPath;
+      this.indexColumn = indexColumn;
+    }
+
+    public String getShardPath() {
+      return shardPath;
+    }
+
+    public String getIndexColumn() {
+      return indexColumn;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("CacheKey{");
+      sb.append("shardPath='").append(shardPath).append('\'');
+      sb.append(", indexColumn='").append(indexColumn).append('\'');
+      sb.append('}');
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof CacheKey)) return false;
+      CacheKey cacheKey = (CacheKey) o;
+      return Objects.equals(shardPath, cacheKey.shardPath)
+          && Objects.equals(indexColumn, cacheKey.indexColumn);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(shardPath, indexColumn);
+    }
+  }
+
+  public static class CacheValue implements Cacheable {
+
+    private List<CarbonBloomFilter> bloomFilters;
+
+    private int size;
+
+    public CacheValue(List<CarbonBloomFilter> bloomFilters) {
+      this.bloomFilters = bloomFilters;
+      for (CarbonBloomFilter bloomFilter : bloomFilters) {
+        size += bloomFilter.getSize();
+      }
+    }
+
+    @Override
+    public long getFileTimeStamp() {
+      return 0;
+    }
+
+    @Override
+    public int getAccessCount() {
+      return 0;
+    }
+
+    @Override
+    public long getMemorySize() {
+      return size;
+    }
+
+    public List<CarbonBloomFilter> getBloomFilters() {
+      return bloomFilters;
+    }
+  }
+
+}

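A small sketch of why CacheKey overrides equals/hashCode (the shard path is invented; the constructor is package-private, so this runs from within the bloom package):

  // Equal (shardPath, indexColumn) pairs produce equal keys and identical
  // toString() forms -- the latter is what CarbonLRUCache indexes on -- so
  // every datamap instance probing the same shard and column shares one entry.
  BloomCacheKeyValue.CacheKey a = new BloomCacheKeyValue.CacheKey("/tbl/dm/shard0", "name");
  BloomCacheKeyValue.CacheKey b = new BloomCacheKeyValue.CacheKey("/tbl/dm/shard0", "name");
  assert a.equals(b) && a.hashCode() == b.hashCode();
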
http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index ed03256..ad18704 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -43,8 +44,8 @@ import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.CarbonUtil;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.bloom.CarbonBloomFilter;
 import org.apache.hadoop.util.bloom.Key;
 
 /**
@@ -58,34 +59,24 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
       LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
   public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
   private Set<String> indexedColumn;
-  private List<BloomDMModel> bloomIndexList;
+  private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
   private String shardName;
-  private BloomDataMapCache bloomDataMapCache;
   private Path indexPath;
 
   @Override
   public void init(DataMapModel dataMapModel) throws IOException {
     this.indexPath = FileFactory.getPath(dataMapModel.getFilePath());
     this.shardName = indexPath.getName();
-    FileSystem fs = FileFactory.getFileSystem(indexPath);
-    if (!fs.exists(indexPath)) {
-      throw new IOException(
-          String.format("Path %s for Bloom index dataMap does not exist", indexPath));
+    if (dataMapModel instanceof BloomDataMapModel) {
+      BloomDataMapModel model = (BloomDataMapModel) dataMapModel;
+      this.cache = model.getCache();
+      this.indexedColumn = model.getIndexedColumnNames();
     }
-    if (!fs.isDirectory(indexPath)) {
-      throw new IOException(
-          String.format("Path %s for Bloom index dataMap must be a directory", indexPath));
-    }
-    this.bloomDataMapCache = BloomDataMapCache.getInstance();
-  }
-
-  public void setIndexedColumn(Set<String> indexedColumn) {
-    this.indexedColumn = indexedColumn;
   }
 
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions) {
+      List<PartitionSpec> partitions) throws IOException {
     Set<Blocklet> hitBlocklets = new HashSet<>();
     if (filterExp == null) {
       // null is different from empty here. Empty means after pruning, no blocklet need to scan.
@@ -95,20 +86,21 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
     List<BloomQueryModel> bloomQueryModels = getQueryValue(filterExp.getFilterExpression());
     for (BloomQueryModel bloomQueryModel : bloomQueryModels) {
       LOGGER.debug("prune blocklet for query: " + bloomQueryModel);
-      BloomDataMapCache.CacheKey cacheKey = new BloomDataMapCache.CacheKey(
+      BloomCacheKeyValue.CacheKey cacheKey = new BloomCacheKeyValue.CacheKey(
           this.indexPath.toString(), bloomQueryModel.columnName);
-      List<BloomDMModel> bloomDMModels = this.bloomDataMapCache.getBloomDMModelByKey(cacheKey);
-      for (BloomDMModel bloomDMModel : bloomDMModels) {
-        boolean scanRequired = bloomDMModel.getBloomFilter().membershipTest(new Key(
+      BloomCacheKeyValue.CacheValue cacheValue = cache.get(cacheKey);
+      List<CarbonBloomFilter> bloomIndexList = cacheValue.getBloomFilters();
+      for (CarbonBloomFilter bloomFilter : bloomIndexList) {
+        boolean scanRequired = bloomFilter.membershipTest(new Key(
             convertValueToBytes(bloomQueryModel.dataType, bloomQueryModel.filterValue)));
         if (scanRequired) {
           LOGGER.debug(String.format("BloomCoarseGrainDataMap: Need to scan -> blocklet#%s",
-              String.valueOf(bloomDMModel.getBlockletNo())));
-          Blocklet blocklet = new Blocklet(shardName, String.valueOf(bloomDMModel.getBlockletNo()));
+              String.valueOf(bloomFilter.getBlockletNo())));
+          Blocklet blocklet = new Blocklet(shardName, String.valueOf(bloomFilter.getBlockletNo()));
           hitBlocklets.add(blocklet);
         } else {
           LOGGER.debug(String.format("BloomCoarseGrainDataMap: Skip scan -> blocklet#%s",
-              String.valueOf(bloomDMModel.getBlockletNo())));
+              String.valueOf(bloomFilter.getBlockletNo())));
         }
       }
     }
@@ -173,8 +165,6 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
 
   @Override
   public void clear() {
-    bloomIndexList.clear();
-    bloomIndexList = null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index cda49b3..b174b2a 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -19,14 +19,20 @@ package org.apache.carbondata.datamap.bloom;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
@@ -36,7 +42,6 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -93,6 +98,9 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
   private int bloomFilterSize;
   private double bloomFilterFpp;
   private boolean bloomCompress;
+  private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
+  // segmentId -> list of index file
+  private Map<String, Set<String>> segmentMap = new HashMap<>();
 
   public BloomCoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
       throws MalformedDataMapCommandException {
@@ -112,6 +120,13 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     this.dataMapMeta = new DataMapMeta(this.dataMapName, indexedColumns, optimizedOperations);
     LOGGER.info(String.format("DataMap %s works for %s with bloom size %d",
         this.dataMapName, this.dataMapMeta, this.bloomFilterSize));
+    try {
+      this.cache = CacheProvider.getInstance()
+          .createCache(new CacheType("bloom_cache"), BloomDataMapCache.class.getName());
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new MalformedDataMapCommandException(e.getMessage());
+    }
   }
 
   /**
@@ -211,13 +226,21 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
   public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
     List<CoarseGrainDataMap> dataMaps = new ArrayList<CoarseGrainDataMap>(1);
     try {
-      String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
-          getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
-      CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
-      for (CarbonFile carbonFile : carbonFiles) {
+      Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
+      if (shardPaths == null) {
+        String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
+            getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
+        CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
+        shardPaths = new HashSet<>();
+        for (CarbonFile carbonFile : carbonFiles) {
+          shardPaths.add(carbonFile.getAbsolutePath());
+        }
+        segmentMap.put(segment.getSegmentNo(), shardPaths);
+      }
+      for (String shard : shardPaths) {
         BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
-        bloomDM.init(new DataMapModel(carbonFile.getAbsolutePath()));
-        bloomDM.setIndexedColumn(new HashSet<String>(dataMapMeta.getIndexedColumnNames()));
+        bloomDM.init(new BloomDataMapModel(shard, cache,
+            new HashSet<>(dataMapMeta.getIndexedColumnNames())));
         dataMaps.add(bloomDM);
       }
     } catch (Exception e) {
@@ -232,9 +255,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
     BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap();
     String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath();
-    bloomCoarseGrainDataMap.init(new DataMapModel(indexPath));
-    bloomCoarseGrainDataMap.setIndexedColumn(
-        new HashSet<String>(dataMapMeta.getIndexedColumnNames()));
+    bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache,
+        new HashSet<>(dataMapMeta.getIndexedColumnNames())));
     coarseGrainDataMaps.add(bloomCoarseGrainDataMap);
     return coarseGrainDataMaps;
   }
@@ -306,12 +328,21 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
 
   @Override
   public void clear(Segment segment) {
-
+    Set<String> shards = segmentMap.remove(segment.getSegmentNo());
+    if (shards != null) {
+      for (String shard : shards) {
+        for (CarbonColumn carbonColumn : dataMapMeta.getIndexedColumns()) {
+          cache.invalidate(new BloomCacheKeyValue.CacheKey(shard, carbonColumn.getColName()));
+        }
+      }
+    }
   }
 
   @Override
   public void clear() {
-
+    for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
+      clear(new Segment(segmentId, null, null));
+    }
   }
 
   @Override

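The clear(segment) logic above fans one segment out to a cache key per (shard, indexed column) pair; a worked example (segment id, shard and column names invented; the null check from the real code is elided):

  // A segment with shards {shard0, shard1} and indexed columns {name, city}
  // yields four invalidations: (shard0,name), (shard0,city), (shard1,name),
  // (shard1,city) -- each removed from the shared LRU by its key's toString().
  for (String shard : segmentMap.remove("2")) {
    for (CarbonColumn column : dataMapMeta.getIndexedColumns()) {
      cache.invalidate(new BloomCacheKeyValue.CacheKey(shard, column.getColName()));
    }
  }
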
http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
deleted file mode 100644
index 7317c70..0000000
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.datamap.bloom;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.bloom.CarbonBloomFilter;
-
-/**
- * This class holds a bloom filter for one blocklet
- */
-@InterfaceAudience.Internal
-public class BloomDMModel implements Writable {
-  private int blockletNo;
-  private CarbonBloomFilter bloomFilter;
-
-  public BloomDMModel() {
-  }
-
-  public BloomDMModel(int blockletNo, CarbonBloomFilter bloomFilter) {
-    this.blockletNo = blockletNo;
-    this.bloomFilter = bloomFilter;
-  }
-
-  public int getBlockletNo() {
-    return blockletNo;
-  }
-
-  public CarbonBloomFilter getBloomFilter() {
-    return bloomFilter;
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder("BloomDMModel{");
-    sb.append(", blockletNo=").append(blockletNo);
-    sb.append(", bloomFilter=").append(bloomFilter);
-    sb.append('}');
-    return sb.toString();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(blockletNo);
-    bloomFilter.write(out);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    blockletNo = in.readInt();
-    bloomFilter = new CarbonBloomFilter();
-    bloomFilter.readFields(in);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
index 3de77ad..c1e6251 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
@@ -17,139 +17,99 @@
 package org.apache.carbondata.datamap.bloom;
 
 import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CarbonLRUCache;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.util.CarbonUtil;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.CacheStats;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import org.apache.hadoop.util.bloom.CarbonBloomFilter;
 
 /**
  * This class is used to add cache for bloomfilter datamap to accelerate query through it.
- * The cache is implemented using guava cache and is a singleton which will be shared by all the
- * bloomfilter datamaps.
+ * The cache is implemented using carbon lru cache.
  * As for the cache, the key is a bloomindex file for a shard and the value is the bloomfilters
  * for the blocklets in this shard.
- * The size of cache can be configurable through CarbonProperties and the cache will be expired if
- * no one access it in the past 2 hours.
  */
 @InterfaceAudience.Internal
-public class BloomDataMapCache implements Serializable {
-  private static final LogService LOGGER = LogServiceFactory.getLogService(
-      BloomDataMapCache.class.getName());
-  private static final long serialVersionUID = 20160822L;
-  private static final int DEFAULT_CACHE_EXPIRED_HOURS = 2;
-  private LoadingCache<CacheKey, List<BloomDMModel>> bloomDMCache = null;
+public class BloomDataMapCache
+    implements Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BloomDataMapCache.class.getName());
 
-  private BloomDataMapCache() {
-    RemovalListener<CacheKey, List<BloomDMModel>> listener =
-        new RemovalListener<CacheKey, List<BloomDMModel>>() {
-      @Override
-      public void onRemoval(RemovalNotification<CacheKey, List<BloomDMModel>> notification) {
-        LOGGER.info(
-            String.format("Remove bloom datamap entry %s from cache due to %s",
-                notification.getKey(), notification.getCause()));
-      }
-    };
-    CacheLoader<CacheKey, List<BloomDMModel>> cacheLoader =
-        new CacheLoader<CacheKey, List<BloomDMModel>>() {
-      @Override
-      public List<BloomDMModel> load(CacheKey key) throws Exception {
-        LOGGER.info(String.format("Load bloom datamap entry %s to cache", key));
-        return loadBloomDataMapModel(key);
-      }
-    };
+  /**
+   * CarbonLRU cache
+   */
+  private CarbonLRUCache lruCache;
 
-    int cacheSizeInBytes = validateAndGetCacheSize()
-        * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
-        * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
-    this.bloomDMCache = CacheBuilder.newBuilder()
-        .recordStats()
-        .maximumSize(cacheSizeInBytes)
-        .expireAfterAccess(DEFAULT_CACHE_EXPIRED_HOURS, TimeUnit.HOURS)
-        .removalListener(listener)
-        .build(cacheLoader);
+  public BloomDataMapCache(CarbonLRUCache lruCache) {
+    this.lruCache = lruCache;
   }
 
-  private static class SingletonHolder {
-    private static final BloomDataMapCache INSTANCE = new BloomDataMapCache();
+  @Override
+  public BloomCacheKeyValue.CacheValue get(BloomCacheKeyValue.CacheKey key)
+      throws IOException {
+    BloomCacheKeyValue.CacheValue cacheValue = getIfPresent(key);
+    if (cacheValue == null) {
+      cacheValue = loadBloomDataMapModel(key);
+      lruCache.put(key.toString(), cacheValue, cacheValue.getMemorySize());
+    }
+    return cacheValue;
   }
 
-  /**
-   * get instance
-   */
-  public static BloomDataMapCache getInstance() {
-    return SingletonHolder.INSTANCE;
+  @Override
+  public List<BloomCacheKeyValue.CacheValue> getAll(List<BloomCacheKeyValue.CacheKey> keys)
+      throws IOException {
+    List<BloomCacheKeyValue.CacheValue> cacheValues = new ArrayList<>();
+    for (BloomCacheKeyValue.CacheKey key : keys) {
+      BloomCacheKeyValue.CacheValue cacheValue = get(key);
+      cacheValues.add(cacheValue);
+    }
+    return cacheValues;
   }
 
-  /**
-   * for resolve from serialized
-   */
-  protected Object readResolve() {
-    return getInstance();
+  @Override
+  public BloomCacheKeyValue.CacheValue getIfPresent(BloomCacheKeyValue.CacheKey key) {
+    return (BloomCacheKeyValue.CacheValue) lruCache.get(key.toString());
   }
 
-  private int validateAndGetCacheSize() {
-    String cacheSizeStr = CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE,
-        CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL);
-    int cacheSize;
-    try {
-      cacheSize = Integer.parseInt(cacheSizeStr);
-      if (cacheSize <= 0) {
-        throw new NumberFormatException("Value should be greater than 0: " + cacheSize);
-      }
-    } catch (NumberFormatException ex) {
-      LOGGER.error(String.format(
-          "The value '%s' for '%s' is invalid, it must be an Integer that greater than 0."
-              + " Use default value '%s' instead.", cacheSizeStr,
-          CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE,
-          CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL));
-      cacheSize = Integer.parseInt(
-          CarbonCommonConstants.CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL);
-    }
-    return cacheSize;
+  @Override
+  public void invalidate(BloomCacheKeyValue.CacheKey key) {
+    lruCache.remove(key.toString());
+  }
+
+  @Override
+  public void put(BloomCacheKeyValue.CacheKey key, BloomCacheKeyValue.CacheValue value)
+      throws IOException, MemoryException {
+    // No impl required.
   }
 
   /**
    * load datamap from bloomindex file
    */
-  private List<BloomDMModel> loadBloomDataMapModel(CacheKey cacheKey) {
+  private BloomCacheKeyValue.CacheValue loadBloomDataMapModel(
+      BloomCacheKeyValue.CacheKey cacheKey) {
     DataInputStream dataInStream = null;
-    List<BloomDMModel> bloomDMModels = new ArrayList<BloomDMModel>();
+    List<CarbonBloomFilter> bloomFilters = new ArrayList<>();
     try {
       String indexFile = getIndexFileFromCacheKey(cacheKey);
       dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile));
-      try {
-        while (dataInStream.available() > 0) {
-          BloomDMModel model = new BloomDMModel();
-          model.readFields(dataInStream);
-          bloomDMModels.add(model);
-        }
-      } catch (EOFException e) {
-        LOGGER.info(String.format("Read %d bloom indices from %s",
-            bloomDMModels.size(), indexFile));
+      while (dataInStream.available() > 0) {
+        CarbonBloomFilter bloomFilter = new CarbonBloomFilter();
+        bloomFilter.readFields(dataInStream);
+        bloomFilters.add(bloomFilter);
       }
-      this.bloomDMCache.put(cacheKey, bloomDMModels);
-      return bloomDMModels;
+      LOGGER.info(String.format("Read %d bloom indices from %s", bloomFilters.size(), indexFile));
+
+      return new BloomCacheKeyValue.CacheValue(bloomFilters);
     } catch (IOException e) {
-      clear(cacheKey);
       LOGGER.error(e, "Error occurs while reading bloom index");
       throw new RuntimeException("Error occurs while reading bloom index", e);
     } finally {
@@ -160,71 +120,12 @@ public class BloomDataMapCache implements Serializable {
   /**
    * get bloom index file name from cachekey
    */
-  private String getIndexFileFromCacheKey(CacheKey cacheKey) {
-    return BloomCoarseGrainDataMap.getBloomIndexFile(cacheKey.shardPath, cacheKey.indexColumn);
-  }
-
-  /**
-   * get bloom datamap from cache
-   */
-  public List<BloomDMModel> getBloomDMModelByKey(CacheKey cacheKey) {
-    return this.bloomDMCache.getUnchecked(cacheKey);
-  }
-
-  /**
-   * get cache status
-   */
-  private String getCacheStatus() {
-    StringBuilder sb = new StringBuilder();
-    CacheStats stats = this.bloomDMCache.stats();
-    sb.append("hitCount: ").append(stats.hitCount()).append(System.lineSeparator())
-        .append("hitRate: ").append(stats.hitCount()).append(System.lineSeparator())
-        .append("loadCount: ").append(stats.loadCount()).append(System.lineSeparator())
-        .append("averageLoadPenalty: ").append(stats.averageLoadPenalty())
-        .append(System.lineSeparator())
-        .append("evictionCount: ").append(stats.evictionCount());
-    return sb.toString();
-  }
-
-  /**
-   * clear this cache
-   */
-  private void clear(CacheKey cacheKey) {
-    LOGGER.info(String.format("Current meta cache statistic: %s", getCacheStatus()));
-    LOGGER.info("Trigger invalid cache for bloom datamap, key is " + cacheKey);
-    this.bloomDMCache.invalidate(cacheKey);
+  private String getIndexFileFromCacheKey(BloomCacheKeyValue.CacheKey cacheKey) {
+    return BloomCoarseGrainDataMap
+        .getBloomIndexFile(cacheKey.getShardPath(), cacheKey.getIndexColumn());
   }
 
-  public static class CacheKey {
-    private String shardPath;
-    private String indexColumn;
-
-    CacheKey(String shardPath, String indexColumn) {
-      this.shardPath = shardPath;
-      this.indexColumn = indexColumn;
-    }
-
-    @Override
-    public String toString() {
-      final StringBuilder sb = new StringBuilder("CacheKey{");
-      sb.append("shardPath='").append(shardPath).append('\'');
-      sb.append(", indexColumn='").append(indexColumn).append('\'');
-      sb.append('}');
-      return sb.toString();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (!(o instanceof CacheKey)) return false;
-      CacheKey cacheKey = (CacheKey) o;
-      return Objects.equals(shardPath, cacheKey.shardPath)
-          && Objects.equals(indexColumn, cacheKey.indexColumn);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(shardPath, indexColumn);
-    }
+  @Override
+  public void clearAccessCount(List<BloomCacheKeyValue.CacheKey> keys) {
   }
 }

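End to end, the read path now looks like this (a hedged sketch; the path is invented, `cache` is the instance created by the factory, and IOException handling is elided):

  import org.apache.hadoop.util.bloom.CarbonBloomFilter;
  import org.apache.hadoop.util.bloom.Key;

  // get() is read-through: a hit returns the LRU entry directly; a miss parses
  // the shard's .bloomindex file and charges getMemorySize() to CarbonLRUCache.
  BloomCacheKeyValue.CacheValue value =
      cache.get(new BloomCacheKeyValue.CacheKey("/tbl/dm/shard0", "name"));
  for (CarbonBloomFilter filter : value.getBloomFilters()) {
    if (filter.membershipTest(new Key("bob".getBytes()))) {
      // prune() would keep blocklet filter.getBlockletNo() for scanning
    }
  }
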
http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
new file mode 100644
index 0000000..ddee6e5
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.util.Set;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+
+public class BloomDataMapModel extends DataMapModel {
+
+  private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
+
+  private Set<String> indexedColumnNames;
+
+  public BloomDataMapModel(String filePath,
+      Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache,
+      Set<String> indexedColumnNames) {
+    super(filePath);
+    this.cache = cache;
+    this.indexedColumnNames = indexedColumnNames;
+  }
+
+  public Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> getCache() {
+    return cache;
+  }
+
+  public Set<String> getIndexedColumnNames() {
+    return indexedColumnNames;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index 2791a6c..1f960e2 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -177,11 +177,11 @@ public class BloomDataMapWriter extends DataMapWriter {
     List<CarbonColumn> indexColumns = getIndexColumns();
     try {
       for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
-        BloomDMModel model =
-            new BloomDMModel(this.currentBlockletId, indexBloomFilters.get(indexColId));
+        CarbonBloomFilter bloomFilter = indexBloomFilters.get(indexColId);
+        bloomFilter.setBlockletNo(currentBlockletId);
         // only in higher version of guava-bloom-filter, it provides readFrom/writeTo interface.
         // In lower version, we use default java serializer to write bloomfilter.
-        model.write(this.currentDataOutStreams.get(indexColId));
+        bloomFilter.write(this.currentDataOutStreams.get(indexColId));
         this.currentDataOutStreams.get(indexColId).flush();
       }
     } catch (Exception e) {

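Together with the CarbonBloomFilter changes below, this pins down the record layout of a .bloomindex file: one record per blocklet, appended back to back until end of file, which is how the cache loader above reads them with available() > 0:

  // Per-blocklet record, as written by CarbonBloomFilter.write():
  //   int     blockletNo   (new in this commit)
  //   int     nbHash
  //   byte    hashType
  //   int     vectorSize
  //   boolean compress
  //   bits:   int length + byte[] for a plain BitSet, or a serialized
  //           RoaringBitmap when compress is true
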
http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java b/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java
index 7c39cad..c6a62cc 100644
--- a/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java
+++ b/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.util.bloom;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.BitSet;
 
 import org.roaringbitmap.RoaringBitmap;
@@ -33,6 +34,8 @@ public class CarbonBloomFilter extends BloomFilter {
 
   private boolean compress;
 
+  private int blockletNo;
+
   public CarbonBloomFilter() {
   }
 
@@ -68,10 +71,12 @@ public class CarbonBloomFilter extends BloomFilter {
 
   @Override
   public void write(DataOutput out) throws IOException {
+    out.writeInt(blockletNo);
     out.writeInt(this.nbHash);
     out.writeByte(this.hashType);
     out.writeInt(this.vectorSize);
     out.writeBoolean(compress);
+    BitSet bits = getBitSet();
     if (!compress) {
       byte[] bytes = bits.toByteArray();
       out.writeInt(bytes.length);
@@ -90,6 +95,7 @@ public class CarbonBloomFilter extends BloomFilter {
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    this.blockletNo = in.readInt();
     this.nbHash = in.readInt();
     this.hashType = in.readByte();
     this.vectorSize = in.readInt();
@@ -98,11 +104,66 @@ public class CarbonBloomFilter extends BloomFilter {
       int len = in.readInt();
       byte[] bytes = new byte[len];
       in.readFully(bytes);
-      this.bits = BitSet.valueOf(bytes);
+      setBitSet(BitSet.valueOf(bytes));
     } else {
       this.bitmap = new RoaringBitmap();
       bitmap.deserialize(in);
     }
     this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType);
   }
+
+  public int getSize() {
+    int size = 14; // blockletNo (4) + nbHash (4) + hashType (1) + vectorSize (4) + compress (1)
+    if (compress) {
+      size += bitmap.getSizeInBytes();
+    } else {
+      try {
+        size += getBitSet().toLongArray().length * 8;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return size;
+  }
+
+  /**
+   * Get the bitset from the super class using reflection; in some cases Java cannot
+   * access the field if the jars are loaded in separate class loaders.
+   *
+   * @return
+   * @throws IOException
+   */
+  private BitSet getBitSet() throws IOException {
+    try {
+      Field field = BloomFilter.class.getDeclaredField("bits");
+      field.setAccessible(true);
+      return (BitSet)field.get(this);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Set the bitset in the super class using reflection; in some cases Java cannot
+   * access the field if the jars are loaded in separate class loaders.
+   * @param bitSet
+   * @throws IOException
+   */
+  private void setBitSet(BitSet bitSet) throws IOException {
+    try {
+      Field field = BloomFilter.class.getDeclaredField("bits");
+      field.setAccessible(true);
+      field.set(this, bitSet);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void setBlockletNo(int blockletNo) {
+    this.blockletNo = blockletNo;
+  }
+
+  public int getBlockletNo() {
+    return blockletNo;
+  }
 }

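getBitSet/setBitSet above are the standard reflective escape hatch for a private superclass field; the same pattern in isolation (a hypothetical helper, not part of the commit):

  import java.lang.reflect.Field;

  // Reads a private field declared on a superclass of target's class.
  // setAccessible(true) lifts the language-level access check; it can still be
  // refused at runtime (e.g. by a SecurityManager), which is why the methods
  // above wrap every reflective failure in an IOException.
  static Object readSuperField(Object target, Class<?> declaringClass, String name)
      throws ReflectiveOperationException {
    Field field = declaringClass.getDeclaredField(name);
    field.setAccessible(true);
    return field.get(target);
  }

  // Usage: BitSet bits = (BitSet) readSuperField(filter, BloomFilter.class, "bits");
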
http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 3e0a035..bd5260f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -219,9 +219,8 @@ object CarbonEnv {
     val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
       identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
       identifier.table)
-    if (table.isEmpty ||
-        (table.isDefined && carbonEnv.carbonMetastore
-          .checkSchemasModifiedTimeAndReloadTable(identifier))) {
+    if (carbonEnv.carbonMetastore
+          .checkSchemasModifiedTimeAndReloadTable(identifier)) {
       sparkSession.sessionState.catalog.refreshTable(identifier)
       val tablePath = CarbonProperties.getStorePath + File.separator + identifier.database
         .getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase) +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/047c502b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 81a6bed..900f54c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -565,12 +565,14 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val (timestampFile, timestampFileType) = getTimestampFileAndType()
     var isRefreshed = false
     if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
-        getLastModifiedTime ==
+      val lastModifiedTime =
+        FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
+      if (!(lastModifiedTime ==
             tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
         metadata.carbonTables = metadata.carbonTables.filterNot(
           table => table.getTableName.equalsIgnoreCase(tableIdentifier.table) &&
         table.getDatabaseName.equalsIgnoreCase(tableIdentifier.database.getOrElse("default")))
+        updateSchemasUpdatedTime(lastModifiedTime)
         isRefreshed = true
       }
     }