You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:45 UTC

[07/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
deleted file mode 100644
index f2a1f9f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ /dev/null
@@ -1,976 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.load;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.ColumnIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.Distributable;
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.CarbonUtilException;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
-import org.apache.carbondata.lcm.locks.ICarbonLock;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
-import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
-import org.apache.carbondata.processing.csvload.DataGraphExecuter;
-import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus;
-import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
-import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
-import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.carbondata.spark.merger.NodeBlockRelation;
-import org.apache.carbondata.spark.merger.NodeMultiBlockRelation;
-
-import com.google.gson.Gson;
-import org.apache.spark.SparkConf;
-import org.apache.spark.util.Utils;
-
-
-public final class CarbonLoaderUtil {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
-  /**
-   * minimum no of blocklet required for distribution
-   */
-  private static int minBlockLetsReqForDistribution = 0;
-
-  static {
-    String property = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE);
-    try {
-      minBlockLetsReqForDistribution = Integer.parseInt(property);
-    } catch (NumberFormatException ne) {
-      LOGGER.info("Invalid configuration. Consisering the defaul");
-      minBlockLetsReqForDistribution =
-          CarbonCommonConstants.DEFAULT_CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE;
-    }
-  }
-
-  private CarbonLoaderUtil() {
-
-  }
-
-  private static void generateGraph(IDataProcessStatus dataProcessTaskStatus, SchemaInfo info,
-      CarbonLoadModel loadModel, String outputLocation)
-      throws GraphGeneratorException {
-    DataLoadModel model = new DataLoadModel();
-    model.setCsvLoad(null != dataProcessTaskStatus.getCsvFilePath()
-            || null != dataProcessTaskStatus.getFilesToProcess());
-    model.setSchemaInfo(info);
-    model.setTableName(dataProcessTaskStatus.getTableName());
-    List<LoadMetadataDetails> loadMetadataDetails = loadModel.getLoadMetadataDetails();
-    if (null != loadMetadataDetails && !loadMetadataDetails.isEmpty()) {
-      model.setLoadNames(
-          CarbonDataProcessorUtil.getLoadNameFromLoadMetaDataDetails(loadMetadataDetails));
-      model.setModificationOrDeletionTime(CarbonDataProcessorUtil
-          .getModificationOrDeletionTimesFromLoadMetadataDetails(loadMetadataDetails));
-    }
-    model.setBlocksID(dataProcessTaskStatus.getBlocksID());
-    model.setEscapeCharacter(dataProcessTaskStatus.getEscapeCharacter());
-    model.setQuoteCharacter(dataProcessTaskStatus.getQuoteCharacter());
-    model.setCommentCharacter(dataProcessTaskStatus.getCommentCharacter());
-    model.setRddIteratorKey(dataProcessTaskStatus.getRddIteratorKey());
-    model.setTaskNo(loadModel.getTaskNo());
-    model.setFactTimeStamp(loadModel.getFactTimeStamp());
-    model.setMaxColumns(loadModel.getMaxColumns());
-    model.setDateFormat(loadModel.getDateFormat());
-    boolean hdfsReadMode =
-        dataProcessTaskStatus.getCsvFilePath() != null
-                && dataProcessTaskStatus.getCsvFilePath().startsWith("hdfs:");
-    int allocate =
-            null != dataProcessTaskStatus.getCsvFilePath()
-                    ? 1 : dataProcessTaskStatus.getFilesToProcess().size();
-    GraphGenerator generator = new GraphGenerator(model, hdfsReadMode, loadModel.getPartitionId(),
-        loadModel.getStorePath(), allocate,
-        loadModel.getCarbonDataLoadSchema(), loadModel.getSegmentId(), outputLocation);
-    generator.generateGraph();
-  }
-
-  public static void executeGraph(CarbonLoadModel loadModel, String storeLocation,
-      String storePath, String kettleHomePath) throws Exception {
-    System.setProperty("KETTLE_HOME", kettleHomePath);
-    if (!new File(storeLocation).mkdirs()) {
-      LOGGER.error("Error while creating the temp store path: " + storeLocation);
-    }
-    String outPutLoc = storeLocation + "/etl";
-    String databaseName = loadModel.getDatabaseName();
-    String tableName = loadModel.getTableName();
-    String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
-        + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
-    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
-    CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, storePath);
-    // CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
-    CarbonProperties.getInstance().addProperty("send.signal.load", "false");
-
-    String fileNamePrefix = "";
-    if (loadModel.isAggLoadRequest()) {
-      fileNamePrefix = "graphgenerator";
-    }
-    String graphPath =
-        outPutLoc + File.separator + databaseName + File.separator + tableName + File.separator
-            + loadModel.getSegmentId() + File.separator + loadModel.getTaskNo() + File.separator
-            + tableName + fileNamePrefix + ".ktr";
-    File path = new File(graphPath);
-    if (path.exists()) {
-      path.delete();
-    }
-
-    DataProcessTaskStatus dataProcessTaskStatus
-            = new DataProcessTaskStatus(databaseName, tableName);
-    dataProcessTaskStatus.setCsvFilePath(loadModel.getFactFilePath());
-    dataProcessTaskStatus.setDimCSVDirLoc(loadModel.getDimFolderPath());
-    if (loadModel.isDirectLoad()) {
-      dataProcessTaskStatus.setFilesToProcess(loadModel.getFactFilesToProcess());
-      dataProcessTaskStatus.setDirectLoad(true);
-      dataProcessTaskStatus.setCsvDelimiter(loadModel.getCsvDelimiter());
-      dataProcessTaskStatus.setCsvHeader(loadModel.getCsvHeader());
-    }
-
-    dataProcessTaskStatus.setBlocksID(loadModel.getBlocksID());
-    dataProcessTaskStatus.setEscapeCharacter(loadModel.getEscapeChar());
-    dataProcessTaskStatus.setQuoteCharacter(loadModel.getQuoteChar());
-    dataProcessTaskStatus.setCommentCharacter(loadModel.getCommentChar());
-    dataProcessTaskStatus.setRddIteratorKey(loadModel.getRddIteratorKey());
-    dataProcessTaskStatus.setDateFormat(loadModel.getDateFormat());
-    SchemaInfo info = new SchemaInfo();
-
-    info.setDatabaseName(databaseName);
-    info.setTableName(tableName);
-    info.setAutoAggregateRequest(loadModel.isAggLoadRequest());
-    info.setComplexDelimiterLevel1(loadModel.getComplexDelimiterLevel1());
-    info.setComplexDelimiterLevel2(loadModel.getComplexDelimiterLevel2());
-    info.setSerializationNullFormat(loadModel.getSerializationNullFormat());
-    info.setBadRecordsLoggerEnable(loadModel.getBadRecordsLoggerEnable());
-    info.setBadRecordsLoggerAction(loadModel.getBadRecordsAction());
-
-    generateGraph(dataProcessTaskStatus, info, loadModel, outPutLoc);
-
-    DataGraphExecuter graphExecuter = new DataGraphExecuter(dataProcessTaskStatus);
-    graphExecuter
-        .executeGraph(graphPath, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN),
-            info, loadModel.getPartitionId(), loadModel.getCarbonDataLoadSchema());
-  }
-
-  public static List<String> addNewSliceNameToList(String newSlice, List<String> activeSlices) {
-    activeSlices.add(newSlice);
-    return activeSlices;
-  }
-
-  public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
-
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + "");
-      deleteStorePath(segmentPath);
-    }
-  }
-
-  public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
-      final boolean isCompactionFlow) throws IOException {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    String metaDataLocation = carbonTable.getMetaDataFilepath();
-    final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
-
-    //delete folder which metadata no exist in tablestatus
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      final String partitionCount = i + "";
-      String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
-      FileType fileType = FileFactory.getFileType(partitionPath);
-      if (FileFactory.isFileExist(partitionPath, fileType)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
-        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-          @Override public boolean accept(CarbonFile path) {
-            String segmentId =
-                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
-            boolean found = false;
-            for (int j = 0; j < details.length; j++) {
-              if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
-                  .equals(partitionCount)) {
-                found = true;
-                break;
-              }
-            }
-            return !found;
-          }
-        });
-        for (int k = 0; k < listFiles.length; k++) {
-          String segmentId =
-              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
-          if (isCompactionFlow) {
-            if (segmentId.contains(".")) {
-              deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          } else {
-            if (!segmentId.contains(".")) {
-              deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          }
-        }
-      }
-    }
-  }
-
-  public static void deleteStorePath(String path) {
-    try {
-      FileType fileType = FileFactory.getFileType(path);
-      if (FileFactory.isFileExist(path, fileType)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
-        CarbonUtil.deleteFoldersAndFiles(carbonFile);
-      }
-    } catch (IOException e) {
-      LOGGER.error("Unable to delete the given path :: " + e.getMessage());
-    } catch (CarbonUtilException e) {
-      LOGGER.error("Unable to delete the given path :: " + e.getMessage());
-    }
-  }
-
-  public static List<String> getListOfValidSlices(LoadMetadataDetails[] details) {
-    List<String> activeSlices =
-        new ArrayList<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (LoadMetadataDetails oneLoad : details) {
-      if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS.equals(oneLoad.getLoadStatus())
-          || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(oneLoad.getLoadStatus())
-          || CarbonCommonConstants.MARKED_FOR_UPDATE.equals(oneLoad.getLoadStatus())) {
-        if (null != oneLoad.getMergedLoadName()) {
-          String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getMergedLoadName();
-          activeSlices.add(loadName);
-        } else {
-          String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
-          activeSlices.add(loadName);
-        }
-      }
-    }
-    return activeSlices;
-  }
-
-  public static List<String> getListOfUpdatedSlices(LoadMetadataDetails[] details) {
-    List<String> updatedSlices =
-        new ArrayList<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (LoadMetadataDetails oneLoad : details) {
-      if (CarbonCommonConstants.MARKED_FOR_UPDATE.equals(oneLoad.getLoadStatus())) {
-        if (null != oneLoad.getMergedLoadName()) {
-          updatedSlices.add(oneLoad.getMergedLoadName());
-        } else {
-          updatedSlices.add(oneLoad.getLoadName());
-        }
-      }
-    }
-    return updatedSlices;
-  }
-
-  public static void removeSliceFromMemory(String databaseName, String tableName, String loadName) {
-    // TODO: Remove from memory
-  }
-
-  /**
-   * This method will delete the local data load folder location after data load is complete
-   *
-   * @param loadModel
-   */
-  public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
-      boolean isCompactionFlow) {
-    String databaseName = loadModel.getDatabaseName();
-    String tableName = loadModel.getTableName();
-    String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
-        + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
-    if (isCompactionFlow) {
-      tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
-    }
-    // form local store location
-    String localStoreLocation = CarbonProperties.getInstance()
-        .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
-    try {
-      CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation).getParentFile() });
-      LOGGER.info("Deleted the local store location" + localStoreLocation);
-    } catch (CarbonUtilException e) {
-      LOGGER.error(e, "Failed to delete local data load folder location");
-    }
-
-  }
-
-  /**
-   * This method will get the store location for the given path, segemnt id and partition id
-   *
-   * @param storePath
-   * @param carbonTableIdentifier
-   * @param segmentId
-   * @param partitionId
-   * @return
-   */
-  public static String getStoreLocation(String storePath,
-      CarbonTableIdentifier carbonTableIdentifier, String segmentId, String partitionId) {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
-    String carbonDataFilePath = carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
-    return carbonDataFilePath;
-  }
-
-  /**
-   * This API will write the load level metadata for the loadmanagement module inorder to
-   * manage the load and query execution management smoothly.
-   *
-   * @param loadCount
-   * @param loadMetadataDetails
-   * @param loadModel
-   * @param loadStatus
-   * @param startLoadTime
-   * @return boolean which determines whether status update is done or not.
-   * @throws IOException
-   */
-  public static boolean recordLoadMetadata(int loadCount, LoadMetadataDetails loadMetadataDetails,
-      CarbonLoadModel loadModel, String loadStatus, String startLoadTime) throws IOException {
-
-    boolean status = false;
-
-    String metaDataFilepath =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
-
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-
-    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
-    ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
-
-    try {
-      if (carbonLock.lockWithRetries()) {
-        LOGGER.info(
-            "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
-                + " for table status updation");
-
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            SegmentStatusManager.readLoadMetadata(metaDataFilepath);
-
-        String loadEnddate = readCurrentTime();
-        loadMetadataDetails.setTimestamp(loadEnddate);
-        loadMetadataDetails.setLoadStatus(loadStatus);
-        loadMetadataDetails.setLoadName(String.valueOf(loadCount));
-        loadMetadataDetails.setLoadStartTime(startLoadTime);
-
-        List<LoadMetadataDetails> listOfLoadFolderDetails =
-            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-        if (null != listOfLoadFolderDetailsArray) {
-          for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
-            listOfLoadFolderDetails.add(loadMetadata);
-          }
-        }
-        listOfLoadFolderDetails.add(loadMetadataDetails);
-
-        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
-            .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
-
-        status = true;
-      } else {
-        LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
-            .getDatabaseName() + "." + loadModel.getTableName());
-      }
-    } finally {
-      if (carbonLock.unlock()) {
-        LOGGER.info(
-            "Table unlocked successfully after table status updation" + loadModel.getDatabaseName()
-                + "." + loadModel.getTableName());
-      } else {
-        LOGGER.error(
-            "Unable to unlock Table lock for table" + loadModel.getDatabaseName() + "." + loadModel
-                .getTableName() + " during table status updation");
-      }
-    }
-    return status;
-  }
-
-  public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
-      String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(schema.getCarbonTable().getStorePath(),
-            schema.getCarbonTable().getCarbonTableIdentifier());
-    String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
-
-    DataOutputStream dataOutputStream;
-    Gson gsonObjectToWrite = new Gson();
-    BufferedWriter brWriter = null;
-
-    AtomicFileOperations writeOperation =
-        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
-
-    try {
-
-      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
-      brWriter.write(metadataInstance);
-    } finally {
-      try {
-        if (null != brWriter) {
-          brWriter.flush();
-        }
-      } catch (Exception e) {
-        LOGGER.error("error in  flushing ");
-
-      }
-      CarbonUtil.closeStreams(brWriter);
-      writeOperation.close();
-    }
-
-  }
-
-  public static String readCurrentTime() {
-    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-    String date = null;
-
-    date = sdf.format(new Date());
-
-    return date;
-  }
-
-  public static String extractLoadMetadataFileLocation(CarbonLoadModel loadModel) {
-    CarbonTable carbonTable =
-        org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-            .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
-    return carbonTable.getMetaDataFilepath();
-  }
-
-  public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier,
-      String carbonStorePath) throws CarbonUtilException {
-    Cache dictCache =
-        CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, carbonStorePath);
-    return (Dictionary) dictCache.get(columnIdentifier);
-  }
-
-  public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier,
-      ColumnIdentifier columnIdentifier, String carbonStorePath, DataType dataType)
-      throws CarbonUtilException {
-    return getDictionary(
-        new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType),
-        carbonStorePath);
-  }
-
-  /**
-   * This method will divide the blocks among the tasks of the nodes as per the data locality
-   *
-   * @param blockInfos
-   * @param noOfNodesInput -1 if number of nodes has to be decided
-   *                       based on block location information
-   * @param parallelism    total no of tasks to execute in parallel
-   * @return
-   */
-  public static Map<String, List<List<Distributable>>> nodeBlockTaskMapping(
-      List<Distributable> blockInfos, int noOfNodesInput, int parallelism,
-      List<String> activeNode) {
-
-    Map<String, List<Distributable>> mapOfNodes =
-        CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
-    int taskPerNode = parallelism / mapOfNodes.size();
-    //assigning non zero value to noOfTasksPerNode
-    int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
-    // divide the blocks of a node among the tasks of the node.
-    return assignBlocksToTasksPerNode(mapOfNodes, noOfTasksPerNode);
-  }
-
-  /**
-   * This method will divide the blocks among the nodes as per the data locality
-   *
-   * @param blockInfos
-   * @return
-   */
-  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
-      int noOfNodesInput) {
-    return nodeBlockMapping(blockInfos, noOfNodesInput, null);
-  }
-
-  /**
-   * This method will divide the blocks among the nodes as per the data locality
-   *
-   * @param blockInfos
-   * @return
-   */
-  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
-    // -1 if number of nodes has to be decided based on block location information
-    return nodeBlockMapping(blockInfos, -1);
-  }
-
-  /**
-   * the method returns the number of required executors
-   *
-   * @param blockInfos
-   * @return
-   */
-  public static Map<String, List<Distributable>> getRequiredExecutors(
-      List<Distributable> blockInfos) {
-    List<NodeBlockRelation> flattenedList =
-        new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (Distributable blockInfo : blockInfos) {
-      try {
-        for (String eachNode : blockInfo.getLocations()) {
-          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-          flattenedList.add(nbr);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
-      }
-    }
-    // sort the flattened data.
-    Collections.sort(flattenedList);
-    Map<String, List<Distributable>> nodeAndBlockMapping =
-        new LinkedHashMap<String, List<Distributable>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    // from the flattened list create a mapping of node vs Data blocks.
-    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
-    return nodeAndBlockMapping;
-  }
-
-  /**
-   * This method will divide the blocks among the nodes as per the data locality
-   *
-   * @param blockInfos
-   * @param noOfNodesInput -1 if number of nodes has to be decided
-   *                       based on block location information
-   * @return
-   */
-  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
-      int noOfNodesInput, List<String> activeNodes) {
-
-    Map<String, List<Distributable>> nodeBlocksMap =
-        new HashMap<String, List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<NodeBlockRelation> flattenedList =
-        new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    Set<Distributable> uniqueBlocks =
-        new HashSet<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    Set<String> nodes = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    createFlattenedListFromMap(blockInfos, flattenedList, uniqueBlocks, nodes);
-
-    int noofNodes = (-1 == noOfNodesInput) ? nodes.size() : noOfNodesInput;
-    if (null != activeNodes) {
-      noofNodes = activeNodes.size();
-    }
-    int blocksPerNode = blockInfos.size() / noofNodes;
-    blocksPerNode = blocksPerNode <=0 ? 1 : blocksPerNode;
-
-    // sort the flattened data.
-    Collections.sort(flattenedList);
-
-    Map<String, List<Distributable>> nodeAndBlockMapping =
-        new LinkedHashMap<String, List<Distributable>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    // from the flattened list create a mapping of node vs Data blocks.
-    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
-
-    // so now we have a map of node vs blocks. allocate the block as per the order
-    createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, nodeAndBlockMapping, activeNodes);
-
-    // if any blocks remain then assign them to nodes in round robin.
-    assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, activeNodes);
-
-    return nodeBlocksMap;
-  }
-
-  /**
-   * Assigning the blocks of a node to tasks.
-   *
-   * @param nodeBlocksMap nodeName to list of blocks mapping
-   * @param noOfTasksPerNode
-   * @return
-   */
-  private static Map<String, List<List<Distributable>>> assignBlocksToTasksPerNode(
-      Map<String, List<Distributable>> nodeBlocksMap, int noOfTasksPerNode) {
-    Map<String, List<List<Distributable>>> outputMap =
-        new HashMap<String, List<List<Distributable>>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    // for each node
-    for (Map.Entry<String, List<Distributable>> eachNode : nodeBlocksMap.entrySet()) {
-
-      List<Distributable> blockOfEachNode = eachNode.getValue();
-      //sorting the block so same block will be give to same executor
-      Collections.sort(blockOfEachNode);
-      // create the task list for each node.
-      createTaskListForNode(outputMap, noOfTasksPerNode, eachNode.getKey());
-
-      // take all the block of node and divide it among the tasks of a node.
-      divideBlockToTasks(outputMap, eachNode.getKey(), blockOfEachNode);
-    }
-
-    return outputMap;
-  }
-
-  /**
-   * This will divide the blocks of a node to tasks of the node.
-   *
-   * @param outputMap
-   * @param key
-   * @param blockOfEachNode
-   */
-  private static void divideBlockToTasks(Map<String, List<List<Distributable>>> outputMap,
-      String key, List<Distributable> blockOfEachNode) {
-
-    List<List<Distributable>> taskLists = outputMap.get(key);
-    int tasksOfNode = taskLists.size();
-    int i = 0;
-    for (Distributable block : blockOfEachNode) {
-
-      taskLists.get(i % tasksOfNode).add(block);
-      i++;
-    }
-
-  }
-
-  /**
-   * This will create the empty list for each task of a node.
-   *
-   * @param outputMap
-   * @param noOfTasksPerNode
-   * @param key
-   */
-  private static void createTaskListForNode(Map<String, List<List<Distributable>>> outputMap,
-      int noOfTasksPerNode, String key) {
-    List<List<Distributable>> nodeTaskList =
-        new ArrayList<List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (int i = 0; i < noOfTasksPerNode; i++) {
-      List<Distributable> eachTask =
-          new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-      nodeTaskList.add(eachTask);
-
-    }
-    outputMap.put(key, nodeTaskList);
-
-  }
-
-  /**
-   * If any left over data blocks are present then assign those to nodes in round robin way.
-   *
-   * @param outputMap
-   * @param uniqueBlocks
-   */
-  private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
-      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
-
-    if (activeNodes != null) {
-      for (String activeNode : activeNodes) {
-        List<Distributable> blockLst = outputMap.get(activeNode);
-        if (null == blockLst) {
-          blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        }
-        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
-        if (blockLst.size() > 0) {
-          outputMap.put(activeNode, blockLst);
-        }
-      }
-    } else {
-      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
-        List<Distributable> blockLst = entry.getValue();
-        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
-      }
-
-    }
-
-    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
-      Iterator<Distributable> blocks = uniqueBlocks.iterator();
-      if (blocks.hasNext()) {
-        Distributable block = blocks.next();
-        List<Distributable> blockLst = entry.getValue();
-        blockLst.add(block);
-        blocks.remove();
-      }
-    }
-  }
-
-  /**
-   * The method populate the blockLst to be allocate to a specific node.
-   * @param uniqueBlocks
-   * @param noOfBlocksPerNode
-   * @param blockLst
-   */
-  private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
-      List<Distributable> blockLst) {
-    Iterator<Distributable> blocks = uniqueBlocks.iterator();
-    //if the node is already having the per block nodes then avoid assign the extra blocks
-    if (blockLst.size() == noOfBlocksPerNode) {
-      return;
-    }
-    while (blocks.hasNext()) {
-      Distributable block = blocks.next();
-      blockLst.add(block);
-      blocks.remove();
-      if (blockLst.size() >= noOfBlocksPerNode) {
-        break;
-      }
-    }
-  }
-
-  /**
-   * To create the final output of the Node and Data blocks
-   *
-   * @param outputMap
-   * @param blocksPerNode
-   * @param uniqueBlocks
-   * @param nodeAndBlockMapping
-   * @param activeNodes
-   */
-  private static void createOutputMap(Map<String, List<Distributable>> outputMap, int blocksPerNode,
-      Set<Distributable> uniqueBlocks, Map<String, List<Distributable>> nodeAndBlockMapping,
-      List<String> activeNodes) {
-
-    ArrayList<NodeMultiBlockRelation> multiBlockRelations =
-        new ArrayList<>(nodeAndBlockMapping.size());
-    for (Map.Entry<String, List<Distributable>> entry : nodeAndBlockMapping.entrySet()) {
-      multiBlockRelations.add(new NodeMultiBlockRelation(entry.getKey(), entry.getValue()));
-    }
-    // sort nodes based on number of blocks per node, so that nodes having lesser blocks
-    // are assigned first
-    Collections.sort(multiBlockRelations);
-
-    for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
-      String nodeName = nodeMultiBlockRelation.getNode();
-      //assign the block to the node only if the node is active
-      String activeExecutor = nodeName;
-      if (null != activeNodes) {
-        activeExecutor = getActiveExecutor(activeNodes, nodeName);
-        if (null == activeExecutor) {
-          continue;
-        }
-      }
-      // this loop will be for each NODE
-      int nodeCapacity = 0;
-      // loop thru blocks of each Node
-      for (Distributable block : nodeMultiBlockRelation.getBlocks()) {
-
-        // check if this is already assigned.
-        if (uniqueBlocks.contains(block)) {
-
-          if (null == outputMap.get(activeExecutor)) {
-            List<Distributable> list =
-                new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-            outputMap.put(activeExecutor, list);
-          }
-          // assign this block to this node if node has capacity left
-          if (nodeCapacity < blocksPerNode) {
-            List<Distributable> infos = outputMap.get(activeExecutor);
-            infos.add(block);
-            nodeCapacity++;
-            uniqueBlocks.remove(block);
-          } else {
-            // No need to continue loop as node is full
-            break;
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * method validates whether the node is active or not.
-   *
-   * @param activeNode
-   * @param nodeName
-   * @return returns true if active else false.
-   */
-  private static String getActiveExecutor(List activeNode, String nodeName) {
-    boolean isActiveNode = activeNode.contains(nodeName);
-    if (isActiveNode) {
-      return nodeName;
-    }
-    //if localhost then retrieve the localhost name then do the check
-    else if (nodeName.equals("localhost")) {
-      try {
-        String hostName = InetAddress.getLocalHost().getHostName();
-        isActiveNode = activeNode.contains(hostName);
-        if(isActiveNode){
-          return hostName;
-        }
-      } catch (UnknownHostException ue) {
-        isActiveNode = false;
-      }
-    } else {
-      try {
-        String hostAddress = InetAddress.getByName(nodeName).getHostAddress();
-        isActiveNode = activeNode.contains(hostAddress);
-        if(isActiveNode){
-          return hostAddress;
-        }
-      } catch (UnknownHostException ue) {
-        isActiveNode = false;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Create the Node and its related blocks Mapping and put in a Map
-   *
-   * @param flattenedList
-   * @param nodeAndBlockMapping
-   */
-  private static void createNodeVsBlockMapping(List<NodeBlockRelation> flattenedList,
-      Map<String, List<Distributable>> nodeAndBlockMapping) {
-    for (NodeBlockRelation nbr : flattenedList) {
-      String node = nbr.getNode();
-      List<Distributable> list;
-
-      if (null == nodeAndBlockMapping.get(node)) {
-        list = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        list.add(nbr.getBlock());
-        Collections.sort(list);
-        nodeAndBlockMapping.put(node, list);
-      } else {
-        list = nodeAndBlockMapping.get(node);
-        list.add(nbr.getBlock());
-        Collections.sort(list);
-      }
-    }
-  }
-
-  /**
-   * Create the flat List i.e flattening of the Map.
-   *
-   * @param blockInfos
-   * @param flattenedList
-   * @param uniqueBlocks
-   */
-  private static void createFlattenedListFromMap(List<Distributable> blockInfos,
-      List<NodeBlockRelation> flattenedList, Set<Distributable> uniqueBlocks,
-      Set<String> nodeList) {
-    for (Distributable blockInfo : blockInfos) {
-      // put the blocks in the set
-      uniqueBlocks.add(blockInfo);
-
-      try {
-        for (String eachNode : blockInfo.getLocations()) {
-          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-          flattenedList.add(nbr);
-          nodeList.add(eachNode);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
-      }
-    }
-  }
-
-  /**
-   * This method will get the store location for the given path, segment id and partition id
-   *
-   * @param carbonStorePath
-   * @param dbName
-   * @param tableName
-   * @param partitionCount
-   * @param segmentId
-   */
-  public static void checkAndCreateCarbonDataLocation(String carbonStorePath, String dbName,
-      String tableName, int partitionCount, String segmentId) {
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(dbName + CarbonCommonConstants.UNDERSCORE + tableName);
-    CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
-    for (int i = 0; i < partitionCount; i++) {
-      String carbonDataDirectoryPath =
-          carbonTablePath.getCarbonDataDirectoryPath(String.valueOf(i), segmentId);
-      CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
-    }
-  }
-
-  /**
-   * return the Array of available local-dirs
-   *
-   * @param conf
-   * @return
-   */
-  public static String[] getConfiguredLocalDirs(SparkConf conf) {
-    return Utils.getConfiguredLocalDirs(conf);
-  }
-
-  /**
-   * This will update the old table status details before clean files to the latest table status.
-   * @param oldList
-   * @param newList
-   * @return
-   */
-  public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
-      LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
-
-    List<LoadMetadataDetails> newListMetadata =
-        new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
-    for (LoadMetadataDetails oldSegment : oldList) {
-      if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
-        newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
-      }
-    }
-    return newListMetadata;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
deleted file mode 100644
index 2b3979f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Project Name  : Carbon
- * Module Name   : CARBON spark interface
- * Author    : R00903928
- * Created Date  : 22-Sep-2015
- * FileName   : DeleteLoadFolders.java
- * Description   : for physical deletion of load folders.
- * Class Version  : 1.0
- */
-package org.apache.carbondata.spark.load;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-
-public final class DeleteLoadFolders {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
-
-  private DeleteLoadFolders() {
-
-  }
-
-  /**
-   * returns segment path
-   *
-   * @param loadModel
-   * @param storeLocation
-   * @param partitionId
-   * @param oneLoad
-   * @return
-   */
-  private static String getSegmentPath(CarbonLoadModel loadModel, String storeLocation,
-      int partitionId, LoadMetadataDetails oneLoad) {
-
-    String path = null;
-    String segmentId = oneLoad.getLoadName();
-
-    path = new CarbonStorePath(storeLocation).getCarbonTablePath(
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier())
-        .getCarbonDataDirectoryPath("" + partitionId, segmentId);
-    return path;
-  }
-
-  private static boolean physicalFactAndMeasureMetadataDeletion(String path) {
-
-    boolean status = false;
-    try {
-      if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-        CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
-        CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
-
-          @Override public boolean accept(CarbonFile file) {
-            return (CarbonTablePath.isCarbonDataFile(file.getName())
-                || CarbonTablePath.isCarbonIndexFile(file.getName()));
-          }
-        });
-
-        //if there are no fact and msr metadata files present then no need to keep
-        //entry in metadata.
-        if (filesToBeDeleted.length == 0) {
-          status = true;
-        } else {
-
-          for (CarbonFile eachFile : filesToBeDeleted) {
-            if (!eachFile.delete()) {
-              LOGGER.warn("Unable to delete the file as per delete command "
-                  + eachFile.getAbsolutePath());
-              status = false;
-            } else {
-              status = true;
-            }
-          }
-        }
-        // need to delete the complete folder.
-        if(status){
-          if(!file.delete()){
-            LOGGER.warn("Unable to delete the folder as per delete command "
-                + file.getAbsolutePath());
-            status = false;
-          }
-        }
-
-      } else {
-        status = false;
-      }
-    } catch (IOException e) {
-      LOGGER.warn("Unable to delete the file as per delete command " + path);
-    }
-
-    return status;
-
-  }
-
-  private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
-      boolean isForceDelete) {
-    if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneLoad.getLoadStatus())
-        || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneLoad.getLoadStatus()))
-        && oneLoad.getVisibility().equalsIgnoreCase("true")) {
-      if (isForceDelete) {
-        return true;
-      }
-      String deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
-      SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-      Date deletionDate = null;
-      String date = null;
-      Date currentTimeStamp = null;
-      try {
-        deletionDate = parser.parse(deletionTime);
-        date = CarbonLoaderUtil.readCurrentTime();
-        currentTimeStamp = parser.parse(date);
-      } catch (ParseException e) {
-        return false;
-      }
-
-      long difference = currentTimeStamp.getTime() - deletionDate.getTime();
-
-      long minutesElapsed = (difference / (1000 * 60));
-
-      int maxTime;
-      try {
-        maxTime = Integer.parseInt(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
-      } catch (NumberFormatException e) {
-        maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
-      }
-      if (minutesElapsed > maxTime) {
-        return true;
-      }
-
-    }
-
-    return false;
-  }
-
-  private static void factFileRenaming(String loadFolderPath) {
-
-    FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath);
-    try {
-      if (FileFactory.isFileExist(loadFolderPath, fileType)) {
-        CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType);
-
-        CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() {
-
-          @Override public boolean accept(CarbonFile file) {
-            return (file.getName().endsWith('_' + CarbonCommonConstants.FACT_FILE_UPDATED));
-          }
-        });
-
-        for (CarbonFile file : listFiles) {
-          if (!file.renameTo(file.getName().substring(0,
-              file.getName().length() - CarbonCommonConstants.FACT_FILE_UPDATED.length()))) {
-            LOGGER.warn("could not rename the updated fact file.");
-          }
-        }
-
-      }
-    } catch (IOException e) {
-      LOGGER.error("exception" + e.getMessage());
-    }
-
-  }
-
-  private static void cleanDeletedFactFile(String loadFolderPath) {
-    FileFactory.FileType fileType = FileFactory.getFileType(loadFolderPath);
-    try {
-      if (FileFactory.isFileExist(loadFolderPath, fileType)) {
-        CarbonFile loadFolder = FileFactory.getCarbonFile(loadFolderPath, fileType);
-
-        CarbonFile[] listFiles = loadFolder.listFiles(new CarbonFileFilter() {
-
-          @Override public boolean accept(CarbonFile file) {
-            return (file.getName().endsWith(CarbonCommonConstants.FACT_DELETE_EXTENSION));
-          }
-        });
-
-        for (CarbonFile file : listFiles) {
-          if (!file.delete()) {
-            LOGGER.warn("could not delete the marked fact file.");
-          }
-        }
-
-      }
-    } catch (IOException e) {
-      LOGGER.error("exception" + e.getMessage());
-    }
-  }
-
-  /**
-   * @param loadModel
-   * @param storeLocation
-   * @param isForceDelete
-   * @param details
-   * @return
-   *
-   */
-  public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
-      String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
-    List<LoadMetadataDetails> deletedLoads =
-        new ArrayList<LoadMetadataDetails>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    boolean isDeleted = false;
-
-    if (details != null && details.length != 0) {
-      for (LoadMetadataDetails oneLoad : details) {
-        if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          String path = getSegmentPath(loadModel, storeLocation, 0, oneLoad);
-          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
-          if (deletionStatus) {
-            isDeleted = true;
-            oneLoad.setVisibility("false");
-            deletedLoads.add(oneLoad);
-            LOGGER.info("Info: " +
-                " Deleted the load " + oneLoad.getLoadName());
-          }
-        }
-      }
-    }
-
-    return isDeleted;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
deleted file mode 100644
index 0926e1c..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Project Name  : Carbon
- * Module Name   : CARBON Data Processor
- * Author    : R00903928
- * Created Date  : 21-Sep-2015
- * FileName   : DeleteLoadFromMetadata.java
- * Description   : Kettle step to generate MD Key
- * Class Version  : 1.0
- */
-package org.apache.carbondata.spark.load;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DeleteLoadFromMetadata {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DeleteLoadFromMetadata.class.getName());
-
-  private DeleteLoadFromMetadata() {
-
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
deleted file mode 100644
index 661e17c..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeletedLoadMetadata.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.load;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-public class DeletedLoadMetadata implements Serializable {
-
-  private static final long serialVersionUID = 7083059404172117208L;
-  private Map<String, String> deletedLoadMetadataMap =
-      new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  public void addDeletedLoadMetadata(String loadId, String status) {
-    deletedLoadMetadataMap.put(loadId, status);
-  }
-
-  public List<String> getDeletedLoadMetadataIds() {
-    return new ArrayList<String>(deletedLoadMetadataMap.keySet());
-  }
-
-  public String getDeletedLoadMetadataStatus(String loadId) {
-    if (deletedLoadMetadataMap.containsKey(loadId)) {
-      return deletedLoadMetadataMap.get(loadId);
-    } else {
-      return null;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
deleted file mode 100644
index 84e6c00..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ /dev/null
@@ -1,696 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.merger;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.carbon.path.CarbonStorePath;
-import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.integration.spark.merger.CompactionType;
-import org.apache.carbondata.lcm.locks.ICarbonLock;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.spark.load.CarbonLoaderUtil;
-
-/**
- * utility class for load merging.
- */
-public final class CarbonDataMergerUtil {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonDataMergerUtil.class.getName());
-
-  private CarbonDataMergerUtil() {
-
-  }
-
-  /**
-   * Returns the size of all the carbondata files present in the segment.
-   * @param carbonFile
-   * @return
-   */
-  private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) {
-    long factSize = 0;
-
-    // carbon data file case.
-    CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() {
-
-      @Override public boolean accept(CarbonFile file) {
-        return CarbonTablePath.isCarbonDataFile(file.getName());
-      }
-    });
-
-    for (CarbonFile fact : factFile) {
-      factSize += fact.getSize();
-    }
-
-    return factSize;
-  }
-
-  /**
-   * To check whether the merge property is enabled or not.
-   *
-   * @return
-   */
-
-  public static boolean checkIfAutoLoadMergingRequired() {
-    // load merge is not supported as per new store format
-    // moving the load merge check in early to avoid unnecessary load listing causing IOException
-    // check whether carbons segment merging operation is enabled or not.
-    // default will be false.
-    String isLoadMergeEnabled = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
-            CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE);
-    if (isLoadMergeEnabled.equalsIgnoreCase("false")) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Form the Name of the New Merge Folder
-   *
-   * @param segmentsToBeMergedList
-   * @return
-   */
-  public static String getMergedLoadName(List<LoadMetadataDetails> segmentsToBeMergedList) {
-    String firstSegmentName = segmentsToBeMergedList.get(0).getLoadName();
-    if (firstSegmentName.contains(".")) {
-      String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf("."));
-      String afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1);
-      int fraction = Integer.parseInt(afterDecimal) + 1;
-      String mergedSegmentName = beforeDecimal + "." + fraction;
-      return CarbonCommonConstants.LOAD_FOLDER + mergedSegmentName;
-    } else {
-      String mergeName = firstSegmentName + "." + 1;
-      return CarbonCommonConstants.LOAD_FOLDER + mergeName;
-    }
-
-  }
-
-  public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
-      String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
-      String mergeLoadStartTime, CompactionType compactionType) {
-
-    boolean tableStatusUpdationStatus = false;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
-
-    try {
-      if (carbonLock.lockWithRetries()) {
-        LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
-            + carbonLoadModel.getTableName() + " for table status updation ");
-
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                absoluteTableIdentifier.getCarbonTableIdentifier());
-
-        String statusFilePath = carbonTablePath.getTableStatusFilePath();
-
-        LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
-
-        String mergedLoadNumber = MergedLoadName.substring(
-            MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
-                + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
-
-        String modificationOrDeletionTimeStamp = CarbonLoaderUtil.readCurrentTime();
-        for (LoadMetadataDetails loadDetail : loadDetails) {
-          // check if this segment is merged.
-          if (loadsToMerge.contains(loadDetail)) {
-            // if the compacted load is deleted after the start of the compaction process,
-            // then need to discard the compaction process and treat it as failed compaction.
-            if (loadDetail.getLoadStatus()
-                .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
-              LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
-                  + " is deleted after the compaction is started.");
-              return tableStatusUpdationStatus;
-            }
-            loadDetail.setLoadStatus(CarbonCommonConstants.SEGMENT_COMPACTED);
-            loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
-            loadDetail.setMergedLoadName(mergedLoadNumber);
-          }
-        }
-
-        // create entry for merged one.
-        LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-        loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
-        String loadEnddate = CarbonLoaderUtil.readCurrentTime();
-        loadMetadataDetails.setTimestamp(loadEnddate);
-        loadMetadataDetails.setLoadName(mergedLoadNumber);
-        loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
-        loadMetadataDetails.setPartitionCount("0");
-        // if this is a major compaction then set the segment as major compaction.
-        if (compactionType == CompactionType.MAJOR_COMPACTION) {
-          loadMetadataDetails.setMajorCompacted("true");
-        }
-
-        List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
-
-        // put the merged folder entry
-        updatedDetailsList.add(loadMetadataDetails);
-
-        try {
-          SegmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
-              updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
-          tableStatusUpdationStatus = true;
-        } catch (IOException e) {
-          LOGGER.error("Error while writing metadata");
-        }
-      } else {
-        LOGGER.error(
-            "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "."
-                + carbonLoadModel.getTableName() + "for table status updation");
-      }
-    } finally {
-      if (carbonLock.unlock()) {
-        LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
-            .getDatabaseName() + "." + carbonLoadModel.getTableName());
-      } else {
-        LOGGER.error(
-            "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
-                + carbonLoadModel.getTableName() + " during table status updation");
-      }
-    }
-    return tableStatusUpdationStatus;
-  }
-
-  /**
-   * To identify which all segments can be merged.
-   *
-   * @param storeLocation
-   * @param carbonLoadModel
-   * @param compactionSize
-   * @return
-   */
-  public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
-      CarbonLoadModel carbonLoadModel, long compactionSize,
-      List<LoadMetadataDetails> segments, CompactionType compactionType) {
-
-    List sortedSegments = new ArrayList(segments);
-
-    sortSegments(sortedSegments);
-
-    // check preserve property and preserve the configured number of latest loads.
-
-    List<LoadMetadataDetails> listOfSegmentsAfterPreserve =
-        checkPreserveSegmentsPropertyReturnRemaining(sortedSegments);
-
-    // filter the segments if the compaction based on days is configured.
-
-    List<LoadMetadataDetails> listOfSegmentsLoadedInSameDateInterval =
-        identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve);
-    List<LoadMetadataDetails> listOfSegmentsToBeMerged;
-    // identify the segments to merge based on the Size of the segments across partition.
-    if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
-
-      listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
-          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation);
-    } else {
-
-      listOfSegmentsToBeMerged =
-          identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval);
-    }
-
-    return listOfSegmentsToBeMerged;
-  }
-
-  /**
-   * Sorting of the segments.
-   * @param segments
-   */
-  public static void sortSegments(List segments) {
-    // sort the segment details.
-    Collections.sort(segments, new Comparator<LoadMetadataDetails>() {
-      @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
-        double seg1Id = Double.parseDouble(seg1.getLoadName());
-        double seg2Id = Double.parseDouble(seg2.getLoadName());
-        if (seg1Id - seg2Id < 0) {
-          return -1;
-        }
-        if (seg1Id - seg2Id > 0) {
-          return 1;
-        }
-        return 0;
-      }
-    });
-  }
-
-  /**
-   * This method will return the list of loads which are loaded at the same interval.
-   * This property is configurable.
-   *
-   * @param listOfSegmentsBelowThresholdSize
-   * @return
-   */
-  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnLoadedDate(
-      List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize) {
-
-    List<LoadMetadataDetails> loadsOfSameDate =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    long numberOfDaysAllowedToMerge = 0;
-    try {
-      numberOfDaysAllowedToMerge = Long.parseLong(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
-              CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT));
-      if (numberOfDaysAllowedToMerge < 0 || numberOfDaysAllowedToMerge > 100) {
-        LOGGER.error(
-            "The specified value for property " + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT
-                + " is incorrect."
-                + " Correct value should be in range of 0 -100. Taking the default value.");
-        numberOfDaysAllowedToMerge =
-            Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
-      }
-
-    } catch (NumberFormatException e) {
-      numberOfDaysAllowedToMerge =
-          Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
-    }
-    // if true then process loads according to the load date.
-    if (numberOfDaysAllowedToMerge > 0) {
-
-      // filter loads based on the loaded date
-      boolean first = true;
-      Date segDate1 = null;
-      SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-      for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) {
-
-        if (first) {
-          segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
-          first = false;
-          continue;
-        }
-        String segmentDate = segment.getLoadStartTime();
-        Date segDate2 = null;
-        try {
-          segDate2 = sdf.parse(segmentDate);
-        } catch (ParseException e) {
-          LOGGER.error("Error while parsing segment start time" + e.getMessage());
-        }
-
-        if (isTwoDatesPresentInRequiredRange(segDate1, segDate2, numberOfDaysAllowedToMerge)) {
-          loadsOfSameDate.add(segment);
-        }
-        // if the load is beyond merged date.
-        // then reset everything and continue search for loads.
-        else if (loadsOfSameDate.size() < 2) {
-          loadsOfSameDate.clear();
-          // need to add the next segment as first and  to check further
-          segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
-        } else { // case where a load is beyond merge date and there is at least 2 loads to merge.
-          break;
-        }
-      }
-    } else {
-      return listOfSegmentsBelowThresholdSize;
-    }
-
-    return loadsOfSameDate;
-  }
-
-  /**
-   * @param loadsOfSameDate
-   * @param segment
-   * @return
-   */
-  private static Date initializeFirstSegment(List<LoadMetadataDetails> loadsOfSameDate,
-      LoadMetadataDetails segment, SimpleDateFormat sdf) {
-    String baselineLoadStartTime = segment.getLoadStartTime();
-    Date segDate1 = null;
-    try {
-      segDate1 = sdf.parse(baselineLoadStartTime);
-    } catch (ParseException e) {
-      LOGGER.error("Error while parsing segment start time" + e.getMessage());
-    }
-    loadsOfSameDate.add(segment);
-    return segDate1;
-  }
-
-  /**
-   * Method to check if the load dates are complied to the configured dates.
-   *
-   * @param segDate1
-   * @param segDate2
-   * @return
-   */
-  private static boolean isTwoDatesPresentInRequiredRange(Date segDate1, Date segDate2,
-      long numberOfDaysAllowedToMerge) {
-    if(segDate1 == null || segDate2 == null) {
-      return false;
-    }
-    // take 1 st date add the configured days .
-    Calendar cal1 = Calendar.getInstance();
-    cal1.set(segDate1.getYear(), segDate1.getMonth(), segDate1.getDate());
-    Calendar cal2 = Calendar.getInstance();
-    cal2.set(segDate2.getYear(), segDate2.getMonth(), segDate2.getDate());
-
-    long diff = cal2.getTimeInMillis() - cal1.getTimeInMillis();
-
-    if ((diff / (24 * 60 * 60 * 1000)) < numberOfDaysAllowedToMerge) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Identify the segments to be merged based on the Size in case of Major compaction.
-   *
-   * @param compactionSize
-   * @param listOfSegmentsAfterPreserve
-   * @param carbonLoadModel
-   * @param storeLocation
-   * @return
-   */
-  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
-      long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
-      CarbonLoadModel carbonLoadModel, String storeLocation) {
-
-    List<LoadMetadataDetails> segmentsToBeMerged =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    CarbonTableIdentifier tableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
-
-
-    // total length
-    long totalLength = 0;
-
-    // check size of each segment , sum it up across partitions
-    for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
-
-      String segId = segment.getLoadName();
-      // variable to store one  segment size across partition.
-      long sizeOfOneSegmentAcrossPartition =
-          getSizeOfSegment(storeLocation, tableIdentifier, segId);
-
-      // if size of a segment is greater than the Major compaction size. then ignore it.
-      if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
-        // if already 2 segments have been found for merging then stop scan here and merge.
-        if (segmentsToBeMerged.size() > 1) {
-          break;
-        } else { // if only one segment is found then remove the earlier one in list.
-          // reset the total length to 0.
-          segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-          totalLength = 0;
-          continue;
-        }
-      }
-
-      totalLength += sizeOfOneSegmentAcrossPartition;
-
-      // in case of major compaction the size doesnt matter. all the segments will be merged.
-      if (totalLength < (compactionSize * 1024 * 1024)) {
-        segmentsToBeMerged.add(segment);
-      } else { // if already 2 segments have been found for merging then stop scan here and merge.
-        if (segmentsToBeMerged.size() > 1) {
-          break;
-        } else { // if only one segment is found then remove the earlier one in list and put this.
-          // reset the total length to the current identified segment.
-          segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-          segmentsToBeMerged.add(segment);
-          totalLength = sizeOfOneSegmentAcrossPartition;
-        }
-      }
-
-    }
-
-    return segmentsToBeMerged;
-  }
-
-  /**
-   * For calculating the size of the specified segment
-   * @param storeLocation
-   * @param tableIdentifier
-   * @param segId
-   * @return
-   */
-  private static long getSizeOfSegment(String storeLocation,
-      CarbonTableIdentifier tableIdentifier, String segId) {
-    String loadPath = CarbonLoaderUtil
-        .getStoreLocation(storeLocation, tableIdentifier, segId, "0");
-    CarbonFile segmentFolder =
-        FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
-    return getSizeOfFactFileInLoad(segmentFolder);
-  }
-
-  /**
-   * Identify the segments to be merged based on the segment count
-   *
-   * @param listOfSegmentsAfterPreserve
-   * @return
-   */
-  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount(
-      List<LoadMetadataDetails> listOfSegmentsAfterPreserve) {
-
-    List<LoadMetadataDetails> mergedSegments =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<LoadMetadataDetails> unMergedSegments =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    int[] noOfSegmentLevelsCount =
-        CarbonProperties.getInstance().getCompactionSegmentLevelCount();
-
-    int level1Size = 0;
-    int level2Size = 0;
-    int size = noOfSegmentLevelsCount.length;
-
-    if (size >= 2) {
-      level1Size = noOfSegmentLevelsCount[0];
-      level2Size = noOfSegmentLevelsCount[1];
-    } else if (size == 1) {
-      level1Size = noOfSegmentLevelsCount[0];
-    }
-
-    int unMergeCounter = 0;
-    int mergeCounter = 0;
-
-    // check size of each segment , sum it up across partitions
-    for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
-
-      String segName = segment.getLoadName();
-
-      // if a segment is already merged 2 levels then it s name will become .2
-      // need to exclude those segments from minor compaction.
-      // if a segment is major compacted then should not be considered for minor.
-      if (segName.endsWith(CarbonCommonConstants.LEVEL2_COMPACTION_INDEX) || (
-          segment.isMajorCompacted() != null && segment.isMajorCompacted()
-              .equalsIgnoreCase("true"))) {
-        continue;
-      }
-
-      // check if the segment is merged or not
-
-      if (!isMergedSegment(segName)) {
-        //if it is an unmerged segment then increment counter
-        unMergeCounter++;
-        unMergedSegments.add(segment);
-        if (unMergeCounter == (level1Size)) {
-          return unMergedSegments;
-        }
-      } else {
-        mergeCounter++;
-        mergedSegments.add(segment);
-        if (mergeCounter == (level2Size)) {
-          return mergedSegments;
-        }
-      }
-    }
-    return new ArrayList<>(0);
-  }
-
-  /**
-   * To check if the segment is merged or not.
-   * @param segName
-   * @return
-   */
-  private static boolean isMergedSegment(String segName) {
-    if(segName.contains(".")){
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * checks number of loads to be preserved and returns remaining valid segments
-   *
-   * @param segments
-   * @return
-   */
-  private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining(
-      List<LoadMetadataDetails> segments) {
-
-    int numberOfSegmentsToBePreserved = 0;
-    // check whether the preserving of the segments from merging is enabled or not.
-    // get the number of loads to be preserved.
-    numberOfSegmentsToBePreserved =
-        CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved();
-    // get the number of valid segments and retain the latest loads from merging.
-    return CarbonDataMergerUtil
-        .getValidLoadDetailsWithRetaining(segments, numberOfSegmentsToBePreserved);
-  }
-
-  /**
-   * Retain the number of segments which are to be preserved and return the remaining list of
-   * segments.
-   *
-   * @param loadMetadataDetails
-   * @param numberOfSegToBeRetained
-   * @return
-   */
-  private static List<LoadMetadataDetails> getValidLoadDetailsWithRetaining(
-      List<LoadMetadataDetails> loadMetadataDetails, int numberOfSegToBeRetained) {
-
-    List<LoadMetadataDetails> validList =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (LoadMetadataDetails segment : loadMetadataDetails) {
-      if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-          || segment.getLoadStatus()
-          .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || segment
-          .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE)) {
-        validList.add(segment);
-      }
-    }
-
-    // check if valid list is big enough for removing the number of seg to be retained.
-    // last element
-    int removingIndex = validList.size() - 1;
-
-    for (int i = validList.size(); i > 0; i--) {
-      if (numberOfSegToBeRetained == 0) {
-        break;
-      }
-      // remove last segment
-      validList.remove(removingIndex--);
-      numberOfSegToBeRetained--;
-    }
-    return validList;
-
-  }
-
-  /**
-   * This will give the compaction sizes configured based on compaction type.
-   *
-   * @param compactionType
-   * @return
-   */
-  public static long getCompactionSize(CompactionType compactionType) {
-
-    long compactionSize = 0;
-    switch (compactionType) {
-      case MAJOR_COMPACTION:
-        compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
-        break;
-      default: // this case can not come.
-    }
-    return compactionSize;
-  }
-
-  /**
-   * For getting the comma separated valid segments for merging.
-   *
-   * @param loadMetadataDetails
-   * @return
-   */
-  public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) {
-    StringBuilder builder = new StringBuilder();
-    for (LoadMetadataDetails segment : loadMetadataDetails) {
-      //check if this load is an already merged load.
-      if (null != segment.getMergedLoadName()) {
-        builder.append(segment.getMergedLoadName() + ",");
-      } else {
-        builder.append(segment.getLoadName() + ",");
-      }
-    }
-    builder.deleteCharAt(builder.length() - 1);
-    return builder.toString();
-  }
-
-  /**
-   * Combining the list of maps to one map.
-   *
-   * @param mapsOfNodeBlockMapping
-   * @return
-   */
-  public static Map<String, List<TableBlockInfo>> combineNodeBlockMaps(
-      List<Map<String, List<TableBlockInfo>>> mapsOfNodeBlockMapping) {
-
-    Map<String, List<TableBlockInfo>> combinedMap =
-        new HashMap<String, List<TableBlockInfo>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    // traverse list of maps.
-    for (Map<String, List<TableBlockInfo>> eachMap : mapsOfNodeBlockMapping) {
-      // traverse inside each map.
-      for (Map.Entry<String, List<TableBlockInfo>> eachEntry : eachMap.entrySet()) {
-
-        String node = eachEntry.getKey();
-        List<TableBlockInfo> blocks = eachEntry.getValue();
-
-        // if already that node detail exist in the combined map.
-        if (null != combinedMap.get(node)) {
-          List<TableBlockInfo> blocksAlreadyPresent = combinedMap.get(node);
-          blocksAlreadyPresent.addAll(blocks);
-        } else { // if its not present in map then put to map.
-          combinedMap.put(node, blocks);
-        }
-      }
-    }
-    return combinedMap;
-  }
-
-  /**
-   * Removing the already merged segments from list.
-   */
-  public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
-      List<LoadMetadataDetails> segments,
-      LoadMetadataDetails lastSeg) {
-
-    // take complete list of segments.
-    List<LoadMetadataDetails> list = new ArrayList<>(segments);
-    // sort list
-    CarbonDataMergerUtil.sortSegments(list);
-
-    // first filter out newly added segments.
-    return list.subList(0, list.indexOf(lastSeg) + 1);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
deleted file mode 100644
index 93ba2a3..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import org.apache.carbondata.core.carbon.datastore.block.Distributable;
-
-/**
- * Block to Node mapping
- */
-public class NodeBlockRelation implements Comparable<NodeBlockRelation> {
-
-  private final Distributable block;
-  private final String node;
-
-  public NodeBlockRelation(Distributable block, String node) {
-    this.block = block;
-    this.node = node;
-
-  }
-
-  public Distributable getBlock() {
-    return block;
-  }
-
-  public String getNode() {
-    return node;
-  }
-
-  @Override public int compareTo(NodeBlockRelation obj) {
-    return this.getNode().compareTo(obj.getNode());
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (!(obj instanceof NodeBlockRelation)) {
-      return false;
-    }
-    NodeBlockRelation o = (NodeBlockRelation) obj;
-    return node.equals(o.node);
-  }
-
-  @Override public int hashCode() {
-    return node.hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
deleted file mode 100644
index 6b4d1bc..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import java.util.List;
-
-import org.apache.carbondata.core.carbon.datastore.block.Distributable;
-
-public class NodeMultiBlockRelation implements Comparable<NodeMultiBlockRelation> {
-
-  private final List<Distributable> blocks;
-  private final String node;
-
-  public NodeMultiBlockRelation(String node, List<Distributable> blocks) {
-    this.node = node;
-    this.blocks = blocks;
-
-  }
-
-  public List<Distributable> getBlocks() {
-    return blocks;
-  }
-
-  public String getNode() {
-    return node;
-  }
-
-  @Override public int compareTo(NodeMultiBlockRelation obj) {
-    return this.blocks.size() - obj.getBlocks().size();
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (!(obj instanceof NodeMultiBlockRelation)) {
-      return false;
-    }
-    NodeMultiBlockRelation o = (NodeMultiBlockRelation) obj;
-    return blocks.equals(o.blocks) && node.equals(o.node);
-  }
-
-  @Override public int hashCode() {
-    return blocks.hashCode() + node.hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
deleted file mode 100644
index 58f3a2d..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api;
-
-import java.util.List;
-
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-
-import org.apache.spark.sql.execution.command.Partitioner;
-
-public interface DataPartitioner {
-  /**
-   * Initialise the partitioner based on the given columns
-   */
-  void initialize(String basePath, String[] columns, Partitioner partitioner);
-
-  /**
-   * All the partitions built by the Partitioner
-   */
-  List<Partition> getAllPartitions();
-
-  /**
-   * Partition where the tuple should be present. (API used for data loading purpose)
-   */
-  Partition getPartionForTuple(Object[] tuple, long rowCounter);
-
-  /**
-   * Identifies the partitions applicable for the given filter (API used for For query)
-   */
-  List<Partition> getPartitions(CarbonQueryPlan queryPlan);
-
-  String[] getPartitionedColumns();
-
-  Partitioner getPartitioner();
-
-}
-