Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/18 08:18:45 UTC

[2/2] carbondata git commit: [CARBONDATA-2484][LUCENE] Refactor distributable code and launch a job to clear the datamap from executors (clears segmentMap and removes datamap from cache)

[CARBONDATA-2484][LUCENE] Refactor distributable code and launch a job to clear the datamap from executors (clears segmentMap and removes datamap from cache)

Problem:
During a query, BlockletDataMapFactory maintains a segmentMap which maps
segmentId -> list of index files, and this map is used while getting the extended blocklet
to check whether the blocklet is present in the index or not.
In the Lucene case, a datamap job is launched and the segmentMap is populated on the executors
during pruning. This map is cleared on the driver when drop table is called, but it is not
cleared on the executors, so a Lucene query fired after the table or datamap is dropped fails.
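
As a rough illustration of the state that goes stale (hypothetical names, not the real
CarbonData classes), each executor JVM holds its own copy of a map like the one below,
so clearing it on the driver alone is not enough:

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Minimal sketch of the executor-side state described above; names are
    // illustrative. The real mapping lives in BlockletDataMapFactory.
    public class SegmentMapSketch {
      // segmentId -> index files for that segment, populated during pruning
      private final Map<String, List<String>> segmentMap = new ConcurrentHashMap<>();

      public void put(String segmentId, List<String> indexFiles) {
        segmentMap.put(segmentId, indexFiles);
      }

      // Before this fix, drop table never triggered this on the executors,
      // so a later query still consulted the dropped table's entries.
      public void clear() {
        segmentMap.clear();
      }
    }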

Solution:
When drop table or drop datamap is called, a job is launched that clears the datamaps from the
segmentMap and the cache on the executors, and only then clears them on the driver.
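
In terms of the classes touched below, the new drop path looks roughly like this (a sketch of
the flow using the patch's class names, not the literal code):

    import java.io.IOException;

    import org.apache.carbondata.core.datamap.DataMapStoreManager;
    import org.apache.carbondata.core.datamap.DataMapUtil;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

    public class DropFlowSketch {
      public static void onDropTable(AbsoluteTableIdentifier identifier) {
        DataMapStoreManager manager = DataMapStoreManager.getInstance();
        CarbonTable carbonTable = manager.getCarbonTable(identifier);
        if (carbonTable != null) {
          try {
            // launches a DistributableDataMapFormat job with isJobToClearDataMaps = true;
            // each executor task clears its segmentMap and evicts the table's
            // datamaps from its cache
            DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
          } catch (IOException e) {
            // the patch logs and ignores this failure so the drop still succeeds
          }
        }
        // only after the executors are clean is the driver-side state cleared
        manager.clearDataMaps(identifier.getCarbonTableIdentifier().getTableUniqueName());
      }
    }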

This PR also refactors the datamap job classes and other common classes.

This closes #2310


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/20180485
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/20180485
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/20180485

Branch: refs/heads/master
Commit: 201804858bf41537389008b03e7bd964cc3760fa
Parents: 2f79e14
Author: akashrn5 <ak...@gmail.com>
Authored: Fri May 11 16:57:46 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri May 18 13:48:32 2018 +0530

----------------------------------------------------------------------
 .../core/datamap/AbstractDataMapJob.java        |  42 +++++
 .../carbondata/core/datamap/DataMapChooser.java |  19 +++
 .../carbondata/core/datamap/DataMapJob.java     |  40 +++++
 .../core/datamap/DataMapStoreManager.java       |  65 +++++++-
 .../carbondata/core/datamap/DataMapUtil.java    | 157 ++++++++++++++++++
 .../datamap/DistributableDataMapFormat.java     | 166 +++++++++++++++++++
 .../carbondata/core/datamap/TableDataMap.java   |   9 +-
 .../datamap/dev/expr/AndDataMapExprWrapper.java |   3 +-
 .../dev/expr/DataMapExprWrapperImpl.java        |   3 +-
 .../core/indexstore/BlockletDetailsFetcher.java |   5 +
 .../blockletindex/BlockletDataMapFactory.java   |   6 +-
 .../core/metadata/schema/table/CarbonTable.java |   5 +
 .../core/util/ObjectSerializationUtil.java      | 116 +++++++++++++
 .../bloom/BloomCoarseGrainDataMapFactory.java   |   8 +
 .../lucene/LuceneDataMapFactoryBase.java        |   9 +
 .../hadoop/api/AbstractDataMapJob.java          |  42 -----
 .../hadoop/api/CarbonInputFormat.java           |  59 +------
 .../hadoop/api/CarbonTableOutputFormat.java     |   2 +-
 .../carbondata/hadoop/api/DataMapJob.java       |  40 -----
 .../hadoop/api/DistributableDataMapFormat.java  | 148 -----------------
 .../hadoop/testutil/StoreCreator.java           |   2 +-
 .../hadoop/util/CarbonInputFormatUtil.java      |  22 +--
 .../hadoop/util/ObjectSerializationUtil.java    | 118 -------------
 .../test/util/ObjectSerializationUtilTest.java  |   2 +-
 .../hive/MapredCarbonInputFormat.java           |   2 +-
 .../lucene/LuceneFineGrainDataMapSuite.scala    |   1 -
 .../testsuite/datamap/DataMapWriterSuite.scala  |   2 +-
 .../testsuite/datamap/TestDataMapStatus.scala   |   4 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |   4 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   3 +-
 .../carbondata/spark/rdd/SparkDataMapJob.scala  |   2 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  14 +-
 .../datamap/CarbonDropDataMapCommand.scala      |   8 +-
 .../management/CarbonLoadDataCommand.scala      |   4 +-
 .../datasources/SparkCarbonFileFormat.scala     |   2 +-
 .../datasources/SparkCarbonTableFormat.scala    |   3 +-
 .../streaming/CarbonStreamOutputFormat.java     |   2 +-
 37 files changed, 687 insertions(+), 452 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
new file mode 100644
index 0000000..bdbf9fc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/AbstractDataMapJob.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.util.List;
+
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * abstract class for data map job
+ */
+public abstract class AbstractDataMapJob implements DataMapJob {
+
+  @Override public void execute(CarbonTable carbonTable,
+      FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
+  }
+
+  @Override public List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
+      FilterResolverIntf resolverIntf) {
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index 7cdabd6..4d1c718 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -131,6 +131,25 @@ public class DataMapChooser {
   }
 
   /**
+   * Get all datamaps of the table for clearing purpose
+   */
+  public DataMapExprWrapper getAllDataMapsForClear(CarbonTable carbonTable)
+      throws IOException {
+    List<TableDataMap> allDataMapFG =
+        DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
+    DataMapExprWrapper initialExpr = null;
+    if (allDataMapFG.size() > 0) {
+      initialExpr = new DataMapExprWrapperImpl(allDataMapFG.get(0), null);
+
+      for (int i = 1; i < allDataMapFG.size(); i++) {
+        initialExpr = new AndDataMapExprWrapper(initialExpr,
+            new DataMapExprWrapperImpl(allDataMapFG.get(i), null), null);
+      }
+    }
+    return initialExpr;
+  }
+
+  /**
    * Returns default blocklet datamap
    * @param carbonTable
    * @param resolverIntf

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
new file mode 100644
index 0000000..57a739d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Distributable datamap job to execute the #DistributableDataMapFormat in the cluster. It prunes
+ * the datamaps in a distributed way and returns the final blocklet list
+ */
+public interface DataMapJob extends Serializable {
+
+  void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletDataMapIndexWrapper> format);
+
+  List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
+      FilterResolverIntf filter);
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 072b86e..c739dc3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
@@ -57,6 +58,10 @@ public final class DataMapStoreManager {
 
   private static DataMapStoreManager instance = new DataMapStoreManager();
 
+  public Map<String, List<TableDataMap>> getAllDataMaps() {
+    return allDataMaps;
+  }
+
   /**
    * Contains the list of datamaps for each table.
    */
@@ -364,17 +369,58 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMaps(AbsoluteTableIdentifier identifier) {
+    CarbonTable carbonTable = getCarbonTable(identifier);
     String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
     List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
+    if (null != carbonTable && tableIndices != null) {
+      try {
+        DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
+      } catch (IOException e) {
+        LOGGER.error(e, "clear dataMap job failed");
+        // ignoring the exception
+      }
+    }
     segmentRefreshMap.remove(identifier.uniqueName());
+    clearDataMaps(tableUniqueName);
+    allDataMaps.remove(tableUniqueName);
+  }
+
+  /**
+   * This method returns the carbonTable from identifier
+   * @param identifier
+   * @return
+   */
+  public CarbonTable getCarbonTable(AbsoluteTableIdentifier identifier) {
+    CarbonTable carbonTable = null;
+    carbonTable = CarbonMetadata.getInstance()
+        .getCarbonTable(identifier.getDatabaseName(), identifier.getTableName());
+    if (carbonTable == null) {
+      try {
+        carbonTable = CarbonTable
+            .buildFromTablePath(identifier.getTableName(), identifier.getDatabaseName(),
+                identifier.getTablePath());
+      } catch (IOException e) {
+        LOGGER.error("failed to get carbon table from table Path");
+        // ignoring exception
+      }
+    }
+    return carbonTable;
+  }
+
+  /**
+   * this method clears the datamaps of the table from memory
+   */
+  public void clearDataMaps(String tableUniqName) {
+    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqName);
     if (tableIndices != null) {
       for (TableDataMap tableDataMap : tableIndices) {
         if (tableDataMap != null) {
+          // clear the segmentMap in BlockletDetailsFetcher, else the Segment will remain in the
+          // executor and the query fails when we check whether the blocklet is in the index or not
+          tableDataMap.getBlockletDetailsFetcher().clear();
           tableDataMap.clear();
-          break;
         }
       }
-      allDataMaps.remove(tableUniqueName);
     }
   }
 
@@ -384,14 +430,21 @@ public final class DataMapStoreManager {
    * @param identifier Table identifier
    */
   public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) {
-    List<TableDataMap> tableIndices =
-        allDataMaps.get(identifier.getCarbonTableIdentifier().getTableUniqueName());
+    CarbonTable carbonTable = getCarbonTable(identifier);
+    String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
+    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
     if (tableIndices != null) {
       int i = 0;
       for (TableDataMap tableDataMap : tableIndices) {
-        if (tableDataMap != null && dataMapName
+        if (carbonTable != null && tableDataMap != null && dataMapName
             .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
-          tableDataMap.clear();
+          try {
+            DataMapUtil.executeDataMapJobForClearingDataMaps(carbonTable);
+            tableDataMap.clear();
+          } catch (IOException e) {
+            LOGGER.error(e, "clear dataMap job failed");
+            // ignoring the exception
+          }
           tableDataMap.deleteDatamapData();
           tableIndices.remove(i);
           break;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
new file mode 100644
index 0000000..e3d3194
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class DataMapUtil {
+
+  private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataMapUtil.class.getName());
+
+  /**
+   * Creates instance for the DataMap Job class
+   *
+   * @param className
+   * @return
+   */
+  public static Object createDataMapJob(String className) {
+    try {
+      return Class.forName(className).getDeclaredConstructors()[0].newInstance();
+    } catch (Exception e) {
+      LOGGER.error(e);
+      return null;
+    }
+  }
+
+  /**
+   * This method sets the datamapJob in the configuration
+   * @param configuration
+   * @param dataMapJob
+   * @throws IOException
+   */
+  public static void setDataMapJob(Configuration configuration, Object dataMapJob)
+      throws IOException {
+    if (dataMapJob != null) {
+      String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
+      configuration.set(DATA_MAP_DSTR, toString);
+    }
+  }
+
+  /**
+   * get datamap job from the configuration
+   * @param configuration job configuration
+   * @return DataMap Job
+   * @throws IOException
+   */
+  public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
+    String jobString = configuration.get(DATA_MAP_DSTR);
+    if (jobString != null) {
+      return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
+    }
+    return null;
+  }
+
+  /**
+   * This method gets the datamapJob and calls execute; this job is launched before clearing
+   * datamaps on the driver side during drop table and drop datamap, and it clears the datamaps
+   * on the executor side
+   * @param carbonTable
+   * @throws IOException
+   */
+  public static void executeDataMapJobForClearingDataMaps(CarbonTable carbonTable)
+      throws IOException {
+    String dataMapJobClassName = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
+    DataMapJob dataMapJob = (DataMapJob) createDataMapJob(dataMapJobClassName);
+    String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
+        getValidAndInvalidSegments(carbonTable);
+    List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments();
+    List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
+    DataMapExprWrapper dataMapExprWrapper = null;
+    if (DataMapStoreManager.getInstance().getAllDataMap(carbonTable).size() > 0) {
+      DataMapChooser dataMapChooser = new DataMapChooser(carbonTable);
+      dataMapExprWrapper = dataMapChooser.getAllDataMapsForClear(carbonTable);
+    } else {
+      return;
+    }
+    DistributableDataMapFormat dataMapFormat =
+        createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments, null,
+            className, true);
+    dataMapJob.execute(dataMapFormat, null);
+  }
+
+  private static DistributableDataMapFormat createDataMapJob(CarbonTable carbonTable,
+      DataMapExprWrapper dataMapExprWrapper, List<Segment> validsegments,
+      List<Segment> invalidSegments, List<PartitionSpec> partitionsToPrune, String clsName,
+      boolean isJobToClearDataMaps) {
+    try {
+      Constructor<?> cons = Class.forName(clsName).getDeclaredConstructors()[0];
+      return (DistributableDataMapFormat) cons
+          .newInstance(carbonTable, dataMapExprWrapper, validsegments, invalidSegments,
+              partitionsToPrune, isJobToClearDataMaps);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * this method gets the datamapJob and calls execute on that job; this is launched for
+   * distributed CG or FG datamap pruning
+   * @return list of Extended blocklets after pruning
+   */
+  public static List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
+      FilterResolverIntf resolver, List<Segment> validSegments,
+      DataMapExprWrapper dataMapExprWrapper, DataMapJob dataMapJob,
+      List<PartitionSpec> partitionsToPrune) throws IOException {
+    String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
+        getValidAndInvalidSegments(carbonTable);
+    List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
+    DistributableDataMapFormat dataMapFormat =
+        createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments,
+            partitionsToPrune, className, false);
+    List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(dataMapFormat, resolver);
+    // Apply expression on the blocklets.
+    prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
+    return prunedBlocklets;
+  }
+
+  private static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
+      CarbonTable carbonTable) throws IOException {
+    SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+    return ssm.getValidAndInvalidSegments();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
new file mode 100644
index 0000000..4200414
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Input format for datamaps; it makes the datamap pruning distributable.
+ */
+public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBlocklet> implements
+    Serializable {
+
+  private static final String FILTER_EXP = "mapreduce.input.distributed.datamap.filter";
+
+  private CarbonTable table;
+
+  private DataMapExprWrapper dataMapExprWrapper;
+
+  private List<Segment> validSegments;
+
+  private List<Segment> invalidSegments;
+
+  private List<PartitionSpec> partitions;
+
+  private  DataMapDistributableWrapper distributable;
+
+  private boolean isJobToClearDataMaps = false;
+
+  DistributableDataMapFormat(CarbonTable table, DataMapExprWrapper dataMapExprWrapper,
+      List<Segment> validSegments, List<Segment> invalidSegments, List<PartitionSpec> partitions,
+      boolean isJobToClearDataMaps) {
+    this.table = table;
+    this.dataMapExprWrapper = dataMapExprWrapper;
+    this.validSegments = validSegments;
+    this.invalidSegments = invalidSegments;
+    this.partitions = partitions;
+    this.isJobToClearDataMaps = isJobToClearDataMaps;
+  }
+
+  public boolean isJobToClearDataMaps() {
+    return isJobToClearDataMaps;
+  }
+
+  public static void setFilterExp(Configuration configuration, FilterResolverIntf filterExp)
+      throws IOException {
+    if (filterExp != null) {
+      String string = ObjectSerializationUtil.convertObjectToString(filterExp);
+      configuration.set(FILTER_EXP, string);
+    }
+  }
+
+  private static FilterResolverIntf getFilterExp(Configuration configuration) throws IOException {
+    String filterString = configuration.get(FILTER_EXP);
+    if (filterString != null) {
+      Object toObject = ObjectSerializationUtil.convertStringToObject(filterString);
+      return (FilterResolverIntf) toObject;
+    }
+    return null;
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    List<DataMapDistributableWrapper> distributables =
+        dataMapExprWrapper.toDistributable(validSegments);
+    List<InputSplit> inputSplits = new ArrayList<>(distributables.size());
+    inputSplits.addAll(distributables);
+    return inputSplits;
+  }
+
+  @Override
+  public RecordReader<Void, ExtendedBlocklet> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    return new RecordReader<Void, ExtendedBlocklet>() {
+      private Iterator<ExtendedBlocklet> blockletIterator;
+      private ExtendedBlocklet currBlocklet;
+
+      @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+          throws IOException, InterruptedException {
+        distributable = (DataMapDistributableWrapper) inputSplit;
+        // clear the segmentMap and the cache in the executor when there are invalid segments
+        if (invalidSegments.size() > 0) {
+          DataMapStoreManager.getInstance().clearInvalidSegments(table, invalidSegments);
+        }
+        TableDataMap tableDataMap = DataMapStoreManager.getInstance()
+            .getDataMap(table, distributable.getDistributable().getDataMapSchema());
+        if (isJobToClearDataMaps) {
+          // if job is to clear datamaps just clear datamaps from cache and return
+          DataMapStoreManager.getInstance()
+              .clearDataMaps(table.getCarbonTableIdentifier().getTableUniqueName());
+          blockletIterator = Collections.emptyIterator();
+          return;
+        }
+        List<ExtendedBlocklet> blocklets = tableDataMap.prune(distributable.getDistributable(),
+            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
+        for (ExtendedBlocklet blocklet : blocklets) {
+          blocklet.setDataMapUniqueId(distributable.getUniqueId());
+        }
+        blockletIterator = blocklets.iterator();
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        boolean hasNext = blockletIterator.hasNext();
+        if (hasNext) {
+          currBlocklet = blockletIterator.next();
+        }
+        return hasNext;
+      }
+
+      @Override
+      public Void getCurrentKey() throws IOException, InterruptedException {
+        return null;
+      }
+
+      @Override
+      public ExtendedBlocklet getCurrentValue() throws IOException, InterruptedException {
+        return currBlocklet;
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        return 0;
+      }
+
+      @Override
+      public void close() throws IOException {
+
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 314b515..b8254d4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -74,6 +74,10 @@ public final class TableDataMap extends OperationEventListener {
     this.segmentPropertiesFetcher = segmentPropertiesFetcher;
   }
 
+  public BlockletDetailsFetcher getBlockletDetailsFetcher() {
+    return blockletDetailsFetcher;
+  }
+
   /**
    * Pass the valid segments and prune the datamap using filter expression
    *
@@ -122,8 +126,9 @@ public final class TableDataMap extends OperationEventListener {
   public List<DataMapDistributable> toDistributable(List<Segment> segments) throws IOException {
     List<DataMapDistributable> distributables = new ArrayList<>();
     for (Segment segment : segments) {
-      List<DataMapDistributable> list = dataMapFactory.toDistributable(segment);
-      for (DataMapDistributable distributable: list) {
+      List<DataMapDistributable> list =
+          dataMapFactory.toDistributable(segment);
+      for (DataMapDistributable distributable : list) {
         distributable.setDataMapSchema(dataMapSchema);
         distributable.setSegment(segment);
         distributable.setTablePath(identifier.getTablePath());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
index 199f993..1de16bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
@@ -87,7 +87,8 @@ public class AndDataMapExprWrapper implements DataMapExprWrapper {
     return null;
   }
 
-  @Override public List<DataMapDistributableWrapper> toDistributable(List<Segment> segments)
+  @Override
+  public List<DataMapDistributableWrapper> toDistributable(List<Segment> segments)
       throws IOException {
     List<DataMapDistributableWrapper> wrappers = new ArrayList<>();
     wrappers.addAll(left.toDistributable(segments));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
index 0a3896c..38f2336 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -74,7 +74,8 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper {
     return null;
   }
 
-  @Override public List<DataMapDistributableWrapper> toDistributable(List<Segment> segments)
+  @Override
+  public List<DataMapDistributableWrapper> toDistributable(List<Segment> segments)
       throws IOException {
     List<DataMapDistributable> dataMapDistributables = dataMap.toDistributable(segments);
     List<DataMapDistributableWrapper> wrappers = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index 58c11db..1971f40 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -55,4 +55,9 @@ public interface BlockletDetailsFetcher {
    */
   List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
       throws IOException;
+
+  /**
+   * clears the datamap from cache and segmentMap from executor
+   */
+  void clear();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index e56c2d0..021fb82 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -282,8 +282,10 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
 
   @Override
   public void clear() {
-    for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
-      clear(new Segment(segmentId, null, null));
+    if (segmentMap.size() > 0) {
+      for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
+        clear(new Segment(segmentId, null, null));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 5acca27..9d648f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -231,6 +231,11 @@ public class CarbonTable implements Serializable {
     }
   }
 
+  public static CarbonTable buildFromTablePath(String tableName, String dbName, String tablePath)
+      throws IOException {
+    return SchemaReader
+        .readCarbonTableFromStore(AbsoluteTableIdentifier.from(tablePath, dbName, tableName));
+  }
   /**
    * @param tableInfo
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java
new file mode 100644
index 0000000..020787d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ObjectSerializationUtil.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * It provides methods to convert object to Base64 string and vice versa.
+ */
+public class ObjectSerializationUtil {
+
+  private static final Log LOG = LogFactory.getLog(ObjectSerializationUtil.class);
+
+  /**
+   * Convert object to Base64 String
+   *
+   * @param obj Object to be serialized
+   * @return serialized string
+   * @throws IOException
+   */
+  public static String convertObjectToString(Object obj) throws IOException {
+    ByteArrayOutputStream baos = null;
+    GZIPOutputStream gos = null;
+    ObjectOutputStream oos = null;
+
+    try {
+      baos = new ByteArrayOutputStream();
+      gos = new GZIPOutputStream(baos);
+      oos = new ObjectOutputStream(gos);
+      oos.writeObject(obj);
+    } finally {
+      try {
+        if (oos != null) {
+          oos.close();
+        }
+        if (gos != null) {
+          gos.close();
+        }
+        if (baos != null) {
+          baos.close();
+        }
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+    }
+
+    return CarbonUtil.encodeToString(baos.toByteArray());
+  }
+
+
+  /**
+   * Converts Base64 string to object.
+   *
+   * @param objectString serialized object in string format
+   * @return Object after convert string to object
+   * @throws IOException
+   */
+  public static Object convertStringToObject(String objectString) throws IOException {
+    if (objectString == null) {
+      return null;
+    }
+
+    byte[] bytes = CarbonUtil.decodeStringToBytes(objectString);
+
+    ByteArrayInputStream bais = null;
+    GZIPInputStream gis = null;
+    ObjectInputStream ois = null;
+
+    try {
+      bais = new ByteArrayInputStream(bytes);
+      gis = new GZIPInputStream(bais);
+      ois = new ObjectInputStream(gis);
+      return ois.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Could not read object", e);
+    } finally {
+      try {
+        if (ois != null) {
+          ois.close();
+        }
+        if (gis != null) {
+          gis.close();
+        }
+        if (bais != null) {
+          bais.close();
+        }
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 581c3a6..16b49f2 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -252,6 +252,14 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     List<DataMapDistributable> dataMapDistributableList = new ArrayList<>();
     CarbonFile[] indexDirs =
         getAllIndexDirs(getCarbonTable().getTablePath(), segment.getSegmentNo());
+    if (segment.getFilteredIndexShardNames().size() == 0) {
+      for (CarbonFile indexDir : indexDirs) {
+        DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable(
+            indexDir.getAbsolutePath());
+        dataMapDistributableList.add(bloomDataMapDistributable);
+      }
+      return dataMapDistributableList;
+    }
     for (CarbonFile indexDir : indexDirs) {
       // Filter out the tasks which are filtered through CG datamap.
       if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 4c6aec3..4bcdebb 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -166,6 +166,15 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
     List<DataMapDistributable> lstDataMapDistribute = new ArrayList<>();
     CarbonFile[] indexDirs =
         getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo());
+    if (segment.getFilteredIndexShardNames().size() == 0) {
+      for (CarbonFile indexDir : indexDirs) {
+        DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(
+            CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()),
+            indexDir.getAbsolutePath());
+        lstDataMapDistribute.add(luceneDataMapDistributable);
+      }
+      return lstDataMapDistribute;
+    }
     for (CarbonFile indexDir : indexDirs) {
       // Filter out the tasks which are filtered through CG datamap.
       if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
deleted file mode 100644
index 6835184..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.api;
-
-import java.util.List;
-
-import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * abstract class for data map job
- */
-public abstract class AbstractDataMapJob implements DataMapJob {
-
-  @Override public void execute(CarbonTable carbonTable,
-      FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
-  }
-
-  @Override public List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
-      FilterResolverIntf resolverIntf) {
-    return null;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index c5365d5..91da93f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -28,12 +28,13 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapJob;
+import org.apache.carbondata.core.datamap.DataMapUtil;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -54,6 +55,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeConverter;
 import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
@@ -61,7 +63,6 @@ import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.hadoop.CarbonRecordReader;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -103,7 +104,6 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       "mapreduce.input.carboninputformat.transactional";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
   private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
-  private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
   public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
   public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
   private static final String PARTITIONS_TO_PRUNE =
@@ -171,22 +171,6 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
   }
 
-  public static void setDataMapJob(Configuration configuration, Object dataMapJob)
-      throws IOException {
-    if (dataMapJob != null) {
-      String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
-      configuration.set(DATA_MAP_DSTR, toString);
-    }
-  }
-
-  public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
-    String jobString = configuration.get(DATA_MAP_DSTR);
-    if (jobString != null) {
-      return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
-    }
-    return null;
-  }
-
   /**
    * It sets unresolved filter expression.
    *
@@ -416,7 +400,7 @@ m filterExpression
     boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
             CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
-    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+    DataMapJob dataMapJob = DataMapUtil.getDataMapJob(job.getConfiguration());
     List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     // First prune using default datamap on driver side.
     DataMapExprWrapper dataMapExprWrapper = DataMapChooser
@@ -436,8 +420,8 @@ m filterExpression
       pruneSegments(segmentIds, prunedBlocklets);
       // Again prune with CG datamap.
       if (distributedCG && dataMapJob != null) {
-        prunedBlocklets =
-            executeDataMapJob(carbonTable, resolver, segmentIds, cgDataMapExprWrapper, dataMapJob,
+        prunedBlocklets = DataMapUtil
+            .executeDataMapJob(carbonTable, resolver, segmentIds, cgDataMapExprWrapper, dataMapJob,
                 partitionsToPrune);
       } else {
         prunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune);
@@ -452,8 +436,8 @@ m filterExpression
       if (fgDataMapExprWrapper != null) {
         // Prune segments from already pruned blocklets
         pruneSegments(segmentIds, prunedBlocklets);
-        prunedBlocklets =
-            executeDataMapJob(carbonTable, resolver, segmentIds, fgDataMapExprWrapper, dataMapJob,
+        prunedBlocklets = DataMapUtil
+            .executeDataMapJob(carbonTable, resolver, segmentIds, fgDataMapExprWrapper, dataMapJob,
                 partitionsToPrune);
 
         ExplainCollector.recordFGDataMapPruning(
@@ -463,33 +447,6 @@ m filterExpression
     return prunedBlocklets;
   }
 
-  private List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
-      FilterResolverIntf resolver, List<Segment> segmentIds, DataMapExprWrapper dataMapExprWrapper,
-      DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune) throws IOException {
-    String className = "org.apache.carbondata.hadoop.api.DistributableDataMapFormat";
-    FileInputFormat dataMapFormat =
-        createDataMapJob(carbonTable, dataMapExprWrapper, segmentIds, partitionsToPrune, className);
-    List<ExtendedBlocklet> prunedBlocklets =
-        dataMapJob.execute((DistributableDataMapFormat) dataMapFormat, resolver);
-    // Apply expression on the blocklets.
-    prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
-    return prunedBlocklets;
-  }
-
-
-  public static FileInputFormat createDataMapJob(CarbonTable carbonTable,
-      DataMapExprWrapper dataMapExprWrapper, List<Segment> segments,
-      List<PartitionSpec> partitionsToPrune, String clsName) {
-    try {
-      Constructor<?> cons = Class.forName(clsName).getDeclaredConstructors()[0];
-      return (FileInputFormat) cons
-          .newInstance(carbonTable, dataMapExprWrapper, segments, partitionsToPrune,
-              BlockletDataMapFactory.class.getName());
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   /**
    * Prune the segments from the already pruned blocklets.
    * @param segments

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 7050c8f..b2ff3ab 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -33,8 +33,8 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 import org.apache.carbondata.processing.loading.DataLoadExecutor;
 import org.apache.carbondata.processing.loading.TableProcessingOperations;
 import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
deleted file mode 100644
index c439219..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.api;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * Distributable datamap job to execute the #DistributableDataMapFormat in cluster. it prunes the
- * datamaps distributably and returns the final blocklet list
- */
-public interface DataMapJob extends Serializable {
-
-  void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletDataMapIndexWrapper> format);
-
-  List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
-      FilterResolverIntf filter);
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
deleted file mode 100644
index 213c5a5..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.api;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.TableDataMap;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * Input format for datamaps, it makes the datamap pruning distributable.
- */
-public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBlocklet> implements
-    Serializable {
-
-  private static final String FILTER_EXP = "mapreduce.input.distributed.datamap.filter";
-
-  private CarbonTable table;
-
-  private DataMapExprWrapper dataMapExprWrapper;
-
-  private List<Segment> validSegments;
-
-  private String className;
-
-  private List<PartitionSpec> partitions;
-
-  DistributableDataMapFormat(CarbonTable table,
-      DataMapExprWrapper dataMapExprWrapper, List<Segment> validSegments,
-      List<PartitionSpec> partitions, String className) {
-    this.table = table;
-    this.dataMapExprWrapper = dataMapExprWrapper;
-    this.validSegments = validSegments;
-    this.className = className;
-    this.partitions = partitions;
-  }
-
-  public static void setFilterExp(Configuration configuration, FilterResolverIntf filterExp)
-      throws IOException {
-    if (filterExp != null) {
-      String string = ObjectSerializationUtil.convertObjectToString(filterExp);
-      configuration.set(FILTER_EXP, string);
-    }
-  }
-
-  private static FilterResolverIntf getFilterExp(Configuration configuration) throws IOException {
-    String filterString = configuration.get(FILTER_EXP);
-    if (filterString != null) {
-      Object toObject = ObjectSerializationUtil.convertStringToObject(filterString);
-      return (FilterResolverIntf) toObject;
-    }
-    return null;
-  }
-
-  @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
-    List<DataMapDistributableWrapper> distributables =
-        dataMapExprWrapper.toDistributable(validSegments);
-    List<InputSplit> inputSplits = new ArrayList<>(distributables.size());
-    inputSplits.addAll(distributables);
-    return inputSplits;
-  }
-
-  @Override
-  public RecordReader<Void, ExtendedBlocklet> createRecordReader(InputSplit inputSplit,
-      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-    return new RecordReader<Void, ExtendedBlocklet>() {
-      private Iterator<ExtendedBlocklet> blockletIterator;
-      private ExtendedBlocklet currBlocklet;
-
-      @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-          throws IOException, InterruptedException {
-        DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
-        TableDataMap dataMap = DataMapStoreManager.getInstance()
-            .getDataMap(table, distributable.getDistributable().getDataMapSchema());
-        List<ExtendedBlocklet> blocklets = dataMap.prune(distributable.getDistributable(),
-            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
-        for (ExtendedBlocklet blocklet : blocklets) {
-          blocklet.setDataMapUniqueId(distributable.getUniqueId());
-        }
-        blockletIterator = blocklets.iterator();
-      }
-
-      @Override
-      public boolean nextKeyValue() throws IOException, InterruptedException {
-        boolean hasNext = blockletIterator.hasNext();
-        if (hasNext) {
-          currBlocklet = blockletIterator.next();
-        }
-        return hasNext;
-      }
-
-      @Override
-      public Void getCurrentKey() throws IOException, InterruptedException {
-        return null;
-      }
-
-      @Override
-      public ExtendedBlocklet getCurrentValue() throws IOException, InterruptedException {
-        return currBlocklet;
-      }
-
-      @Override
-      public float getProgress() throws IOException, InterruptedException {
-        return 0;
-      }
-
-      @Override
-      public void close() throws IOException {
-
-      }
-    };
-  }
-
-}
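
The two files deleted above move to core as org.apache.carbondata.core.datamap.DataMapJob and DistributableDataMapFormat (see the import changes in CarbonInputFormatUtil.java and SparkDataMapJob.scala elsewhere in this diff). To make the contract concrete, here is a minimal in-process sketch, not part of this commit, that prunes every split serially on one JVM; EmbeddedDataMapJob and its comments are illustrative, and it assumes the relocated classes keep exactly the signatures shown above:

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.carbondata.core.datamap.DataMapJob;
  import org.apache.carbondata.core.datamap.DistributableDataMapFormat;
  import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
  import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
  import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
  import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

  import org.apache.hadoop.mapreduce.InputSplit;
  import org.apache.hadoop.mapreduce.RecordReader;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  /** Hypothetical single-process DataMapJob: prunes every split serially. */
  public class EmbeddedDataMapJob implements DataMapJob {

    @Override
    public void execute(CarbonTable carbonTable,
        FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
      // no-op in this sketch; a distributed job would run the format on executors
    }

    @Override
    public List<ExtendedBlocklet> execute(DistributableDataMapFormat format,
        FilterResolverIntf filter) {
      // The filter argument is unused here because the format already carries a
      // per-distributable filter via its DataMapExprWrapper, as shown above.
      List<ExtendedBlocklet> blocklets = new ArrayList<>();
      try {
        // getSplits() returns one DataMapDistributableWrapper per prunable unit;
        // the record reader does the actual pruning when initialized. The
        // JobContext/TaskAttemptContext arguments are unused by the format above.
        for (InputSplit split : format.getSplits(null)) {
          RecordReader<Void, ExtendedBlocklet> reader =
              format.createRecordReader(split, null);
          reader.initialize(split, null);
          while (reader.nextKeyValue()) {
            blocklets.add(reader.getCurrentValue());
          }
          reader.close();
        }
      } catch (IOException | InterruptedException e) {
        throw new RuntimeException("datamap pruning failed", e);
      }
      return blocklets;
    }
  }

In the real commit this role is played by SparkDataMapJob, which wraps the same getSplits/createRecordReader loop in an RDD so each split is pruned on an executor.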

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 9075012..9fd1812 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -167,7 +167,7 @@ public class StoreCreator {
   /**
    * Method to clear the data maps
    */
-  public static void clearDataMaps() {
+  public static void clearDataMaps() throws IOException {
     DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 3208a28..af7397b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -26,6 +26,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
+import org.apache.carbondata.core.datamap.DataMapJob;
+import org.apache.carbondata.core.datamap.DataMapUtil;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -38,7 +40,6 @@ import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.hadoop.api.CarbonInputFormat;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.hadoop.api.DataMapJob;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -119,7 +120,7 @@ public class CarbonInputFormatUtil {
     CarbonInputFormat.setFilterPredicates(conf, filterExpression);
     CarbonInputFormat.setColumnProjection(conf, columnProjection);
     if (dataMapJob != null) {
-      CarbonInputFormat.setDataMapJob(conf, dataMapJob);
+      DataMapUtil.setDataMapJob(conf, dataMapJob);
     } else {
       setDataMapJobIfConfigured(conf);
     }
@@ -164,22 +165,7 @@ public class CarbonInputFormatUtil {
    */
   public static void setDataMapJobIfConfigured(Configuration conf) throws IOException {
     String className = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
-    CarbonTableInputFormat.setDataMapJob(conf, createDataMapJob(className));
-  }
-
-  /**
-   * Creates instance for the DataMap Job class
-   *
-   * @param className
-   * @return
-   */
-  public static Object createDataMapJob(String className) {
-    try {
-      return Class.forName(className).getDeclaredConstructors()[0].newInstance();
-    } catch (Exception e) {
-      LOGGER.error(e);
-      return null;
-    }
+    DataMapUtil.setDataMapJob(conf, DataMapUtil.createDataMapJob(className));
   }
 
   public static String createJobTrackerID(java.util.Date date) {
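
The createDataMapJob helper removed here is not gone: the replacement call DataMapUtil.createDataMapJob(className) above confirms it now lives in the new core-side DataMapUtil. A sketch of its presumed shape, mirroring the body deleted above (the core implementation itself is outside this excerpt, and the class name of the sketch is illustrative):

  import org.apache.carbondata.common.logging.LogService;
  import org.apache.carbondata.common.logging.LogServiceFactory;

  // Presumed shape of the relocated helper; mirrors the reflection-based body
  // removed above.
  public final class DataMapJobFactorySketch {

    private static final LogService LOGGER =
        LogServiceFactory.getLogService(DataMapJobFactorySketch.class.getName());

    /** Instantiates e.g. "org.apache.carbondata.spark.rdd.SparkDataMapJob". */
    public static Object createDataMapJob(String className) {
      try {
        return Class.forName(className).getDeclaredConstructors()[0].newInstance();
      } catch (Exception e) {
        LOGGER.error(e);
        return null;
      }
    }
  }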

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java
deleted file mode 100644
index d97df2d..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/ObjectSerializationUtil.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-import org.apache.carbondata.core.util.CarbonUtil;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * It provides methods to convert object to Base64 string and vice versa.
- */
-public class ObjectSerializationUtil {
-
-  private static final Log LOG = LogFactory.getLog(ObjectSerializationUtil.class);
-
-  /**
-   * Convert object to Base64 String
-   *
-   * @param obj Object to be serialized
-   * @return serialized string
-   * @throws IOException
-   */
-  public static String convertObjectToString(Object obj) throws IOException {
-    ByteArrayOutputStream baos = null;
-    GZIPOutputStream gos = null;
-    ObjectOutputStream oos = null;
-
-    try {
-      baos = new ByteArrayOutputStream();
-      gos = new GZIPOutputStream(baos);
-      oos = new ObjectOutputStream(gos);
-      oos.writeObject(obj);
-    } finally {
-      try {
-        if (oos != null) {
-          oos.close();
-        }
-        if (gos != null) {
-          gos.close();
-        }
-        if (baos != null) {
-          baos.close();
-        }
-      } catch (IOException e) {
-        LOG.error(e);
-      }
-    }
-
-    return CarbonUtil.encodeToString(baos.toByteArray());
-  }
-
-
-  /**
-   * Converts Base64 string to object.
-   *
-   * @param objectString serialized object in string format
-   * @return Object after convert string to object
-   * @throws IOException
-   */
-  public static Object convertStringToObject(String objectString) throws IOException {
-    if (objectString == null) {
-      return null;
-    }
-
-    byte[] bytes = CarbonUtil.decodeStringToBytes(objectString);
-
-    ByteArrayInputStream bais = null;
-    GZIPInputStream gis = null;
-    ObjectInputStream ois = null;
-
-    try {
-      bais = new ByteArrayInputStream(bytes);
-      gis = new GZIPInputStream(bais);
-      ois = new ObjectInputStream(gis);
-      return ois.readObject();
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Could not read object", e);
-    } finally {
-      try {
-        if (ois != null) {
-          ois.close();
-        }
-        if (gis != null) {
-          gis.close();
-        }
-        if (bais != null) {
-          bais.close();
-        }
-      } catch (IOException e) {
-        LOG.error(e);
-      }
-    }
-  }
-
-}
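
The utility reappears unchanged at org.apache.carbondata.core.util.ObjectSerializationUtil, as the updated test import below shows. A quick round-trip sketch, assuming only that the two methods above survive the move with the same signatures; this is the same mechanism the deleted DistributableDataMapFormat used to stash the FilterResolverIntf in the Hadoop configuration:

  import org.apache.carbondata.core.util.ObjectSerializationUtil;

  public class SerializationRoundTrip {
    public static void main(String[] args) throws Exception {
      // Any Serializable object <-> GZIP-compressed, Base64-encoded string.
      String encoded = ObjectSerializationUtil.convertObjectToString("hello blocklet");
      Object decoded = ObjectSerializationUtil.convertStringToObject(encoded);
      System.out.println("round trip ok: " + "hello blocklet".equals(decoded));
    }
  }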

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/ObjectSerializationUtilTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/ObjectSerializationUtilTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/ObjectSerializationUtilTest.java
index 6046aca..a6ec303 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/ObjectSerializationUtilTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/ObjectSerializationUtilTest.java
@@ -18,11 +18,11 @@
 package org.apache.carbondata.hadoop.test.util;
 
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
 import junit.framework.TestCase;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 1cf2369..89a5ed6 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -30,10 +30,10 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.InvalidPathException;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 6d2eb3f..89623cf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -124,7 +124,6 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
          | USING 'lucene'
          | DMProperties('INDEX_COLUMNS'='Name , cIty')
       """.stripMargin)
-
     checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'"))
     checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test WHERE city='c020'"))
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index ffbcf67..4250269 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -66,7 +66,7 @@ class C2DataMapFactory(
    * @return
    */
   override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = {
-    ???
+    util.Collections.emptyList()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index cccfb3f..0c4f652 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -261,7 +261,9 @@ class TestDataMapFactory(
   override def getMeta: DataMapMeta = new DataMapMeta(carbonTable.getIndexedColumns(dataMapSchema),
     Seq(ExpressionType.EQUALS).asJava)
 
-  override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = ???
+  override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = {
+    util.Collections.emptyList()
+  }
 
   /**
    * delete datamap data if any

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 3cabc7b..1657a80 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -334,7 +334,9 @@ class WaitingDataMapFactory(
 
   override def getMeta: DataMapMeta = new DataMapMeta(carbonTable.getIndexedColumns(dataMapSchema), Seq(ExpressionType.EQUALS).asJava)
 
-  override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = ???
+  override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = {
+    util.Collections.emptyList()
+  }
 
   /**
    * delete datamap data if any

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index b9a3371..67ea332 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -44,8 +44,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommon
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.profiler.ExplainCollector
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.filter.FilterUtil
 import org.apache.carbondata.core.scan.model.QueryModel

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index f51c3bc..6ee566c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -27,9 +27,9 @@ import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException}
 
+import org.apache.carbondata.core.datamap.{AbstractDataMapJob, DistributableDataMapFormat}
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.hadoop.api.{AbstractDataMapJob, DistributableDataMapFormat}
 
 /**
  * Spark job to execute datamap job and prune all the datamaps distributable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 2f23d77..3e0a035 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import java.io.File
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.util.Try
@@ -26,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
-import org.apache.spark.sql.hive.{HiveSessionCatalog, _}
+import org.apache.spark.sql.hive._
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -216,14 +217,19 @@ object CarbonEnv {
     var isRefreshed = false
     val carbonEnv = getInstance(sparkSession)
     val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
-      identifier.database.getOrElse("default"), identifier.table)
+      identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
+      identifier.table)
     if (table.isEmpty ||
         (table.isDefined && carbonEnv.carbonMetastore
           .checkSchemasModifiedTimeAndReloadTable(identifier))) {
       sparkSession.sessionState.catalog.refreshTable(identifier)
+      val tablePath = CarbonProperties.getStorePath + File.separator +
+        identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase) +
+        File.separator + identifier.table
       DataMapStoreManager.getInstance().
-        clearDataMaps(AbsoluteTableIdentifier.from(CarbonProperties.getStorePath,
-          identifier.database.getOrElse("default"), identifier.table))
+        clearDataMaps(AbsoluteTableIdentifier.from(tablePath,
+          identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
+          identifier.table))
       isRefreshed = true
     }
     isRefreshed

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index a27b694..f1ed5d1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
-import org.apache.carbondata.datamap.DataMapManager
+import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
 import org.apache.carbondata.events._
 
 /**
@@ -198,6 +198,12 @@ case class CarbonDropDataMapCommand(
         dataMapProvider =
           DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
         DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName)
+        // If it is an index datamap provider such as Lucene, call cleanData, which launches a job
+        // to clear the datamap from executor memory (clears the segmentMap and the cache). This
+        // is done before deleting the datamap schemas from the _System folder.
+        if (dataMapProvider.isInstanceOf[IndexDataMapProvider]) {
+          dataMapProvider.cleanData()
+        }
         dataMapProvider.cleanMeta()
       }
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 3bef4b6..5ce510b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -60,12 +60,10 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil, ObjectSerializationUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
index 35db1f5..1da6507 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -53,7 +53,7 @@ import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats}
-import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, DataMapJob}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 @InterfaceAudience.User

http://git-wip-us.apache.org/repos/asf/carbondata/blob/20180485/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index d6eab1d..ac41d2e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -44,12 +44,11 @@ import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil, ObjectSerializationUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, SparkDataTypeConverterImpl, Util}