You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:51 UTC

[13/14] incubator-carbondata git commit: rebase

rebase

rebase

rename package

rebase

change package name

fix style

fix spark2


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/66ccd308
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/66ccd308
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/66ccd308

Branch: refs/heads/master
Commit: 66ccd308536d5e64f05fe57aa3a7a9003b4adf5a
Parents: 567fa51
Author: jackylk <ja...@huawei.com>
Authored: Tue Nov 29 17:42:06 2016 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Nov 30 13:19:54 2016 +0530

----------------------------------------------------------------------
 .../examples/GenerateDictionaryExample.scala    |   2 +-
 integration/spark-common/pom.xml                |   2 +-
 .../MalformedCarbonCommandException.java        |  83 ++
 .../carbondata/spark/load/CarbonLoaderUtil.java | 973 ++++++++++++++++++
 .../spark/load/DeleteLoadFolders.java           | 259 +++++
 .../spark/load/DeletedLoadMetadata.java         |  53 +
 .../spark/merger/CarbonCompactionExecutor.java  | 233 +++++
 .../spark/merger/CarbonCompactionUtil.java      | 283 ++++++
 .../spark/merger/CarbonDataMergerUtil.java      | 695 +++++++++++++
 .../spark/merger/CompactionCallable.java        |  44 +
 .../carbondata/spark/merger/CompactionType.java |  28 +
 .../spark/merger/NodeBlockRelation.java         |  60 ++
 .../spark/merger/NodeMultiBlockRelation.java    |  59 ++
 .../spark/merger/RowResultMerger.java           | 336 +++++++
 .../carbondata/spark/merger/TableMeta.java      |  42 +
 .../spark/merger/TupleConversionAdapter.java    |  85 ++
 .../spark/partition/api/DataPartitioner.java    |  54 +
 .../spark/partition/api/Partition.java          |  42 +
 .../api/impl/DataPartitionerProperties.java     |  87 ++
 .../partition/api/impl/DefaultLoadBalancer.java |  69 ++
 .../spark/partition/api/impl/PartitionImpl.java |  54 +
 .../api/impl/PartitionMultiFileImpl.java        |  51 +
 .../api/impl/QueryPartitionHelper.java          |  77 ++
 .../api/impl/SampleDataPartitionerImpl.java     | 151 +++
 .../readsupport/SparkRowReadSupportImpl.java    |  69 ++
 .../carbondata/spark/splits/TableSplit.java     | 129 +++
 .../carbondata/spark/util/CarbonQueryUtil.java  | 142 +++
 .../carbondata/spark/util/LoadMetadataUtil.java |  61 ++
 .../spark/CarbonAliasDecoderRelation.scala      |  43 +
 .../spark/CarbonColumnValidator.scala           |  36 +
 .../apache/carbondata/spark/CarbonFilters.scala | 391 ++++++++
 .../apache/carbondata/spark/CarbonOption.scala  |  48 +
 .../carbondata/spark/CarbonSparkFactory.scala   |  59 ++
 .../spark/DictionaryDetailHelper.scala          |  63 ++
 .../org/apache/carbondata/spark/KeyVal.scala    |  89 ++
 .../carbondata/spark/csv/CarbonCsvReader.scala  | 182 ++++
 .../spark/csv/CarbonCsvRelation.scala           | 249 +++++
 .../carbondata/spark/csv/CarbonTextFile.scala   |  91 ++
 .../carbondata/spark/csv/DefaultSource.scala    | 183 ++++
 .../spark/rdd/CarbonCleanFilesRDD.scala         |  82 ++
 .../spark/rdd/CarbonDataLoadRDD.scala           | 598 ++++++++++++
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |  91 ++
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |  84 ++
 .../spark/rdd/CarbonDropTableRDD.scala          |  71 ++
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 557 +++++++++++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 342 +++++++
 .../spark/rdd/CarbonSparkPartition.scala        |  35 +
 .../apache/carbondata/spark/rdd/Compactor.scala | 130 +++
 .../spark/rdd/DataLoadCoalescedRDD.scala        |  68 ++
 .../spark/rdd/DataLoadPartitionCoalescer.scala  | 363 +++++++
 .../spark/tasks/DictionaryWriterTask.scala      | 106 ++
 .../spark/tasks/SortIndexWriterTask.scala       |  59 ++
 .../carbondata/spark/util/CarbonScalaUtil.scala | 195 ++++
 .../carbondata/spark/util/CommonUtil.scala      | 259 +++++
 .../spark/util/DataTypeConverterUtil.scala      |  74 ++
 .../spark/util/GlobalDictionaryUtil.scala       | 843 ++++++++++++++++
 .../CarbonTableIdentifierImplicit.scala         |  42 +
 .../execution/command/carbonTableSchema.scala   | 359 +++++++
 .../spark/sql/hive/DistributionUtil.scala       | 167 ++++
 .../CarbonDecoderOptimizerHelper.scala          | 149 +++
 .../scala/org/apache/spark/util/FileUtils.scala |  94 ++
 .../apache/spark/util/ScalaCompilerUtil.scala   |  35 +
 .../scala/org/apache/spark/util/SparkUtil.scala |  73 ++
 .../spark/merger/CarbonCompactionExecutor.java  | 233 -----
 .../spark/merger/CarbonCompactionUtil.java      | 284 ------
 .../spark/merger/CompactionCallable.java        |  44 -
 .../spark/merger/CompactionType.java            |  28 -
 .../spark/merger/RowResultMerger.java           | 336 -------
 .../spark/merger/TupleConversionAdapter.java    |  85 --
 .../MalformedCarbonCommandException.java        |  83 --
 .../carbondata/spark/load/CarbonLoaderUtil.java | 976 -------------------
 .../spark/load/DeleteLoadFolders.java           | 259 -----
 .../spark/load/DeleteLoadFromMetadata.java      |  44 -
 .../spark/load/DeletedLoadMetadata.java         |  53 -
 .../spark/merger/CarbonDataMergerUtil.java      | 696 -------------
 .../spark/merger/NodeBlockRelation.java         |  60 --
 .../spark/merger/NodeMultiBlockRelation.java    |  59 --
 .../spark/partition/api/DataPartitioner.java    |  54 -
 .../spark/partition/api/Partition.java          |  42 -
 .../api/impl/DataPartitionerProperties.java     |  87 --
 .../partition/api/impl/DefaultLoadBalancer.java |  69 --
 .../spark/partition/api/impl/PartitionImpl.java |  54 -
 .../api/impl/PartitionMultiFileImpl.java        |  51 -
 .../api/impl/QueryPartitionHelper.java          |  77 --
 .../api/impl/SampleDataPartitionerImpl.java     | 151 ---
 .../readsupport/SparkRowReadSupportImpl.java    |  69 --
 .../carbondata/spark/splits/TableSplit.java     | 129 ---
 .../carbondata/spark/util/CarbonQueryUtil.java  | 142 ---
 .../carbondata/spark/util/LoadMetadataUtil.java |  61 --
 .../spark/CarbonColumnValidator.scala           |  36 -
 .../spark/CarbonDataFrameWriter.scala           |   2 +-
 .../apache/carbondata/spark/CarbonFilters.scala | 391 --------
 .../apache/carbondata/spark/CarbonOption.scala  |  46 -
 .../carbondata/spark/CarbonSparkFactory.scala   |  60 --
 .../spark/DictionaryDetailHelper.scala          |  62 --
 .../org/apache/carbondata/spark/KeyVal.scala    |  89 --
 .../carbondata/spark/csv/CarbonCsvReader.scala  | 182 ----
 .../spark/csv/CarbonCsvRelation.scala           | 248 -----
 .../carbondata/spark/csv/CarbonTextFile.scala   |  63 --
 .../carbondata/spark/csv/DefaultSource.scala    | 182 ----
 .../spark/rdd/CarbonCleanFilesRDD.scala         |  83 --
 .../spark/rdd/CarbonDataLoadRDD.scala           | 604 ------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  80 +-
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |  92 --
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |  84 --
 .../spark/rdd/CarbonDropTableRDD.scala          |  72 --
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 558 -----------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 344 -------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   8 +-
 .../apache/carbondata/spark/rdd/Compactor.scala | 133 ---
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   1 -
 .../spark/tasks/DictionaryWriterTask.scala      | 106 --
 .../spark/tasks/SortIndexWriterTask.scala       |  59 --
 .../carbondata/spark/util/CarbonScalaUtil.scala | 219 -----
 .../carbondata/spark/util/CommonUtil.scala      | 251 -----
 .../spark/util/DataTypeConverterUtil.scala      |  55 --
 .../spark/util/GlobalDictionaryUtil.scala       | 875 -----------------
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala |  68 --
 .../spark/rdd/DataLoadPartitionCoalescer.scala  | 363 -------
 .../spark/sql/CarbonCatalystOperators.scala     |   8 +-
 .../org/apache/spark/sql/CarbonContext.scala    |   4 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  15 +-
 .../spark/sql/CarbonDatasourceRelation.scala    |   8 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  13 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  50 +-
 .../scala/org/apache/spark/sql/CarbonScan.scala |  35 +-
 .../org/apache/spark/sql/CarbonSparkUtil.scala  |  45 +
 .../org/apache/spark/sql/CarbonSqlParser.scala  |   8 +-
 .../CarbonTableIdentifierImplicit.scala         |  42 -
 .../execution/command/carbonTableSchema.scala   | 502 ++--------
 .../apache/spark/sql/hive/CarbonMetastore.scala | 562 +++++++++++
 .../spark/sql/hive/CarbonMetastoreCatalog.scala | 576 -----------
 .../spark/sql/hive/CarbonStrategies.scala       |  19 +-
 .../spark/sql/hive/DistributionUtil.scala       | 145 ---
 .../execution/command/CarbonHiveCommands.scala  |   8 +-
 .../CarbonDecoderOptimizerHelper.scala          | 149 ---
 .../spark/sql/optimizer/CarbonOptimizer.scala   |  24 +-
 .../scala/org/apache/spark/util/FileUtils.scala |  94 --
 .../apache/spark/util/ScalaCompilerUtil.scala   |  35 -
 .../org/apache/spark/util/SplitUtils.scala      |  67 --
 .../TestDataLoadPartitionCoalescer.scala        |   2 -
 .../spark/util/AllDictionaryTestCase.scala      |   2 +-
 .../AutoHighCardinalityIdentifyTestCase.scala   |   2 +-
 .../util/ExternalColumnDictionaryTestCase.scala |   2 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala |   2 +-
 .../util/GlobalDictionaryUtilTestCase.scala     |   2 +-
 .../scala/org/apache/spark/sql/CarbonScan.scala | 127 +++
 .../execution/CarbonLateDecodeStrategy.scala    | 130 +++
 148 files changed, 11561 insertions(+), 11292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
index 2d7aed0..e8c437d 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
@@ -63,7 +63,7 @@ object GenerateDictionaryExample {
                       dictFolderPath: String) {
     val dataBaseName = carbonTableIdentifier.getDatabaseName
     val tableName = carbonTableIdentifier.getTableName
-    val carbonRelation = CarbonEnv.getInstance(cc).carbonCatalog.
+    val carbonRelation = CarbonEnv.get.carbonMetastore.
       lookupRelation1(Option(dataBaseName),
         tableName) (cc).asInstanceOf[CarbonRelation]
     val carbonTable = carbonRelation.tableMeta.carbonTable

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index ad312f3..b0ab3ef 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -77,7 +77,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-repl_${scala.binary.version}</artifactId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
new file mode 100644
index 0000000..de7d4a2
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.exception;
+
+
+import java.util.Locale;
+
+public class MalformedCarbonCommandException extends Exception {
+
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public MalformedCarbonCommandException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public MalformedCarbonCommandException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * This method is used to get the localized message.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - Localized error message.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * getLocalizedMessage
+   */
+  @Override
+  public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
new file mode 100644
index 0000000..492ceee
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -0,0 +1,973 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.load;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.CarbonUtilException;
+import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
+import org.apache.carbondata.lcm.locks.ICarbonLock;
+import org.apache.carbondata.lcm.status.SegmentStatusManager;
+import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
+import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
+import org.apache.carbondata.processing.csvload.DataGraphExecuter;
+import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus;
+import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
+import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.spark.merger.NodeBlockRelation;
+import org.apache.carbondata.spark.merger.NodeMultiBlockRelation;
+
+import com.google.gson.Gson;
+import org.apache.spark.SparkConf;
+import org.apache.spark.util.Utils;
+
+
+public final class CarbonLoaderUtil {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
+  /**
+   * minimum no of blocklet required for distribution
+   */
+  private static int minBlockLetsReqForDistribution = 0;
+
+  static {
+    String property = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE);
+    try {
+      minBlockLetsReqForDistribution = Integer.parseInt(property);
+    } catch (NumberFormatException ne) {
+      LOGGER.info("Invalid configuration. Consisering the defaul");
+      minBlockLetsReqForDistribution =
+          CarbonCommonConstants.DEFAULT_CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE;
+    }
+  }
+
+  private CarbonLoaderUtil() {
+
+  }
+
+  private static void generateGraph(IDataProcessStatus dataProcessTaskStatus, SchemaInfo info,
+      CarbonLoadModel loadModel, String outputLocation)
+      throws GraphGeneratorException {
+    DataLoadModel model = new DataLoadModel();
+    model.setCsvLoad(null != dataProcessTaskStatus.getCsvFilePath()
+            || null != dataProcessTaskStatus.getFilesToProcess());
+    model.setSchemaInfo(info);
+    model.setTableName(dataProcessTaskStatus.getTableName());
+    List<LoadMetadataDetails> loadMetadataDetails = loadModel.getLoadMetadataDetails();
+    if (null != loadMetadataDetails && !loadMetadataDetails.isEmpty()) {
+      model.setLoadNames(
+          CarbonDataProcessorUtil.getLoadNameFromLoadMetaDataDetails(loadMetadataDetails));
+      model.setModificationOrDeletionTime(CarbonDataProcessorUtil
+          .getModificationOrDeletionTimesFromLoadMetadataDetails(loadMetadataDetails));
+    }
+    model.setBlocksID(dataProcessTaskStatus.getBlocksID());
+    model.setEscapeCharacter(dataProcessTaskStatus.getEscapeCharacter());
+    model.setQuoteCharacter(dataProcessTaskStatus.getQuoteCharacter());
+    model.setCommentCharacter(dataProcessTaskStatus.getCommentCharacter());
+    model.setRddIteratorKey(dataProcessTaskStatus.getRddIteratorKey());
+    model.setTaskNo(loadModel.getTaskNo());
+    model.setFactTimeStamp(loadModel.getFactTimeStamp());
+    model.setMaxColumns(loadModel.getMaxColumns());
+    model.setDateFormat(loadModel.getDateFormat());
+    boolean hdfsReadMode =
+        dataProcessTaskStatus.getCsvFilePath() != null
+                && dataProcessTaskStatus.getCsvFilePath().startsWith("hdfs:");
+    int allocate =
+            null != dataProcessTaskStatus.getCsvFilePath()
+                    ? 1 : dataProcessTaskStatus.getFilesToProcess().size();
+    GraphGenerator generator = new GraphGenerator(model, hdfsReadMode, loadModel.getPartitionId(),
+        loadModel.getStorePath(), allocate,
+        loadModel.getCarbonDataLoadSchema(), loadModel.getSegmentId(), outputLocation);
+    generator.generateGraph();
+  }
+
+  public static void executeGraph(CarbonLoadModel loadModel, String storeLocation,
+      String storePath, String kettleHomePath) throws Exception {
+    System.setProperty("KETTLE_HOME", kettleHomePath);
+    if (!new File(storeLocation).mkdirs()) {
+      LOGGER.error("Error while creating the temp store path: " + storeLocation);
+    }
+    String outPutLoc = storeLocation + "/etl";
+    String databaseName = loadModel.getDatabaseName();
+    String tableName = loadModel.getTableName();
+    String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+        + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, storePath);
+    // CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
+    CarbonProperties.getInstance().addProperty("send.signal.load", "false");
+
+    String fileNamePrefix = "";
+    if (loadModel.isAggLoadRequest()) {
+      fileNamePrefix = "graphgenerator";
+    }
+    String graphPath =
+        outPutLoc + File.separator + databaseName + File.separator + tableName + File.separator
+            + loadModel.getSegmentId() + File.separator + loadModel.getTaskNo() + File.separator
+            + tableName + fileNamePrefix + ".ktr";
+    File path = new File(graphPath);
+    if (path.exists()) {
+      path.delete();
+    }
+
+    DataProcessTaskStatus dataProcessTaskStatus
+            = new DataProcessTaskStatus(databaseName, tableName);
+    dataProcessTaskStatus.setCsvFilePath(loadModel.getFactFilePath());
+    dataProcessTaskStatus.setDimCSVDirLoc(loadModel.getDimFolderPath());
+    if (loadModel.isDirectLoad()) {
+      dataProcessTaskStatus.setFilesToProcess(loadModel.getFactFilesToProcess());
+      dataProcessTaskStatus.setDirectLoad(true);
+      dataProcessTaskStatus.setCsvDelimiter(loadModel.getCsvDelimiter());
+      dataProcessTaskStatus.setCsvHeader(loadModel.getCsvHeader());
+    }
+
+    dataProcessTaskStatus.setBlocksID(loadModel.getBlocksID());
+    dataProcessTaskStatus.setEscapeCharacter(loadModel.getEscapeChar());
+    dataProcessTaskStatus.setQuoteCharacter(loadModel.getQuoteChar());
+    dataProcessTaskStatus.setCommentCharacter(loadModel.getCommentChar());
+    dataProcessTaskStatus.setRddIteratorKey(loadModel.getRddIteratorKey());
+    dataProcessTaskStatus.setDateFormat(loadModel.getDateFormat());
+    SchemaInfo info = new SchemaInfo();
+
+    info.setDatabaseName(databaseName);
+    info.setTableName(tableName);
+    info.setAutoAggregateRequest(loadModel.isAggLoadRequest());
+    info.setComplexDelimiterLevel1(loadModel.getComplexDelimiterLevel1());
+    info.setComplexDelimiterLevel2(loadModel.getComplexDelimiterLevel2());
+    info.setSerializationNullFormat(loadModel.getSerializationNullFormat());
+    info.setBadRecordsLoggerEnable(loadModel.getBadRecordsLoggerEnable());
+    info.setBadRecordsLoggerAction(loadModel.getBadRecordsAction());
+
+    generateGraph(dataProcessTaskStatus, info, loadModel, outPutLoc);
+
+    DataGraphExecuter graphExecuter = new DataGraphExecuter(dataProcessTaskStatus);
+    graphExecuter
+        .executeGraph(graphPath, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN),
+            info, loadModel.getPartitionId(), loadModel.getCarbonDataLoadSchema());
+  }
+
+  public static List<String> addNewSliceNameToList(String newSlice, List<String> activeSlices) {
+    activeSlices.add(newSlice);
+    return activeSlices;
+  }
+
+  public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
+      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + "");
+      deleteStorePath(segmentPath);
+    }
+  }
+
+  public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
+      final boolean isCompactionFlow) throws IOException {
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    String metaDataLocation = carbonTable.getMetaDataFilepath();
+    final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+    //delete folder which metadata no exist in tablestatus
+    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
+      final String partitionCount = i + "";
+      String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
+      FileType fileType = FileFactory.getFileType(partitionPath);
+      if (FileFactory.isFileExist(partitionPath, fileType)) {
+        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+          @Override public boolean accept(CarbonFile path) {
+            String segmentId =
+                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+            boolean found = false;
+            for (int j = 0; j < details.length; j++) {
+              if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
+                  .equals(partitionCount)) {
+                found = true;
+                break;
+              }
+            }
+            return !found;
+          }
+        });
+        for (int k = 0; k < listFiles.length; k++) {
+          String segmentId =
+              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+          if (isCompactionFlow) {
+            if (segmentId.contains(".")) {
+              deleteStorePath(listFiles[k].getAbsolutePath());
+            }
+          } else {
+            if (!segmentId.contains(".")) {
+              deleteStorePath(listFiles[k].getAbsolutePath());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  public static void deleteStorePath(String path) {
+    try {
+      FileType fileType = FileFactory.getFileType(path);
+      if (FileFactory.isFileExist(path, fileType)) {
+        CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
+        CarbonUtil.deleteFoldersAndFiles(carbonFile);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to delete the given path :: " + e.getMessage());
+    } catch (CarbonUtilException e) {
+      LOGGER.error("Unable to delete the given path :: " + e.getMessage());
+    }
+  }
+
+  public static List<String> getListOfValidSlices(LoadMetadataDetails[] details) {
+    List<String> activeSlices =
+        new ArrayList<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (LoadMetadataDetails oneLoad : details) {
+      if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS.equals(oneLoad.getLoadStatus())
+          || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(oneLoad.getLoadStatus())
+          || CarbonCommonConstants.MARKED_FOR_UPDATE.equals(oneLoad.getLoadStatus())) {
+        if (null != oneLoad.getMergedLoadName()) {
+          String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getMergedLoadName();
+          activeSlices.add(loadName);
+        } else {
+          String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
+          activeSlices.add(loadName);
+        }
+      }
+    }
+    return activeSlices;
+  }
+
+  public static List<String> getListOfUpdatedSlices(LoadMetadataDetails[] details) {
+    List<String> updatedSlices =
+        new ArrayList<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (LoadMetadataDetails oneLoad : details) {
+      if (CarbonCommonConstants.MARKED_FOR_UPDATE.equals(oneLoad.getLoadStatus())) {
+        if (null != oneLoad.getMergedLoadName()) {
+          updatedSlices.add(oneLoad.getMergedLoadName());
+        } else {
+          updatedSlices.add(oneLoad.getLoadName());
+        }
+      }
+    }
+    return updatedSlices;
+  }
+
+  public static void removeSliceFromMemory(String databaseName, String tableName, String loadName) {
+    // TODO: Remove from memory
+  }
+
+  /**
+   * This method will delete the local data load folder location after data load is complete
+   *
+   * @param loadModel
+   */
+  public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
+      boolean isCompactionFlow) {
+    String databaseName = loadModel.getDatabaseName();
+    String tableName = loadModel.getTableName();
+    String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+        + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+    if (isCompactionFlow) {
+      tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
+    }
+    // form local store location
+    String localStoreLocation = CarbonProperties.getInstance()
+        .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+    try {
+      CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation).getParentFile() });
+      LOGGER.info("Deleted the local store location" + localStoreLocation);
+    } catch (CarbonUtilException e) {
+      LOGGER.error(e, "Failed to delete local data load folder location");
+    }
+
+  }
+
+  /**
+   * This method will get the store location for the given path, segemnt id and partition id
+   *
+   * @param storePath
+   * @param carbonTableIdentifier
+   * @param segmentId
+   * @param partitionId
+   * @return
+   */
+  public static String getStoreLocation(String storePath,
+      CarbonTableIdentifier carbonTableIdentifier, String segmentId, String partitionId) {
+    CarbonTablePath carbonTablePath =
+        CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
+    String carbonDataFilePath = carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
+    return carbonDataFilePath;
+  }
+
+  /**
+   * This API will write the load level metadata for the loadmanagement module inorder to
+   * manage the load and query execution management smoothly.
+   *
+   * @param loadCount
+   * @param loadMetadataDetails
+   * @param loadModel
+   * @param loadStatus
+   * @param startLoadTime
+   * @return boolean which determines whether status update is done or not.
+   * @throws IOException
+   */
+  public static boolean recordLoadMetadata(int loadCount, LoadMetadataDetails loadMetadataDetails,
+      CarbonLoadModel loadModel, String loadStatus, String startLoadTime) throws IOException {
+
+    boolean status = false;
+
+    String metaDataFilepath =
+        loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
+
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+    ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
+
+    try {
+      if (carbonLock.lockWithRetries()) {
+        LOGGER.info(
+            "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
+                + " for table status updation");
+
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+            SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+        String loadEnddate = readCurrentTime();
+        loadMetadataDetails.setTimestamp(loadEnddate);
+        loadMetadataDetails.setLoadStatus(loadStatus);
+        loadMetadataDetails.setLoadName(String.valueOf(loadCount));
+        loadMetadataDetails.setLoadStartTime(startLoadTime);
+
+        List<LoadMetadataDetails> listOfLoadFolderDetails =
+            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+        if (null != listOfLoadFolderDetailsArray) {
+          for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+            listOfLoadFolderDetails.add(loadMetadata);
+          }
+        }
+        listOfLoadFolderDetails.add(loadMetadataDetails);
+
+        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
+            .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
+
+        status = true;
+      } else {
+        LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
+            .getDatabaseName() + "." + loadModel.getTableName());
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info(
+            "Table unlocked successfully after table status updation" + loadModel.getDatabaseName()
+                + "." + loadModel.getTableName());
+      } else {
+        LOGGER.error(
+            "Unable to unlock Table lock for table" + loadModel.getDatabaseName() + "." + loadModel
+                .getTableName() + " during table status updation");
+      }
+    }
+    return status;
+  }
+
+  public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
+      String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(schema.getCarbonTable().getStorePath(),
+            schema.getCarbonTable().getCarbonTableIdentifier());
+    String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+
+    DataOutputStream dataOutputStream;
+    Gson gsonObjectToWrite = new Gson();
+    BufferedWriter brWriter = null;
+
+    AtomicFileOperations writeOperation =
+        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+
+    try {
+
+      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
+      brWriter.write(metadataInstance);
+    } finally {
+      try {
+        if (null != brWriter) {
+          brWriter.flush();
+        }
+      } catch (Exception e) {
+        LOGGER.error("error in  flushing ");
+
+      }
+      CarbonUtil.closeStreams(brWriter);
+      writeOperation.close();
+    }
+
+  }
+
+  public static String readCurrentTime() {
+    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+    String date = null;
+
+    date = sdf.format(new Date());
+
+    return date;
+  }
+
+  public static String extractLoadMetadataFileLocation(CarbonLoadModel loadModel) {
+    CarbonTable carbonTable =
+        org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
+            .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
+    return carbonTable.getMetaDataFilepath();
+  }
+
+  public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier,
+      String carbonStorePath) throws CarbonUtilException {
+    Cache dictCache =
+        CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, carbonStorePath);
+    return (Dictionary) dictCache.get(columnIdentifier);
+  }
+
+  public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier,
+      ColumnIdentifier columnIdentifier, String carbonStorePath, DataType dataType)
+      throws CarbonUtilException {
+    return getDictionary(
+        new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType),
+        carbonStorePath);
+  }
+
+  /**
+   * This method will divide the blocks among the tasks of the nodes as per the data locality
+   *
+   * @param blockInfos
+   * @param noOfNodesInput -1 if number of nodes has to be decided
+   *                       based on block location information
+   * @param parallelism    total no of tasks to execute in parallel
+   * @return
+   */
+  public static Map<String, List<List<Distributable>>> nodeBlockTaskMapping(
+      List<Distributable> blockInfos, int noOfNodesInput, int parallelism,
+      List<String> activeNode) {
+
+    Map<String, List<Distributable>> mapOfNodes =
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
+    int taskPerNode = parallelism / mapOfNodes.size();
+    //assigning non zero value to noOfTasksPerNode
+    int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
+    // divide the blocks of a node among the tasks of the node.
+    return assignBlocksToTasksPerNode(mapOfNodes, noOfTasksPerNode);
+  }
+
+  /**
+   * This method will divide the blocks among the nodes as per the data locality
+   *
+   * @param blockInfos
+   * @return
+   */
+  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
+      int noOfNodesInput) {
+    return nodeBlockMapping(blockInfos, noOfNodesInput, null);
+  }
+
+  /**
+   * This method will divide the blocks among the nodes as per the data locality
+   *
+   * @param blockInfos
+   * @return
+   */
+  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
+    // -1 if number of nodes has to be decided based on block location information
+    return nodeBlockMapping(blockInfos, -1);
+  }
+
+  /**
+   * the method returns the number of required executors
+   *
+   * @param blockInfos
+   * @return
+   */
+  public static Map<String, List<Distributable>> getRequiredExecutors(
+      List<Distributable> blockInfos) {
+    List<NodeBlockRelation> flattenedList =
+        new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (Distributable blockInfo : blockInfos) {
+      try {
+        for (String eachNode : blockInfo.getLocations()) {
+          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+          flattenedList.add(nbr);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
+      }
+    }
+    // sort the flattened data.
+    Collections.sort(flattenedList);
+    Map<String, List<Distributable>> nodeAndBlockMapping =
+        new LinkedHashMap<String, List<Distributable>>(
+            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // from the flattened list create a mapping of node vs Data blocks.
+    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
+    return nodeAndBlockMapping;
+  }
+
+  /**
+   * This method will divide the blocks among the nodes as per the data locality
+   *
+   * @param blockInfos
+   * @param noOfNodesInput -1 if number of nodes has to be decided
+   *                       based on block location information
+   * @return
+   */
+  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
+      int noOfNodesInput, List<String> activeNodes) {
+
+    Map<String, List<Distributable>> nodeBlocksMap =
+        new HashMap<String, List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    List<NodeBlockRelation> flattenedList =
+        new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    Set<Distributable> uniqueBlocks =
+        new HashSet<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    Set<String> nodes = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    createFlattenedListFromMap(blockInfos, flattenedList, uniqueBlocks, nodes);
+
+    int noofNodes = (-1 == noOfNodesInput) ? nodes.size() : noOfNodesInput;
+    if (null != activeNodes) {
+      noofNodes = activeNodes.size();
+    }
+    int blocksPerNode = blockInfos.size() / noofNodes;
+    blocksPerNode = blocksPerNode <=0 ? 1 : blocksPerNode;
+
+    // sort the flattened data.
+    Collections.sort(flattenedList);
+
+    Map<String, List<Distributable>> nodeAndBlockMapping =
+        new LinkedHashMap<String, List<Distributable>>(
+            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    // from the flattened list create a mapping of node vs Data blocks.
+    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
+
+    // so now we have a map of node vs blocks. allocate the block as per the order
+    createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, nodeAndBlockMapping, activeNodes);
+
+    // if any blocks remain then assign them to nodes in round robin.
+    assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, activeNodes);
+
+    return nodeBlocksMap;
+  }
+
+  /**
+   * Assigning the blocks of a node to tasks.
+   *
+   * @param nodeBlocksMap nodeName to list of blocks mapping
+   * @param noOfTasksPerNode
+   * @return
+   */
+  private static Map<String, List<List<Distributable>>> assignBlocksToTasksPerNode(
+      Map<String, List<Distributable>> nodeBlocksMap, int noOfTasksPerNode) {
+    Map<String, List<List<Distributable>>> outputMap =
+        new HashMap<String, List<List<Distributable>>>(
+            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    // for each node
+    for (Map.Entry<String, List<Distributable>> eachNode : nodeBlocksMap.entrySet()) {
+
+      List<Distributable> blockOfEachNode = eachNode.getValue();
+      //sorting the block so same block will be give to same executor
+      Collections.sort(blockOfEachNode);
+      // create the task list for each node.
+      createTaskListForNode(outputMap, noOfTasksPerNode, eachNode.getKey());
+
+      // take all the block of node and divide it among the tasks of a node.
+      divideBlockToTasks(outputMap, eachNode.getKey(), blockOfEachNode);
+    }
+
+    return outputMap;
+  }
+
+  /**
+   * This will divide the blocks of a node to tasks of the node.
+   *
+   * @param outputMap
+   * @param key
+   * @param blockOfEachNode
+   */
+  private static void divideBlockToTasks(Map<String, List<List<Distributable>>> outputMap,
+      String key, List<Distributable> blockOfEachNode) {
+
+    List<List<Distributable>> taskLists = outputMap.get(key);
+    int tasksOfNode = taskLists.size();
+    int i = 0;
+    for (Distributable block : blockOfEachNode) {
+
+      taskLists.get(i % tasksOfNode).add(block);
+      i++;
+    }
+
+  }
+
+  /**
+   * This will create the empty list for each task of a node.
+   *
+   * @param outputMap
+   * @param noOfTasksPerNode
+   * @param key
+   */
+  private static void createTaskListForNode(Map<String, List<List<Distributable>>> outputMap,
+      int noOfTasksPerNode, String key) {
+    List<List<Distributable>> nodeTaskList =
+        new ArrayList<List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (int i = 0; i < noOfTasksPerNode; i++) {
+      List<Distributable> eachTask =
+          new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+      nodeTaskList.add(eachTask);
+
+    }
+    outputMap.put(key, nodeTaskList);
+
+  }
+
+  /**
+   * If any left over data blocks are present then assign those to nodes in round robin way.
+   *
+   * @param outputMap
+   * @param uniqueBlocks
+   */
+  private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
+      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
+
+    if (activeNodes != null) {
+      for (String activeNode : activeNodes) {
+        List<Distributable> blockLst = outputMap.get(activeNode);
+        if (null == blockLst) {
+          blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+        }
+        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+        if (blockLst.size() > 0) {
+          outputMap.put(activeNode, blockLst);
+        }
+      }
+    } else {
+      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
+        List<Distributable> blockLst = entry.getValue();
+        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+      }
+
+    }
+
+    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
+      Iterator<Distributable> blocks = uniqueBlocks.iterator();
+      if (blocks.hasNext()) {
+        Distributable block = blocks.next();
+        List<Distributable> blockLst = entry.getValue();
+        blockLst.add(block);
+        blocks.remove();
+      }
+    }
+  }
+
+  /**
+   * The method populate the blockLst to be allocate to a specific node.
+   * @param uniqueBlocks
+   * @param noOfBlocksPerNode
+   * @param blockLst
+   */
+  private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
+      List<Distributable> blockLst) {
+    Iterator<Distributable> blocks = uniqueBlocks.iterator();
+    //if the node is already having the per block nodes then avoid assign the extra blocks
+    if (blockLst.size() == noOfBlocksPerNode) {
+      return;
+    }
+    while (blocks.hasNext()) {
+      Distributable block = blocks.next();
+      blockLst.add(block);
+      blocks.remove();
+      if (blockLst.size() >= noOfBlocksPerNode) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * To create the final output of the Node and Data blocks
+   *
+   * @param outputMap
+   * @param blocksPerNode
+   * @param uniqueBlocks
+   * @param nodeAndBlockMapping
+   * @param activeNodes
+   */
+  private static void createOutputMap(Map<String, List<Distributable>> outputMap, int blocksPerNode,
+      Set<Distributable> uniqueBlocks, Map<String, List<Distributable>> nodeAndBlockMapping,
+      List<String> activeNodes) {
+
+    ArrayList<NodeMultiBlockRelation> multiBlockRelations =
+        new ArrayList<>(nodeAndBlockMapping.size());
+    for (Map.Entry<String, List<Distributable>> entry : nodeAndBlockMapping.entrySet()) {
+      multiBlockRelations.add(new NodeMultiBlockRelation(entry.getKey(), entry.getValue()));
+    }
+    // sort nodes based on number of blocks per node, so that nodes having lesser blocks
+    // are assigned first
+    Collections.sort(multiBlockRelations);
+
+    for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
+      String nodeName = nodeMultiBlockRelation.getNode();
+      //assign the block to the node only if the node is active
+      String activeExecutor = nodeName;
+      if (null != activeNodes) {
+        activeExecutor = getActiveExecutor(activeNodes, nodeName);
+        if (null == activeExecutor) {
+          continue;
+        }
+      }
+      // this loop will be for each NODE
+      int nodeCapacity = 0;
+      // loop thru blocks of each Node
+      for (Distributable block : nodeMultiBlockRelation.getBlocks()) {
+
+        // check if this is already assigned.
+        if (uniqueBlocks.contains(block)) {
+
+          if (null == outputMap.get(activeExecutor)) {
+            List<Distributable> list =
+                new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+            outputMap.put(activeExecutor, list);
+          }
+          // assign this block to this node if node has capacity left
+          if (nodeCapacity < blocksPerNode) {
+            List<Distributable> infos = outputMap.get(activeExecutor);
+            infos.add(block);
+            nodeCapacity++;
+            uniqueBlocks.remove(block);
+          } else {
+            // No need to continue loop as node is full
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * method validates whether the node is active or not.
+   *
+   * @param activeNode
+   * @param nodeName
+   * @return returns true if active else false.
+   */
+  private static String getActiveExecutor(List activeNode, String nodeName) {
+    boolean isActiveNode = activeNode.contains(nodeName);
+    if (isActiveNode) {
+      return nodeName;
+    }
+    //if localhost then retrieve the localhost name then do the check
+    else if (nodeName.equals("localhost")) {
+      try {
+        String hostName = InetAddress.getLocalHost().getHostName();
+        isActiveNode = activeNode.contains(hostName);
+        if(isActiveNode){
+          return hostName;
+        }
+      } catch (UnknownHostException ue) {
+        isActiveNode = false;
+      }
+    } else {
+      try {
+        String hostAddress = InetAddress.getByName(nodeName).getHostAddress();
+        isActiveNode = activeNode.contains(hostAddress);
+        if(isActiveNode){
+          return hostAddress;
+        }
+      } catch (UnknownHostException ue) {
+        isActiveNode = false;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Create the Node and its related blocks Mapping and put in a Map
+   *
+   * @param flattenedList
+   * @param nodeAndBlockMapping
+   */
+  private static void createNodeVsBlockMapping(List<NodeBlockRelation> flattenedList,
+      Map<String, List<Distributable>> nodeAndBlockMapping) {
+    for (NodeBlockRelation nbr : flattenedList) {
+      String node = nbr.getNode();
+      List<Distributable> list;
+
+      if (null == nodeAndBlockMapping.get(node)) {
+        list = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+        list.add(nbr.getBlock());
+        Collections.sort(list);
+        nodeAndBlockMapping.put(node, list);
+      } else {
+        list = nodeAndBlockMapping.get(node);
+        list.add(nbr.getBlock());
+        Collections.sort(list);
+      }
+    }
+  }
+
+  /**
+   * Create the flat List i.e flattening of the Map.
+   *
+   * @param blockInfos
+   * @param flattenedList
+   * @param uniqueBlocks
+   */
+  private static void createFlattenedListFromMap(List<Distributable> blockInfos,
+      List<NodeBlockRelation> flattenedList, Set<Distributable> uniqueBlocks,
+      Set<String> nodeList) {
+    for (Distributable blockInfo : blockInfos) {
+      // put the blocks in the set
+      uniqueBlocks.add(blockInfo);
+
+      try {
+        for (String eachNode : blockInfo.getLocations()) {
+          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+          flattenedList.add(nbr);
+          nodeList.add(eachNode);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
+      }
+    }
+  }
+
+  /**
+   * This method will get the store location for the given path, segment id and partition id
+   *
+   * @param carbonStorePath
+   * @param dbName
+   * @param tableName
+   * @param segmentId
+   */
+  public static void checkAndCreateCarbonDataLocation(String carbonStorePath, String dbName,
+      String tableName, String segmentId) {
+    CarbonTable carbonTable = CarbonMetadata.getInstance()
+        .getCarbonTable(dbName + CarbonCommonConstants.UNDERSCORE + tableName);
+    CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
+    CarbonTablePath carbonTablePath =
+        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
+    String carbonDataDirectoryPath =
+        carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+  }
+
+  /**
+   * return the Array of available local-dirs
+   *
+   * @param conf
+   * @return
+   */
+  public static String[] getConfiguredLocalDirs(SparkConf conf) {
+    return Utils.getConfiguredLocalDirs(conf);
+  }
+
+  /**
+   * This will update the old table status details before clean files to the latest table status.
+   * @param oldList
+   * @param newList
+   * @return
+   */
+  public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
+      LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
+
+    List<LoadMetadataDetails> newListMetadata =
+        new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
+    for (LoadMetadataDetails oldSegment : oldList) {
+      if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
+        newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
+      }
+    }
+    return newListMetadata;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
new file mode 100644
index 0000000..2b3979f
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Project Name  : Carbon
+ * Module Name   : CARBON spark interface
+ * Author    : R00903928
+ * Created Date  : 22-Sep-2015
+ * FileName   : DeleteLoadFolders.java
+ * Description   : for physical deletion of load folders.
+ * Class Version  : 1.0
+ */
+package org.apache.carbondata.spark.load;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+
+public final class DeleteLoadFolders {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
+
+  private DeleteLoadFolders() {
+
+  }
+
+  /**
+   * returns segment path
+   *
+   * @param loadModel
+   * @param storeLocation
+   * @param partitionId
+   * @param oneLoad
+   * @return
+   */
+  private static String getSegmentPath(CarbonLoadModel loadModel, String storeLocation,
+      int partitionId, LoadMetadataDetails oneLoad) {
+
+    String path = null;
+    String segmentId = oneLoad.getLoadName();
+
+    path = new CarbonStorePath(storeLocation).getCarbonTablePath(
+        loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier())
+        .getCarbonDataDirectoryPath("" + partitionId, segmentId);
+    return path;
+  }
+
+  private static boolean physicalFactAndMeasureMetadataDeletion(String path) {
+
+    boolean status = false;
+    try {
+      if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+        CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+        CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+          @Override public boolean accept(CarbonFile file) {
+            return (CarbonTablePath.isCarbonDataFile(file.getName())
+                || CarbonTablePath.isCarbonIndexFile(file.getName()));
+          }
+        });
+
+        //if there are no fact and msr metadata files present then no need to keep
+        //entry in metadata.
+        if (filesToBeDeleted.length == 0) {
+          status = true;
+        } else {
+
+          for (CarbonFile eachFile : filesToBeDeleted) {
+            if (!eachFile.delete()) {
+              LOGGER.warn("Unable to delete the file as per delete command "
+                  + eachFile.getAbsolutePath());
+              status = false;
+            } else {
+              status = true;
+            }
+          }
+        }
+        // need to delete the complete folder.
+        if(status){
+          if(!file.delete()){
+            LOGGER.warn("Unable to delete the folder as per delete command "
+                + file.getAbsolutePath());
+            status = false;
+          }
+        }
+
+      } else {
+        status = false;
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Unable to delete the file as per delete command " + path);
+    }
+
+    return status;
+
+  }
+
+  private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
+      boolean isForceDelete) {
+    if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneLoad.getLoadStatus())
+        || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneLoad.getLoadStatus()))
+        && oneLoad.getVisibility().equalsIgnoreCase("true")) {
+      if (isForceDelete) {
+        return true;
+      }
+      String deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
+      SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+      Date deletionDate = null;
+      String date = null;
+      Date currentTimeStamp = null;
+      try {
+        deletionDate = parser.parse(deletionTime);
+        date = CarbonLoaderUtil.readCurrentTime();
+        currentTimeStamp = parser.parse(date);
+      } catch (ParseException e) {
+        return false;
+      }
+
+      long difference = currentTimeStamp.getTime() - deletionDate.getTime();
+
+      long minutesElapsed = (difference / (1000 * 60));
+
+      int maxTime;
+      try {
+        maxTime = Integer.parseInt(CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
+      } catch (NumberFormatException e) {
+        maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
+      }
+      if (minutesElapsed > maxTime) {
+        return true;
+      }
+
+    }
+
+    return false;
+  }
+
+  private static void factFileRenaming(String loadFolderPath) {
+
+    FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath);
+    try {
+      if (FileFactory.isFileExist(loadFolderPath, fileType)) {
+        CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType);
+
+        CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() {
+
+          @Override public boolean accept(CarbonFile file) {
+            return (file.getName().endsWith('_' + CarbonCommonConstants.FACT_FILE_UPDATED));
+          }
+        });
+
+        for (CarbonFile file : listFiles) {
+          if (!file.renameTo(file.getName().substring(0,
+              file.getName().length() - CarbonCommonConstants.FACT_FILE_UPDATED.length()))) {
+            LOGGER.warn("could not rename the updated fact file.");
+          }
+        }
+
+      }
+    } catch (IOException e) {
+      LOGGER.error("exception" + e.getMessage());
+    }
+
+  }
+
+  private static void cleanDeletedFactFile(String loadFolderPath) {
+    FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath);
+    try {
+      if (FileFactory.isFileExist(loadFolderPath, fileType)) {
+        CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType);
+
+        CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() {
+
+          @Override public boolean accept(CarbonFile file) {
+            return (file.getName().endsWith(CarbonCommonConstants.FACT_DELETE_EXTENSION));
+          }
+        });
+
+        for (CarbonFile file : listFiles) {
+          if (!file.delete()) {
+            LOGGER.warn("could not delete the marked fact file.");
+          }
+        }
+
+      }
+    } catch (IOException e) {
+      LOGGER.error("exception" + e.getMessage());
+    }
+  }
+
+  /**
+   * @param loadModel
+   * @param storeLocation
+   * @param isForceDelete
+   * @param details
+   * @return
+   *
+   */
+  public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
+      String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
+    List<LoadMetadataDetails> deletedLoads =
+        new ArrayList<LoadMetadataDetails>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    boolean isDeleted = false;
+
+    if (details != null && details.length != 0) {
+      for (LoadMetadataDetails oneLoad : details) {
+        if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
+          String path = getSegmentPath(loadModel, storeLocation, 0, oneLoad);
+          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
+          if (deletionStatus) {
+            isDeleted = true;
+            oneLoad.setVisibility("false");
+            deletedLoads.add(oneLoad);
+            LOGGER.info("Info: " +
+                " Deleted the load " + oneLoad.getLoadName());
+          }
+        }
+      }
+    }
+
+    return isDeleted;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
new file mode 100644
index 0000000..661e17c
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.load;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+public class DeletedLoadMetadata implements Serializable {
+
+  private static final long serialVersionUID = 7083059404172117208L;
+  private Map<String, String> deletedLoadMetadataMap =
+      new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  public void addDeletedLoadMetadata(String loadId, String status) {
+    deletedLoadMetadataMap.put(loadId, status);
+  }
+
+  public List<String> getDeletedLoadMetadataIds() {
+    return new ArrayList<String>(deletedLoadMetadataMap.keySet());
+  }
+
+  public String getDeletedLoadMetadataStatus(String loadId) {
+    if (deletedLoadMetadataMap.containsKey(loadId)) {
+      return deletedLoadMetadataMap.get(loadId);
+    } else {
+      return null;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
new file mode 100644
index 0000000..9fa63d6
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.scan.executor.QueryExecutor;
+import org.apache.carbondata.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.model.QueryDimension;
+import org.apache.carbondata.scan.model.QueryMeasure;
+import org.apache.carbondata.scan.model.QueryModel;
+import org.apache.carbondata.scan.result.BatchResult;
+import org.apache.carbondata.scan.result.iterator.RawResultIterator;
+
+/**
+ * Executor class for executing the query on the selected segments to be merged.
+ * This will fire a select * query and get the raw result.
+ */
+public class CarbonCompactionExecutor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
+  private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping;
+  private final SegmentProperties destinationSegProperties;
+  private final String databaseName;
+  private final String factTableName;
+  private final Map<String, TaskBlockInfo> segmentMapping;
+  private final String storePath;
+  private QueryExecutor queryExecutor;
+  private CarbonTable carbonTable;
+  private QueryModel queryModel;
+
+  /**
+   * Constructor
+   *
+   * @param segmentMapping
+   * @param segmentProperties
+   * @param databaseName
+   * @param factTableName
+   * @param storePath
+   * @param carbonTable
+   */
+  public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
+      SegmentProperties segmentProperties, String databaseName, String factTableName,
+      String storePath, CarbonTable carbonTable,
+      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping) {
+
+    this.segmentMapping = segmentMapping;
+
+    this.destinationSegProperties = segmentProperties;
+
+    this.databaseName = databaseName;
+
+    this.factTableName = factTableName;
+
+    this.storePath = storePath;
+
+    this.carbonTable = carbonTable;
+
+    this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
+  }
+
+  /**
+   * For processing of the table blocks.
+   *
+   * @return List of Carbon iterators
+   */
+  public List<RawResultIterator> processTableBlocks() throws QueryExecutionException {
+
+    List<RawResultIterator> resultList =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    List<TableBlockInfo> list = null;
+    queryModel = prepareQueryModel(list);
+    // iterate each seg ID
+    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+      String segmentId = taskMap.getKey();
+      List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
+
+      int[] colCardinality = listMetadata.get(0).getSegmentInfo().getColumnCardinality();
+
+      SegmentProperties sourceSegProperties =
+          new SegmentProperties(listMetadata.get(0).getColumnInTable(), colCardinality);
+
+      // for each segment get taskblock info
+      TaskBlockInfo taskBlockInfo = taskMap.getValue();
+      Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
+
+      for (String task : taskBlockListMapping) {
+
+        list = taskBlockInfo.getTableBlockInfoList(task);
+        Collections.sort(list);
+        LOGGER.info("for task -" + task + "-block size is -" + list.size());
+        queryModel.setTableBlockInfos(list);
+        resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
+            destinationSegProperties));
+
+      }
+    }
+
+    return resultList;
+  }
+
+  /**
+   * get executor and execute the query model.
+   *
+   * @param blockList
+   * @return
+   */
+  private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+      throws QueryExecutionException {
+
+    queryModel.setTableBlockInfos(blockList);
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
+    CarbonIterator<BatchResult> iter = null;
+    try {
+      iter = queryExecutor.execute(queryModel);
+    } catch (QueryExecutionException e) {
+      LOGGER.error(e.getMessage());
+      throw e;
+    }
+
+    return iter;
+  }
+
+  /**
+   * Below method will be used
+   * for cleanup
+   */
+  public void finish() {
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      LOGGER.error(e, "Problem while finish: ");
+    }
+    clearDictionaryFromQueryModel();
+  }
+
+  /**
+   * This method will clear the dictionary access count after its usage is complete so
+   * that column can be deleted form LRU cache whenever memory reaches threshold
+   */
+  private void clearDictionaryFromQueryModel() {
+    if (null != queryModel) {
+      Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+      if (null != columnToDictionaryMapping) {
+        for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
+          CarbonUtil.clearDictionaryCache(entry.getValue());
+        }
+      }
+    }
+  }
+
+  /**
+   * Preparing of the query model.
+   *
+   * @param blockList
+   * @return
+   */
+  public QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
+
+    QueryModel model = new QueryModel();
+
+    model.setTableBlockInfos(blockList);
+    model.setCountStarQuery(false);
+    model.setDetailQuery(true);
+    model.setForcedDetailRawQuery(true);
+    model.setFilterExpressionResolverTree(null);
+
+    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (CarbonDimension dim : destinationSegProperties.getDimensions()) {
+      QueryDimension queryDimension = new QueryDimension(dim.getColName());
+      dims.add(queryDimension);
+    }
+    model.setQueryDimension(dims);
+
+    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (CarbonMeasure carbonMeasure : destinationSegProperties.getMeasures()) {
+      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
+      msrs.add(queryMeasure);
+    }
+    model.setQueryMeasures(msrs);
+
+    model.setQueryId(System.nanoTime() + "");
+
+    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
+
+    model.setAggTable(false);
+    model.setLimit(-1);
+
+    model.setTable(carbonTable);
+
+    model.setInMemoryRecordSize(CarbonCommonConstants.COMPACTION_INMEMORY_RECORD_SIZE);
+
+    return model;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
new file mode 100644
index 0000000..ca33fac
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.carbon.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.CarbonUtilException;
+
+
+/**
+ * Utility Class for the Compaction Flow.
+ */
+public class CarbonCompactionUtil {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
+
+  /**
+   * To create a mapping of Segment Id and TableBlockInfo.
+   *
+   * @param tableBlockInfoList
+   * @return
+   */
+  public static Map<String, TaskBlockInfo> createMappingForSegments(
+      List<TableBlockInfo> tableBlockInfoList) {
+
+    // stores taskBlockInfo of each segment
+    Map<String, TaskBlockInfo> segmentBlockInfoMapping =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+
+    for (TableBlockInfo info : tableBlockInfoList) {
+      String segId = info.getSegmentId();
+      // check if segId is already present in map
+      TaskBlockInfo taskBlockInfoMapping = segmentBlockInfoMapping.get(segId);
+      // extract task ID from file Path.
+      String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(info.getFilePath());
+      // if taskBlockInfo is not there, then create and add
+      if (null == taskBlockInfoMapping) {
+        taskBlockInfoMapping = new TaskBlockInfo();
+        groupCorrespodingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
+        // put the taskBlockInfo with respective segment id
+        segmentBlockInfoMapping.put(segId, taskBlockInfoMapping);
+      } else
+      {
+        groupCorrespodingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
+      }
+    }
+    return segmentBlockInfoMapping;
+
+  }
+
+  /**
+   * Grouping the taskNumber and list of TableBlockInfo.
+   * @param info
+   * @param taskBlockMapping
+   * @param taskNo
+   */
+  private static void groupCorrespodingInfoBasedOnTask(TableBlockInfo info,
+      TaskBlockInfo taskBlockMapping, String taskNo) {
+    // get the corresponding list from task mapping.
+    List<TableBlockInfo> blockLists = taskBlockMapping.getTableBlockInfoList(taskNo);
+    if (null != blockLists) {
+      blockLists.add(info);
+    } else {
+      blockLists = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+      blockLists.add(info);
+      taskBlockMapping.addTableBlockInfoList(taskNo, blockLists);
+    }
+  }
+
+  /**
+   * To create a mapping of Segment Id and DataFileFooter.
+   *
+   * @param tableBlockInfoList
+   * @return
+   */
+  public static Map<String, List<DataFileFooter>> createDataFileFooterMappingForSegments(
+      List<TableBlockInfo> tableBlockInfoList) throws IndexBuilderException {
+
+    Map<String, List<DataFileFooter>> segmentBlockInfoMapping = new HashMap<>();
+    for (TableBlockInfo blockInfo : tableBlockInfoList) {
+      List<DataFileFooter> eachSegmentBlocks = new ArrayList<>();
+      String segId = blockInfo.getSegmentId();
+
+      DataFileFooter dataFileMatadata = null;
+      // check if segId is already present in map
+      List<DataFileFooter> metadataList = segmentBlockInfoMapping.get(segId);
+      try {
+        dataFileMatadata = CarbonUtil
+            .readMetadatFile(blockInfo.getFilePath(), blockInfo.getBlockOffset(),
+                blockInfo.getBlockLength());
+      } catch (CarbonUtilException e) {
+        throw new IndexBuilderException(e);
+      }
+      if (null == metadataList) {
+        // if it is not present
+        eachSegmentBlocks.add(dataFileMatadata);
+        segmentBlockInfoMapping.put(segId, eachSegmentBlocks);
+      } else {
+
+        // if its already present then update the list.
+        metadataList.add(dataFileMatadata);
+      }
+    }
+    return segmentBlockInfoMapping;
+
+  }
+
+  /**
+   * Check whether the file to indicate the compaction is present or not.
+   * @param metaFolderPath
+   * @return
+   */
+  public static boolean isCompactionRequiredForTable(String metaFolderPath) {
+    String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.minorCompactionRequiredFile;
+
+    String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.majorCompactionRequiredFile;
+    try {
+      if (FileFactory.isFileExist(minorCompactionStatusFile,
+          FileFactory.getFileType(minorCompactionStatusFile)) || FileFactory
+          .isFileExist(majorCompactionStatusFile,
+              FileFactory.getFileType(majorCompactionStatusFile))) {
+        return true;
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception in isFileExist compaction request file " + e.getMessage() );
+    }
+    return false;
+  }
+
+  /**
+   * Determine the type of the compaction received.
+   * @param metaFolderPath
+   * @return
+   */
+  public static CompactionType determineCompactionType(String metaFolderPath) {
+    String minorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.minorCompactionRequiredFile;
+
+    String majorCompactionStatusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.majorCompactionRequiredFile;
+    try {
+      if (FileFactory.isFileExist(minorCompactionStatusFile,
+          FileFactory.getFileType(minorCompactionStatusFile))) {
+        return CompactionType.MINOR_COMPACTION;
+      }
+      if (FileFactory.isFileExist(majorCompactionStatusFile,
+          FileFactory.getFileType(majorCompactionStatusFile))) {
+        return CompactionType.MAJOR_COMPACTION;
+      }
+
+    } catch (IOException e) {
+      LOGGER.error("Exception in determining the compaction request file " + e.getMessage() );
+    }
+    return CompactionType.MINOR_COMPACTION;
+  }
+
+  /**
+   * Delete the compation request file once the compaction is done.
+   * @param metaFolderPath
+   * @param compactionType
+   * @return
+   */
+  public static boolean deleteCompactionRequiredFile(String metaFolderPath,
+      CompactionType compactionType) {
+    String compactionRequiredFile;
+    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+      compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.minorCompactionRequiredFile;
+    } else {
+      compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.majorCompactionRequiredFile;
+    }
+    try {
+      if (FileFactory
+          .isFileExist(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))) {
+        if (FileFactory
+            .getCarbonFile(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))
+            .delete()) {
+          LOGGER.info("Deleted the compaction request file " + compactionRequiredFile);
+          return true;
+        } else {
+          LOGGER.error("Unable to delete the compaction request file " + compactionRequiredFile);
+        }
+      } else {
+        LOGGER.info("Compaction request file is not present. file is : " + compactionRequiredFile);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception in deleting the compaction request file " + e.getMessage());
+    }
+    return false;
+  }
+
+  /**
+   * Creation of the compaction request if someother compaction is in progress.
+   * @param metaFolderPath
+   * @param compactionType
+   * @return
+   */
+  public static boolean createCompactionRequiredFile(String metaFolderPath,
+      CompactionType compactionType) {
+    String statusFile;
+    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.minorCompactionRequiredFile;
+    } else {
+      statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.majorCompactionRequiredFile;
+    }
+    try {
+      if (!FileFactory.isFileExist(statusFile, FileFactory.getFileType(statusFile))) {
+        if (FileFactory.createNewFile(statusFile, FileFactory.getFileType(statusFile))) {
+          LOGGER.info("successfully created a compaction required file - " + statusFile);
+          return true;
+        } else {
+          LOGGER.error("Not able to create a compaction required file - " + statusFile);
+          return false;
+        }
+      } else {
+        LOGGER.info("Compaction request file : " + statusFile + " already exist.");
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception in creating the compaction request file " + e.getMessage() );
+    }
+    return false;
+  }
+
+  /**
+   * This will check if any compaction request has been received for any table.
+   *
+   * @param tableMetas
+   * @return
+   */
+  public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
+      List<CarbonTableIdentifier> skipList) {
+    for (TableMeta table : tableMetas) {
+      CarbonTable ctable = table.carbonTable;
+      String metadataPath = ctable.getMetaDataFilepath();
+      // check for the compaction required file and at the same time exclude the tables which are
+      // present in the skip list.
+      if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
+          .contains(table.carbonTableIdentifier)) {
+        return table;
+      }
+    }
+    return null;
+  }
+}