Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/10/01 01:43:18 UTC

[02/20] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index e0d4b73..70a8703 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -50,7 +50,7 @@ import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
  * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
  * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
  */
-public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> {
+public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(CarbonFactDataWriterImplV3.class.getName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
deleted file mode 100644
index b93fcb7..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.surrogatekeysgenerator.csvbased;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-
-public class BadRecordsLogger {
-
-  /**
-   * Comment for <code>LOGGER</code>
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(BadRecordsLogger.class.getName());
-  /**
-   * Holds the task key when any bad record is found, so that the API can check
-   * and update the load status
-   */
-  private static Map<String, String> badRecordEntry =
-      new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  /**
-   * File Name
-   */
-  private String fileName;
-  /**
-   * Store path
-   */
-  private String storePath;
-  /**
-   * FileChannel
-   */
-  private BufferedWriter bufferedWriter;
-  private DataOutputStream outStream;
-  /**
-   * csv file writer
-   */
-  private BufferedWriter bufferedCSVWriter;
-  private DataOutputStream outCSVStream;
-  /**
-   * bad record log file path
-   */
-  private String logFilePath;
-  /**
-   * csv file path
-   */
-  private String csvFilePath;
-
-  /**
-   * task key which is DatabaseName/TableName/tablename
-   */
-  private String taskKey;
-
-  private boolean badRecordsLogRedirect;
-
-  private boolean badRecordLoggerEnable;
-
-  private boolean badRecordConvertNullDisable;
-
-  private boolean isDataLoadFail;
-
-  // private final Object syncObject =new Object();
-
-  public BadRecordsLogger(String key, String fileName, String storePath,
-      boolean badRecordsLogRedirect, boolean badRecordLoggerEnable,
-      boolean badRecordConvertNullDisable, boolean isDataLoadFail) {
-    // Initially no bad rec
-    taskKey = key;
-    this.fileName = fileName;
-    this.storePath = storePath;
-    this.badRecordsLogRedirect = badRecordsLogRedirect;
-    this.badRecordLoggerEnable = badRecordLoggerEnable;
-    this.badRecordConvertNullDisable = badRecordConvertNullDisable;
-    this.isDataLoadFail = isDataLoadFail;
-  }
-
-  /**
-   * @param key DatabaseName/TableName/tablename
-   * @return "Partially" if any bad record was found for the key, else null
-   */
-  public static String hasBadRecord(String key) {
-    return badRecordEntry.get(key);
-  }
-
-  /**
-   * @param key DatabaseName/TableName/tablename
-   * @return the value removed from the map for the given key
-   */
-  public static String removeBadRecordKey(String key) {
-    return badRecordEntry.remove(key);
-  }
-
-  public void addBadRecordsToBuilder(Object[] row, String reason)
-      throws CarbonDataLoadingException {
-    if (badRecordsLogRedirect || badRecordLoggerEnable) {
-      StringBuilder logStrings = new StringBuilder();
-      int size = row.length;
-      int count = size;
-      for (int i = 0; i < size; i++) {
-        if (null == row[i]) {
-          char ch =
-              logStrings.length() > 0 ? logStrings.charAt(logStrings.length() - 1) : (char) -1;
-          if (ch == ',') {
-            logStrings = logStrings.deleteCharAt(logStrings.lastIndexOf(","));
-          }
-          break;
-        } else if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(row[i].toString())) {
-          logStrings.append("null");
-        } else {
-          logStrings.append(row[i]);
-        }
-        if (count > 1) {
-          logStrings.append(',');
-        }
-        count--;
-      }
-      if (badRecordsLogRedirect) {
-        writeBadRecordsToCSVFile(logStrings);
-      }
-      if (badRecordLoggerEnable) {
-        logStrings.append("----->");
-        if (null != reason) {
-          if (reason.indexOf(CarbonCommonConstants.MEMBER_DEFAULT_VAL) > -1) {
-            logStrings
-                .append(reason.replace(CarbonCommonConstants.MEMBER_DEFAULT_VAL, "null"));
-          } else {
-            logStrings.append(reason);
-          }
-        }
-        writeBadRecordsToFile(logStrings);
-      }
-    } else {
-      // set the partial success entry: if bad records are present, the load status
-      // should be partial success regardless of whether the bad record is logged
-      badRecordEntry.put(taskKey, "Partially");
-    }
-  }
-
-  /**
-   *
-   */
-  private synchronized void writeBadRecordsToFile(StringBuilder logStrings)
-      throws CarbonDataLoadingException {
-    if (null == logFilePath) {
-      logFilePath =
-          this.storePath + File.separator + this.fileName + CarbonCommonConstants.LOG_FILE_EXTENSION
-              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
-    }
-    try {
-      if (null == bufferedWriter) {
-        FileType fileType = FileFactory.getFileType(storePath);
-        if (!FileFactory.isFileExist(this.storePath, fileType)) {
-          // create the folders if not exist
-          FileFactory.mkdirs(this.storePath, fileType);
-
-          // create the files
-          FileFactory.createNewFile(logFilePath, fileType);
-        }
-
-        outStream = FileFactory.getDataOutputStream(logFilePath, fileType);
-
-        bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream,
-            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      }
-      bufferedWriter.write(logStrings.toString());
-      bufferedWriter.newLine();
-    } catch (FileNotFoundException e) {
-      LOGGER.error("Bad Log Files not found");
-      throw new CarbonDataLoadingException("Bad Log Files not found", e);
-    } catch (IOException e) {
-      LOGGER.error("Error While writing bad record log File");
-      throw new CarbonDataLoadingException("Error While writing bad record log File", e);
-    } finally {
-      // if the bad record file has been created, the load is a partial success;
-      // an entry present for the key means bad records exist for that key
-      badRecordEntry.put(taskKey, "Partially");
-    }
-  }
-
-  /**
-   * method will write the row containing a bad record to the csv file.
-   *
-   * @param logStrings
-   */
-  private synchronized void writeBadRecordsToCSVFile(StringBuilder logStrings)
-      throws CarbonDataLoadingException {
-    if (null == csvFilePath) {
-      csvFilePath =
-          this.storePath + File.separator + this.fileName + CarbonCommonConstants.CSV_FILE_EXTENSION
-              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
-    }
-    try {
-      if (null == bufferedCSVWriter) {
-        FileType fileType = FileFactory.getFileType(storePath);
-        if (!FileFactory.isFileExist(this.storePath, fileType)) {
-          // create the folders if not exist
-          FileFactory.mkdirs(this.storePath, fileType);
-
-          // create the files
-          FileFactory.createNewFile(csvFilePath, fileType);
-        }
-
-        outCSVStream = FileFactory.getDataOutputStream(csvFilePath, fileType);
-
-        bufferedCSVWriter = new BufferedWriter(new OutputStreamWriter(outCSVStream,
-            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      }
-      bufferedCSVWriter.write(logStrings.toString());
-      bufferedCSVWriter.newLine();
-    } catch (FileNotFoundException e) {
-      LOGGER.error("Bad record csv Files not found");
-      throw new CarbonDataLoadingException("Bad record csv Files not found", e);
-    } catch (IOException e) {
-      LOGGER.error("Error While writing bad record csv File");
-      throw new CarbonDataLoadingException("Error While writing bad record csv File", e);
-    }
-    finally {
-      badRecordEntry.put(taskKey, "Partially");
-    }
-  }
-
-  public boolean isBadRecordConvertNullDisable() {
-    return badRecordConvertNullDisable;
-  }
-
-  public boolean isDataLoadFail() {
-    return isDataLoadFail;
-  }
-
-  public boolean isBadRecordLoggerEnable() {
-    return badRecordLoggerEnable;
-  }
-
-  public boolean isBadRecordsLogRedirect() {
-    return badRecordsLogRedirect;
-  }
-
-  /**
-   * closes all open log and csv streams
-   */
-  public synchronized void closeStreams() {
-    CarbonUtil.closeStreams(bufferedWriter, outStream, bufferedCSVWriter, outCSVStream);
-  }
-
-}
-
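
For context on the removed class: the constructor and methods shown in the deleted file above were driven roughly as in the following sketch. This is an illustration only, based solely on the signatures visible in the diff; the task key, paths and flag values are made-up examples, not part of this commit.

    // hedged usage sketch; all literal values below are illustrative
    void logOneBadRow() throws CarbonDataLoadingException {
      BadRecordsLogger badRecordLogger = new BadRecordsLogger(
          "default/t1/t1",                // task key: DatabaseName/TableName/tablename
          "t1_badrecords",                // file name used for the .log/.csv output
          "/tmp/badrecords/default/t1",   // store path where the files are created
          true,    // badRecordsLogRedirect: also write the raw row to a csv file
          true,    // badRecordLoggerEnable: write row + reason to a log file
          false,   // badRecordConvertNullDisable
          false);  // isDataLoadFail
      try {
        badRecordLogger.addBadRecordsToBuilder(
            new Object[] { "1", "abc", null }, "cannot convert value to numeric type");
      } finally {
        badRecordLogger.closeStreams();
      }
      // after the load, the caller can check and clear the per-task status
      String status = BadRecordsLogger.hasBadRecord("default/t1/t1");  // "Partially" or null
      BadRecordsLogger.removeBadRecordKey("default/t1/t1");
    }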

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index fabb5a5..79e49ef 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -52,10 +52,10 @@ import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
 import org.apache.carbondata.processing.datatypes.StructDataType;
-import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
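
For downstream code that compiled against the old package names, the hunk above amounts to an import rename; an illustrative before/after (not part of this diff) for two of the moved classes:

    // before this commit
    // import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
    // import org.apache.carbondata.processing.model.CarbonDataLoadSchema;

    // after this commit
    import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
    import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;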

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
new file mode 100644
index 0000000..8681269
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -0,0 +1,890 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.util;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.datastore.row.LoadStatusType;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.merger.NodeBlockRelation;
+import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
+
+import com.google.gson.Gson;
+import org.apache.commons.lang3.StringUtils;
+
+public final class CarbonLoaderUtil {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
+
+  private CarbonLoaderUtil() {
+  }
+
+  public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
+      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + "");
+      deleteStorePath(segmentPath);
+    }
+  }
+
+  /**
+   * the method returns true if the segment has a carbondata file, else returns false.
+   *
+   * @param loadModel
+   * @param currentLoad
+   * @return
+   */
+  public static boolean isValidSegment(CarbonLoadModel loadModel,
+      int currentLoad) {
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
+        .getCarbonTable();
+    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+        loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+    int fileCount = 0;
+    int partitionCount = carbonTable.getPartitionCount();
+    for (int i = 0; i < partitionCount; i++) {
+      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "",
+          currentLoad + "");
+      CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
+          FileFactory.getFileType(segmentPath));
+      CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+
+        @Override
+        public boolean accept(CarbonFile file) {
+          return file.getName().endsWith(
+              CarbonTablePath.getCarbonIndexExtension())
+              || file.getName().endsWith(
+              CarbonTablePath.getCarbonDataExtension());
+        }
+
+      });
+      fileCount += files.length;
+      if (files.length > 0) {
+        return true;
+      }
+    }
+    if (fileCount == 0) {
+      return false;
+    }
+    return true;
+  }
+  public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
+      final boolean isCompactionFlow) throws IOException {
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    String metaDataLocation = carbonTable.getMetaDataFilepath();
+    final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+    // delete folders whose metadata does not exist in tablestatus
+    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
+      final String partitionCount = i + "";
+      String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
+      FileType fileType = FileFactory.getFileType(partitionPath);
+      if (FileFactory.isFileExist(partitionPath, fileType)) {
+        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+          @Override public boolean accept(CarbonFile path) {
+            String segmentId =
+                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+            boolean found = false;
+            for (int j = 0; j < details.length; j++) {
+              if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
+                  .equals(partitionCount)) {
+                found = true;
+                break;
+              }
+            }
+            return !found;
+          }
+        });
+        for (int k = 0; k < listFiles.length; k++) {
+          String segmentId =
+              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+          if (isCompactionFlow) {
+            if (segmentId.contains(".")) {
+              deleteStorePath(listFiles[k].getAbsolutePath());
+            }
+          } else {
+            if (!segmentId.contains(".")) {
+              deleteStorePath(listFiles[k].getAbsolutePath());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private static void deleteStorePath(String path) {
+    try {
+      FileType fileType = FileFactory.getFileType(path);
+      if (FileFactory.isFileExist(path, fileType)) {
+        CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
+        CarbonUtil.deleteFoldersAndFiles(carbonFile);
+      }
+    } catch (IOException | InterruptedException e) {
+      LOGGER.error("Unable to delete the given path :: " + e.getMessage());
+    }
+  }
+
+
+  /**
+   * This method will delete the local data load folder location after data load is complete
+   *
+   * @param loadModel
+   */
+  public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
+      boolean isCompactionFlow, boolean isAltPartitionFlow) {
+    String databaseName = loadModel.getDatabaseName();
+    String tableName = loadModel.getTableName();
+    String tempLocationKey = CarbonDataProcessorUtil
+        .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
+            loadModel.getTaskNo(), isCompactionFlow, isAltPartitionFlow);
+    // form local store location
+    final String localStoreLocations = CarbonProperties.getInstance().getProperty(tempLocationKey);
+    if (localStoreLocations == null) {
+      throw new RuntimeException("Store location not set for the key " + tempLocationKey);
+    }
+    // submit local folder clean up in another thread so that main thread execution is not blocked
+    ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
+    try {
+      localFolderDeletionService.submit(new Callable<Void>() {
+        @Override public Void call() throws Exception {
+          long startTime = System.currentTimeMillis();
+          String[] locArray = StringUtils.split(localStoreLocations, File.pathSeparator);
+          for (String loc : locArray) {
+            try {
+              CarbonUtil.deleteFoldersAndFiles(new File(loc));
+            } catch (IOException | InterruptedException e) {
+              LOGGER.error(e,
+                  "Failed to delete local data load folder location: " + loc);
+            }
+          }
+          LOGGER.info("Deleted the local store location: " + localStoreLocations
+                + " : Time taken: " + (System.currentTimeMillis() - startTime));
+          return null;
+        }
+      });
+    } finally {
+      if (null != localFolderDeletionService) {
+        localFolderDeletionService.shutdown();
+      }
+    }
+
+  }
+
+  /**
+   * This API will write the load level metadata for the load management module in order to
+   * manage the load and query execution smoothly.
+   *
+   * @param newMetaEntry
+   * @param loadModel
+   * @return boolean which determines whether status update is done or not.
+   * @throws IOException
+   */
+  public static boolean recordLoadMetadata(LoadMetadataDetails newMetaEntry,
+      CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
+      throws IOException, InterruptedException {
+    boolean status = false;
+    String metaDataFilepath =
+        loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    try {
+      if (carbonLock.lockWithRetries()) {
+        LOGGER.info(
+            "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
+                + " for table status updation");
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+            SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+        List<LoadMetadataDetails> listOfLoadFolderDetails =
+            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+        List<CarbonFile> staleFolders = new ArrayList<>();
+        Collections.addAll(listOfLoadFolderDetails, listOfLoadFolderDetailsArray);
+        // create a new segment Id if load has just begun else add the already generated Id
+        if (loadStartEntry) {
+          String segmentId =
+              String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray));
+          newMetaEntry.setLoadName(segmentId);
+          loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
+          loadModel.setSegmentId(segmentId);
+          // Exception should be thrown if:
+          // 1. If insert overwrite is in progress and any other load or insert operation
+          // is triggered
+          // 2. If load or insert into operation is in progress and insert overwrite operation
+          // is triggered
+          for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+            if (entry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())) {
+              throw new RuntimeException("Already insert overwrite is in progress");
+            } else if (
+                newMetaEntry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())
+                    && entry.getLoadStatus().equals(LoadStatusType.IN_PROGRESS.getMessage())) {
+              throw new RuntimeException("Already insert into or load is in progress");
+            }
+          }
+          listOfLoadFolderDetails.add(newMetaEntry);
+        } else {
+          newMetaEntry.setLoadName(String.valueOf(loadModel.getSegmentId()));
+          // existing entry needs to be overwritten as the entry will exist with some
+          // intermediate status
+          int indexToOverwriteNewMetaEntry = 0;
+          for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+            if (entry.getLoadName().equals(newMetaEntry.getLoadName())
+                && entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) {
+              break;
+            }
+            indexToOverwriteNewMetaEntry++;
+          }
+          if (listOfLoadFolderDetails.get(indexToOverwriteNewMetaEntry).getLoadStatus()
+              .equals(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+            throw new RuntimeException("It seems insert overwrite has been issued during load");
+          }
+          if (insertOverwrite) {
+            for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+              if (!entry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())) {
+                entry.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+                // For insert overwrite, we will delete the old segment folder immediately
+                // So collect the old segments here
+                String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
+                // add to the deletion list only if file exist else HDFS file system will throw
+                // exception while deleting the file if file path does not exist
+                if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+                  staleFolders.add(FileFactory.getCarbonFile(path));
+                }
+              }
+            }
+          }
+          listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
+        }
+        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
+            .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
+        // Delete all old stale segment folders
+        for (CarbonFile staleFolder : staleFolders) {
+          // try block is inside for loop because even if there is failure in deletion of 1 stale
+          // folder still remaining stale folders should be deleted
+          try {
+            CarbonUtil.deleteFoldersAndFiles(staleFolder);
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error("Failed to delete stale folder: " + e.getMessage());
+          }
+        }
+        status = true;
+      } else {
+        LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
+            .getDatabaseName() + "." + loadModel.getTableName());
+      };
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info(
+            "Table unlocked successfully after table status updation" + loadModel.getDatabaseName()
+                + "." + loadModel.getTableName());
+      } else {
+        LOGGER.error(
+            "Unable to unlock Table lock for table" + loadModel.getDatabaseName() + "." + loadModel
+                .getTableName() + " during table status updation");
+      }
+    }
+    return status;
+  }
+
+  /**
+   * Method to create new entry for load in table status file
+   *
+   * @param loadMetadataDetails
+   * @param loadStatus
+   * @param loadStartTime
+   * @param addLoadEndTime
+   */
+  public static void populateNewLoadMetaEntry(LoadMetadataDetails loadMetadataDetails,
+      String loadStatus, long loadStartTime, boolean addLoadEndTime) {
+    if (addLoadEndTime) {
+      long loadEndDate = CarbonUpdateUtil.readCurrentTime();
+      loadMetadataDetails.setLoadEndTime(loadEndDate);
+    }
+    loadMetadataDetails.setLoadStatus(loadStatus);
+    loadMetadataDetails.setLoadStartTime(loadStartTime);
+  }
+
+  public static void writeLoadMetadata(String storeLocation, String dbName, String tableName,
+      List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+    CarbonTablePath carbonTablePath =
+        CarbonStorePath.getCarbonTablePath(storeLocation, dbName, tableName);
+    String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+
+    DataOutputStream dataOutputStream;
+    Gson gsonObjectToWrite = new Gson();
+    BufferedWriter brWriter = null;
+
+    AtomicFileOperations writeOperation =
+        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+
+    try {
+
+      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
+      brWriter.write(metadataInstance);
+    } finally {
+      try {
+        if (null != brWriter) {
+          brWriter.flush();
+        }
+      } catch (Exception e) {
+        LOGGER.error("error in  flushing ");
+
+      }
+      CarbonUtil.closeStreams(brWriter);
+      writeOperation.close();
+    }
+
+  }
+
+  public static String readCurrentTime() {
+    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+    String date = null;
+
+    date = sdf.format(new Date());
+
+    return date;
+  }
+
+  public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier,
+      String carbonStorePath) throws IOException {
+    Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
+        CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, carbonStorePath);
+    return dictCache.get(columnIdentifier);
+  }
+
+  public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier,
+      ColumnIdentifier columnIdentifier, String carbonStorePath, DataType dataType)
+      throws IOException {
+    return getDictionary(
+        new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType,
+            CarbonStorePath.getCarbonTablePath(carbonStorePath, tableIdentifier)),
+        carbonStorePath);
+  }
+
+  /**
+   * This method will divide the blocks among the tasks of the nodes as per the data locality
+   *
+   * @param blockInfos
+   * @param noOfNodesInput -1 if number of nodes has to be decided
+   *                       based on block location information
+   * @param parallelism    total no of tasks to execute in parallel
+   * @return
+   */
+  public static Map<String, List<List<Distributable>>> nodeBlockTaskMapping(
+      List<Distributable> blockInfos, int noOfNodesInput, int parallelism,
+      List<String> activeNode) {
+
+    Map<String, List<Distributable>> mapOfNodes =
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
+    int taskPerNode = parallelism / mapOfNodes.size();
+    //assigning non zero value to noOfTasksPerNode
+    int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
+    // divide the blocks of a node among the tasks of the node.
+    return assignBlocksToTasksPerNode(mapOfNodes, noOfTasksPerNode);
+  }
+
+  /**
+   * This method will divide the blocks among the nodes as per the data locality
+   *
+   * @param blockInfos
+   * @return
+   */
+  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
+      int noOfNodesInput) {
+    return nodeBlockMapping(blockInfos, noOfNodesInput, null);
+  }
+
+  /**
+   * This method will divide the blocks among the nodes as per the data locality
+   *
+   * @param blockInfos
+   * @return
+   */
+  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
+    // -1 if number of nodes has to be decided based on block location information
+    return nodeBlockMapping(blockInfos, -1);
+  }
+
+  /**
+   * the method returns the required executors (nodes) mapped to their data blocks
+   *
+   * @param blockInfos
+   * @return
+   */
+  public static Map<String, List<Distributable>> getRequiredExecutors(
+      List<Distributable> blockInfos) {
+    List<NodeBlockRelation> flattenedList =
+        new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (Distributable blockInfo : blockInfos) {
+      try {
+        for (String eachNode : blockInfo.getLocations()) {
+          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+          flattenedList.add(nbr);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
+      }
+    }
+    // sort the flattened data.
+    Collections.sort(flattenedList);
+    Map<String, List<Distributable>> nodeAndBlockMapping =
+        new LinkedHashMap<String, List<Distributable>>(
+            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // from the flattened list create a mapping of node vs Data blocks.
+    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
+    return nodeAndBlockMapping;
+  }
+
+  /**
+   * This method will divide the blocks among the nodes as per the data locality
+   *
+   * @param blockInfos
+   * @param noOfNodesInput -1 if number of nodes has to be decided
+   *                       based on block location information
+   * @return
+   */
+  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
+      int noOfNodesInput, List<String> activeNodes) {
+
+    Map<String, List<Distributable>> nodeBlocksMap =
+        new HashMap<String, List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    List<NodeBlockRelation> flattenedList =
+        new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    Set<Distributable> uniqueBlocks =
+        new HashSet<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    Set<String> nodes = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    createFlattenedListFromMap(blockInfos, flattenedList, uniqueBlocks, nodes);
+
+    int noofNodes = (-1 == noOfNodesInput) ? nodes.size() : noOfNodesInput;
+    if (null != activeNodes) {
+      noofNodes = activeNodes.size();
+    }
+    int blocksPerNode = blockInfos.size() / noofNodes;
+    blocksPerNode = blocksPerNode <= 0 ? 1 : blocksPerNode;
+
+    // sort the flattened data.
+    Collections.sort(flattenedList);
+
+    Map<String, List<Distributable>> nodeAndBlockMapping =
+        new LinkedHashMap<String, List<Distributable>>(
+            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    // from the flattened list create a mapping of node vs Data blocks.
+    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
+
+    // so now we have a map of node vs blocks. allocate the block as per the order
+    createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, nodeAndBlockMapping, activeNodes);
+
+    // if any blocks remain then assign them to nodes in round robin.
+    assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, activeNodes);
+
+    return nodeBlocksMap;
+  }
+
+  /**
+   * Assigning the blocks of a node to tasks.
+   *
+   * @param nodeBlocksMap nodeName to list of blocks mapping
+   * @param noOfTasksPerNode
+   * @return
+   */
+  private static Map<String, List<List<Distributable>>> assignBlocksToTasksPerNode(
+      Map<String, List<Distributable>> nodeBlocksMap, int noOfTasksPerNode) {
+    Map<String, List<List<Distributable>>> outputMap =
+        new HashMap<String, List<List<Distributable>>>(
+            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    // for each node
+    for (Map.Entry<String, List<Distributable>> eachNode : nodeBlocksMap.entrySet()) {
+
+      List<Distributable> blockOfEachNode = eachNode.getValue();
+      // sort the blocks so that the same block is given to the same executor
+      Collections.sort(blockOfEachNode);
+      // create the task list for each node.
+      createTaskListForNode(outputMap, noOfTasksPerNode, eachNode.getKey());
+
+      // take all the block of node and divide it among the tasks of a node.
+      divideBlockToTasks(outputMap, eachNode.getKey(), blockOfEachNode);
+    }
+
+    return outputMap;
+  }
+
+  /**
+   * This will divide the blocks of a node to tasks of the node.
+   *
+   * @param outputMap
+   * @param key
+   * @param blockOfEachNode
+   */
+  private static void divideBlockToTasks(Map<String, List<List<Distributable>>> outputMap,
+      String key, List<Distributable> blockOfEachNode) {
+
+    List<List<Distributable>> taskLists = outputMap.get(key);
+    int tasksOfNode = taskLists.size();
+    int i = 0;
+    for (Distributable block : blockOfEachNode) {
+
+      taskLists.get(i % tasksOfNode).add(block);
+      i++;
+    }
+
+  }
+
+  /**
+   * This will create the empty list for each task of a node.
+   *
+   * @param outputMap
+   * @param noOfTasksPerNode
+   * @param key
+   */
+  private static void createTaskListForNode(Map<String, List<List<Distributable>>> outputMap,
+      int noOfTasksPerNode, String key) {
+    List<List<Distributable>> nodeTaskList =
+        new ArrayList<List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (int i = 0; i < noOfTasksPerNode; i++) {
+      List<Distributable> eachTask =
+          new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+      nodeTaskList.add(eachTask);
+
+    }
+    outputMap.put(key, nodeTaskList);
+
+  }
+
+  /**
+   * If any leftover data blocks are present then assign them to the nodes in a round robin way.
+   *
+   * @param outputMap
+   * @param uniqueBlocks
+   */
+  private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
+      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
+
+    if (activeNodes != null) {
+      for (String activeNode : activeNodes) {
+        List<Distributable> blockLst = outputMap.get(activeNode);
+        if (null == blockLst) {
+          blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+        }
+        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+        if (blockLst.size() > 0) {
+          outputMap.put(activeNode, blockLst);
+        }
+      }
+    } else {
+      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
+        List<Distributable> blockLst = entry.getValue();
+        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+      }
+
+    }
+
+    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
+      Iterator<Distributable> blocks = uniqueBlocks.iterator();
+      if (blocks.hasNext()) {
+        Distributable block = blocks.next();
+        List<Distributable> blockLst = entry.getValue();
+        blockLst.add(block);
+        blocks.remove();
+      }
+    }
+  }
+
+  /**
+   * The method populates blockLst with the blocks to be allocated to a specific node.
+   * @param uniqueBlocks
+   * @param noOfBlocksPerNode
+   * @param blockLst
+   */
+  private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
+      List<Distributable> blockLst) {
+    Iterator<Distributable> blocks = uniqueBlocks.iterator();
+    // if the node already has its quota of blocks then avoid assigning extra blocks
+    if (blockLst.size() == noOfBlocksPerNode) {
+      return;
+    }
+    while (blocks.hasNext()) {
+      Distributable block = blocks.next();
+      blockLst.add(block);
+      blocks.remove();
+      if (blockLst.size() >= noOfBlocksPerNode) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * To create the final output of the Node and Data blocks
+   *
+   * @param outputMap
+   * @param blocksPerNode
+   * @param uniqueBlocks
+   * @param nodeAndBlockMapping
+   * @param activeNodes
+   */
+  private static void createOutputMap(Map<String, List<Distributable>> outputMap, int blocksPerNode,
+      Set<Distributable> uniqueBlocks, Map<String, List<Distributable>> nodeAndBlockMapping,
+      List<String> activeNodes) {
+
+    ArrayList<NodeMultiBlockRelation> multiBlockRelations =
+        new ArrayList<>(nodeAndBlockMapping.size());
+    for (Map.Entry<String, List<Distributable>> entry : nodeAndBlockMapping.entrySet()) {
+      multiBlockRelations.add(new NodeMultiBlockRelation(entry.getKey(), entry.getValue()));
+    }
+    // sort nodes based on the number of blocks per node, so that nodes having fewer blocks
+    // are assigned first
+    Collections.sort(multiBlockRelations);
+
+    for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
+      String nodeName = nodeMultiBlockRelation.getNode();
+      //assign the block to the node only if the node is active
+      String activeExecutor = nodeName;
+      if (null != activeNodes) {
+        activeExecutor = getActiveExecutor(activeNodes, nodeName);
+        if (null == activeExecutor) {
+          continue;
+        }
+      }
+      // this loop will be for each NODE
+      int nodeCapacity = 0;
+      // loop thru blocks of each Node
+      for (Distributable block : nodeMultiBlockRelation.getBlocks()) {
+
+        // check if this is already assigned.
+        if (uniqueBlocks.contains(block)) {
+
+          if (null == outputMap.get(activeExecutor)) {
+            List<Distributable> list =
+                new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+            outputMap.put(activeExecutor, list);
+          }
+          // assign this block to this node if node has capacity left
+          if (nodeCapacity < blocksPerNode) {
+            List<Distributable> infos = outputMap.get(activeExecutor);
+            infos.add(block);
+            nodeCapacity++;
+            uniqueBlocks.remove(block);
+          } else {
+            // No need to continue loop as node is full
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * method validates whether the node is active or not.
+   *
+   * @param activeNode
+   * @param nodeName
+   * @return the matching active node or host name if the node is active, else null.
+   */
+  private static String getActiveExecutor(List activeNode, String nodeName) {
+    boolean isActiveNode = activeNode.contains(nodeName);
+    if (isActiveNode) {
+      return nodeName;
+    }
+    // if localhost, then retrieve the local host name and do the check
+    else if (nodeName.equals("localhost")) {
+      try {
+        String hostName = InetAddress.getLocalHost().getHostName();
+        isActiveNode = activeNode.contains(hostName);
+        if (isActiveNode) {
+          return hostName;
+        }
+      } catch (UnknownHostException ue) {
+        isActiveNode = false;
+      }
+    } else {
+      try {
+        String hostAddress = InetAddress.getByName(nodeName).getHostAddress();
+        isActiveNode = activeNode.contains(hostAddress);
+        if (isActiveNode) {
+          return hostAddress;
+        }
+      } catch (UnknownHostException ue) {
+        isActiveNode = false;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Create the Node and its related blocks Mapping and put in a Map
+   *
+   * @param flattenedList
+   * @param nodeAndBlockMapping
+   */
+  private static void createNodeVsBlockMapping(List<NodeBlockRelation> flattenedList,
+      Map<String, List<Distributable>> nodeAndBlockMapping) {
+    for (NodeBlockRelation nbr : flattenedList) {
+      String node = nbr.getNode();
+      List<Distributable> list;
+
+      if (null == nodeAndBlockMapping.get(node)) {
+        list = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+        list.add(nbr.getBlock());
+        nodeAndBlockMapping.put(node, list);
+      } else {
+        list = nodeAndBlockMapping.get(node);
+        list.add(nbr.getBlock());
+      }
+    }
+    /* For performance, iterate via entrySet() instead of values() when sorting the values:
+    entrySet() gives a logical view over the HashMap, so the map is not queried twice for
+    each key, whereas iterating values() queries it twice. */
+    Iterator<Map.Entry<String, List<Distributable>>> iterator =
+        nodeAndBlockMapping.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Collections.sort(iterator.next().getValue());
+    }
+  }
+
+  /**
+   * Create the flat List i.e flattening of the Map.
+   *
+   * @param blockInfos
+   * @param flattenedList
+   * @param uniqueBlocks
+   */
+  private static void createFlattenedListFromMap(List<Distributable> blockInfos,
+      List<NodeBlockRelation> flattenedList, Set<Distributable> uniqueBlocks,
+      Set<String> nodeList) {
+    for (Distributable blockInfo : blockInfos) {
+      // put the blocks in the set
+      uniqueBlocks.add(blockInfo);
+
+      try {
+        for (String eachNode : blockInfo.getLocations()) {
+          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+          flattenedList.add(nbr);
+          nodeList.add(eachNode);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
+      }
+    }
+  }
+
+  /**
+   * This method will check and create the carbon data directory for the given store path and segment id
+   *
+   * @param carbonStorePath
+   * @param segmentId
+   */
+  public static void checkAndCreateCarbonDataLocation(String carbonStorePath,
+      String segmentId, CarbonTable carbonTable) {
+    CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
+    CarbonTablePath carbonTablePath =
+        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
+    String carbonDataDirectoryPath =
+        carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+  }
+
+  /**
+   * This will carry the visibility of the old table status details (before clean files) over to the latest table status.
+   * @param oldList
+   * @param newList
+   * @return
+   */
+  public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
+      LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
+
+    List<LoadMetadataDetails> newListMetadata =
+        new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
+    for (LoadMetadataDetails oldSegment : oldList) {
+      if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
+        newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
+      }
+    }
+    return newListMetadata;
+  }
+
+}
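
The task-level distribution added above (nodeBlockTaskMapping / divideBlockToTasks) spreads each node's blocks across its tasks with a simple modulo round robin. The standalone simplification below (plain Strings instead of Distributable; class and method names invented for illustration) shows the same assignment logic and is not CarbonData code:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RoundRobinSketch {
      // simplified stand-in for divideBlockToTasks: spread one node's blocks over its task lists
      static List<List<String>> divide(List<String> blocksOfNode, int noOfTasksPerNode) {
        List<List<String>> taskLists = new ArrayList<List<String>>();
        for (int t = 0; t < noOfTasksPerNode; t++) {
          taskLists.add(new ArrayList<String>());
        }
        int i = 0;
        for (String block : blocksOfNode) {
          taskLists.get(i % noOfTasksPerNode).add(block);  // same modulo assignment as above
          i++;
        }
        return taskLists;
      }

      public static void main(String[] args) {
        // with parallelism 4 over 2 nodes, nodeBlockTaskMapping would use 4 / 2 = 2 tasks per node
        List<String> blocks = Arrays.asList("b1", "b2", "b3", "b4", "b5");
        System.out.println(divide(blocks, 2));  // [[b1, b3, b5], [b2, b4]]
      }
    }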

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
new file mode 100644
index 0000000..ec91472
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.processing.partition.Partition;
+import org.apache.carbondata.processing.partition.impl.DefaultLoadBalancer;
+import org.apache.carbondata.processing.partition.impl.PartitionMultiFileImpl;
+import org.apache.carbondata.processing.partition.impl.QueryPartitionHelper;
+import org.apache.carbondata.processing.splits.TableSplit;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * This utility parses the Carbon query plan into the actual query model object.
+ */
+public class CarbonQueryUtil {
+
+  private CarbonQueryUtil() {
+
+  }
+
+  /**
+   * It creates one split for each region server.
+   */
+  public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
+      CarbonQueryPlan queryPlan) {
+
+    // Just create splits depending on the locations of region servers
+    List<Partition> allPartitions = null;
+    if (queryPlan == null) {
+      allPartitions =
+          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
+    } else {
+      allPartitions =
+          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
+    }
+    TableSplit[] splits = new TableSplit[allPartitions.size()];
+    for (int i = 0; i < splits.length; i++) {
+      splits[i] = new TableSplit();
+      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+      Partition partition = allPartitions.get(i);
+      String location = QueryPartitionHelper.getInstance()
+          .getLocation(partition, databaseName, tableName);
+      locations.add(location);
+      splits[i].setPartition(partition);
+      splits[i].setLocations(locations);
+    }
+
+    return splits;
+  }
+
+  /**
+   * It creates one split for each region server.
+   */
+  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) {
+
+    // Just create splits depending on the locations of region servers
+    DefaultLoadBalancer loadBalancer = null;
+    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
+    loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
+    TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
+    for (int i = 0; i < tblSplits.length; i++) {
+      tblSplits[i] = new TableSplit();
+      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+      Partition partition = allPartitions.get(i);
+      String location = loadBalancer.getNodeForPartitions(partition);
+      locations.add(location);
+      tblSplits[i].setPartition(partition);
+      tblSplits[i].setLocations(locations);
+    }
+    return tblSplits;
+  }
+
+  /**
+   * split sourcePath by comma
+   */
+  public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
+      String separator) {
+    if (StringUtils.isNotEmpty(sourcePath)) {
+      String[] files = sourcePath.split(separator);
+      Collections.addAll(partitionsFiles, files);
+    }
+  }
+
+  private static List<Partition> getAllFilesForDataLoad(String sourcePath) {
+    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
+    List<Partition> partitionList =
+        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
+
+    partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
+    partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
+
+    for (int i = 0; i < files.size(); i++) {
+      partitionFiles.get(0).add(files.get(i));
+    }
+    return partitionList;
+  }
+
+}
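
A small usage sketch for the direct-load path above; the file paths are made up, and only methods and constants visible in this diff are used:

    // splitFilePath tokenises a separator-delimited source path into the supplied list
    List<String> partitionsFiles = new ArrayList<String>();
    CarbonQueryUtil.splitFilePath("/data/a.csv,/data/b.csv,/data/c.csv",
        partitionsFiles, CarbonCommonConstants.COMMA);
    // partitionsFiles now holds [/data/a.csv, /data/b.csv, /data/c.csv]

    // getTableSplitsForDirectLoad wraps the same file list into a single partition ("0")
    // and turns it into TableSplit instances via the DefaultLoadBalancer
    TableSplit[] splits =
        CarbonQueryUtil.getTableSplitsForDirectLoad("/data/a.csv,/data/b.csv,/data/c.csv");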

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
new file mode 100644
index 0000000..d668cc2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.util;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+public final class DeleteLoadFolders {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
+
+  private DeleteLoadFolders() {
+
+  }
+
+  /**
+   * Returns the data directory path of a segment.
+   *
+   * @param dbName database name
+   * @param tableName table name
+   * @param storeLocation carbon store location
+   * @param partitionId partition id
+   * @param oneLoad load metadata details of the segment
+   * @return absolute path of the segment's data directory
+   */
+  private static String getSegmentPath(String dbName, String tableName, String storeLocation,
+      int partitionId, LoadMetadataDetails oneLoad) {
+    CarbonTablePath carbon = new CarbonStorePath(storeLocation).getCarbonTablePath(
+        new CarbonTableIdentifier(dbName, tableName, ""));
+    String segmentId = oneLoad.getLoadName();
+    return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId);
+  }
+
+  private static boolean physicalFactAndMeasureMetadataDeletion(String path) {
+
+    boolean status = false;
+    try {
+      if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+        CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+        CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+          @Override public boolean accept(CarbonFile file) {
+            return (CarbonTablePath.isCarbonDataFile(file.getName())
+                || CarbonTablePath.isCarbonIndexFile(file.getName()));
+          }
+        });
+
+        // if there are no fact or measure metadata files present, there is no need
+        // to keep the entry in metadata.
+        if (filesToBeDeleted.length == 0) {
+          status = true;
+        } else {
+
+          for (CarbonFile eachFile : filesToBeDeleted) {
+            if (!eachFile.delete()) {
+              LOGGER.warn("Unable to delete the file as per delete command "
+                  + eachFile.getAbsolutePath());
+              status = false;
+            } else {
+              status = true;
+            }
+          }
+        }
+        // need to delete the complete folder.
+        if (status) {
+          if (!file.delete()) {
+            LOGGER.warn("Unable to delete the folder as per delete command "
+                + file.getAbsolutePath());
+            status = false;
+          }
+        }
+
+      } else {
+        status = false;
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Unable to delete the file as per delete command " + path);
+    }
+
+    return status;
+
+  }
+
+  private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
+      boolean isForceDelete) {
+    if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneLoad.getLoadStatus())
+        || CarbonCommonConstants.COMPACTED.equalsIgnoreCase(oneLoad.getLoadStatus()))
+        && oneLoad.getVisibility().equalsIgnoreCase("true")) {
+      if (isForceDelete) {
+        return true;
+      }
+      long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
+
+      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
+
+    }
+
+    return false;
+  }
+
+  public static boolean deleteLoadFoldersFromFileSystem(String dbName, String tableName,
+      String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
+
+    boolean isDeleted = false;
+
+    if (details != null && details.length != 0) {
+      for (LoadMetadataDetails oneLoad : details) {
+        if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
+          String path = getSegmentPath(dbName, tableName, storeLocation, 0, oneLoad);
+          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
+          if (deletionStatus) {
+            isDeleted = true;
+            oneLoad.setVisibility("false");
+            LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+          }
+        }
+      }
+    }
+
+    return isDeleted;
+  }
+
+
+}
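
A hedged sketch of how DeleteLoadFolders is typically driven (not part of this patch). The metaDataLocation, the database/table names and the store location are assumptions; the details array comes from SegmentStatusManager, and passing isForceDelete = true removes eligible segments without waiting for the query timeout.

  // Illustrative only; identifiers and paths are assumptions for this sketch.
  LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
  boolean anyDeleted = DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
      "default", "sales", "/opt/carbon/store", true, details);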

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/LoadMetadataUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/LoadMetadataUtil.java
new file mode 100644
index 0000000..415eb8d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/LoadMetadataUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.util;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+
+/**
+ * Utility methods for data loading
+ */
+public final class LoadMetadataUtil {
+  private LoadMetadataUtil() {
+
+  }
+
+  public static boolean isLoadDeletionRequired(String metaDataLocation) {
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    if (details != null && details.length != 0) {
+      for (LoadMetadataDetails oneRow : details) {
+        if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus())
+            || CarbonCommonConstants.COMPACTED.equalsIgnoreCase(oneRow.getLoadStatus()))
+            && oneRow.getVisibility().equalsIgnoreCase("true")) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+
+  }
+}
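
The check above is usually paired with DeleteLoadFolders, as in this illustrative sketch (not part of this patch); metaDataLocation and the table identifiers are assumptions.

  // Illustrative only: delete eligible load folders only when the status file says so.
  if (LoadMetadataUtil.isLoadDeletionRequired(metaDataLocation)) {
    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
    DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
        dbName, tableName, storeLocation, false, details);
  }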

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/TableOptionConstant.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/TableOptionConstant.java b/processing/src/main/java/org/apache/carbondata/processing/util/TableOptionConstant.java
new file mode 100644
index 0000000..fa910e6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/TableOptionConstant.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.util;
+
+/**
+ * Enum that holds the option keys supported in the data-load DDL
+ */
+public enum TableOptionConstant {
+  SERIALIZATION_NULL_FORMAT("serialization_null_format"),
+  BAD_RECORDS_LOGGER_ENABLE("bad_records_logger_enable"),
+  BAD_RECORDS_ACTION("bad_records_action");
+
+  private String name;
+
+  /**
+   * Constructor to initialize the enum value.
+   *
+   * @param name option name as used in the load DDL
+   */
+  TableOptionConstant(String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return name;
+  }
+}
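
A small illustrative sketch of how these option keys are consumed when assembling load options (not part of this patch); the options map and the chosen values are assumptions.

  // Illustrative only; the map and the sample values are assumptions for this sketch.
  Map<String, String> options = new HashMap<>();
  options.put(TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName(), "true");
  options.put(TableOptionConstant.BAD_RECORDS_ACTION.getName(), "FORCE");
  options.put(TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName(), "\\N");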

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 3bb186e..61771ea 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -72,16 +72,15 @@ import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWrit
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
-import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
-import org.apache.carbondata.processing.constants.TableOptionConstant;
-import org.apache.carbondata.processing.csvload.BlockDetails;
-import org.apache.carbondata.processing.csvload.CSVInputFormat;
-import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator;
-import org.apache.carbondata.processing.csvload.StringArrayWritable;
-import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.newflow.DataLoadExecutor;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
@@ -384,7 +383,6 @@ public class StoreCreator {
       path.delete();
     }
 
-    SchemaInfo info = new SchemaInfo();
     BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
         0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
     Configuration configuration = new Configuration();
@@ -411,9 +409,6 @@ public class StoreCreator {
         storeLocationArray,
         new CarbonIterator[]{readerIterator});
 
-    info.setDatabaseName(databaseName);
-    info.setTableName(tableName);
-
     writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
         new ArrayList<LoadMetadataDetails>());
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
deleted file mode 100644
index 676838d..0000000
--- a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.csvload;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import junit.framework.TestCase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.Lz4Codec;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-public class CSVInputFormatTest extends TestCase {
-
-  /**
-   * generate compressed files, no need to call this method.
-   * @throws Exception
-   */
-  public void generateCompressFiles() throws Exception {
-    String pwd = new File("src/test/resources/csv").getCanonicalPath();
-    String inputFile = pwd + "/data.csv";
-    FileInputStream input = new FileInputStream(inputFile);
-    Configuration conf = new Configuration();
-
-    // .gz
-    String outputFile = pwd + "/data.csv.gz";
-    FileOutputStream output = new FileOutputStream(outputFile);
-    GzipCodec gzip = new GzipCodec();
-    gzip.setConf(conf);
-    CompressionOutputStream outputStream = gzip.createOutputStream(output);
-    int i = -1;
-    while ((i = input.read()) != -1) {
-      outputStream.write(i);
-    }
-    outputStream.close();
-    input.close();
-
-    // .bz2
-    input = new FileInputStream(inputFile);
-    outputFile = pwd + "/data.csv.bz2";
-    output = new FileOutputStream(outputFile);
-    BZip2Codec bzip2 = new BZip2Codec();
-    bzip2.setConf(conf);
-    outputStream = bzip2.createOutputStream(output);
-    i = -1;
-    while ((i = input.read()) != -1) {
-      outputStream.write(i);
-    }
-    outputStream.close();
-    input.close();
-
-    // .snappy
-    input = new FileInputStream(inputFile);
-    outputFile = pwd + "/data.csv.snappy";
-    output = new FileOutputStream(outputFile);
-    SnappyCodec snappy = new SnappyCodec();
-    snappy.setConf(conf);
-    outputStream = snappy.createOutputStream(output);
-    i = -1;
-    while ((i = input.read()) != -1) {
-      outputStream.write(i);
-    }
-    outputStream.close();
-    input.close();
-
-    //.lz4
-    input = new FileInputStream(inputFile);
-    outputFile = pwd + "/data.csv.lz4";
-    output = new FileOutputStream(outputFile);
-    Lz4Codec lz4 = new Lz4Codec();
-    lz4.setConf(conf);
-    outputStream = lz4.createOutputStream(output);
-    i = -1;
-    while ((i = input.read()) != -1) {
-      outputStream.write(i);
-    }
-    outputStream.close();
-    input.close();
-
-  }
-
-  /**
-   * CSVCheckMapper check the content of csv files.
-   */
-  public static class CSVCheckMapper extends Mapper<NullWritable, StringArrayWritable, NullWritable,
-      NullWritable> {
-    @Override
-    protected void map(NullWritable key, StringArrayWritable value, Context context)
-        throws IOException, InterruptedException {
-      String[] columns = value.get();
-      int id = Integer.parseInt(columns[0]);
-      int salary = Integer.parseInt(columns[6]);
-      Assert.assertEquals(id - 1, salary - 15000);
-    }
-  }
-
-  /**
-   * test read csv files
-   * @throws Exception
-   */
-  @Test public void testReadCSVFiles() throws Exception{
-    Configuration conf = new Configuration();
-    prepareConf(conf);
-    Job job = Job.getInstance(conf, "CSVInputFormat_normal");
-    job.setJarByClass(CSVInputFormatTest.class);
-    job.setMapperClass(CSVCheckMapper.class);
-    job.setNumReduceTasks(0);
-    job.setInputFormatClass(CSVInputFormat.class);
-
-    String inputFolder = new File("src/test/resources/csv").getCanonicalPath();
-    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv"));
-    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.bz2"));
-    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.gz"));
-    // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.lz4"));
-    // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.snappy"));
-
-    File output = new File("target/output_CSVInputFormatTest");
-    deleteOutput(output);
-    FileOutputFormat.setOutputPath(job, new Path(output.getCanonicalPath()));
-
-    Assert.assertTrue(job.waitForCompletion(true));
-  }
-
-  private void prepareConf(Configuration conf) {
-    conf.setBoolean(CSVInputFormat.HEADER_PRESENT, true);
-    conf.set(CSVInputFormat.MAX_COLUMNS, "10");
-    conf.set(CSVInputFormat.NUMBER_OF_COLUMNS, "7");
-  }
-
-  private void deleteOutput(File output) {
-    if (output.exists()) {
-      if (output.isDirectory()) {
-        for(File file : output.listFiles()) {
-          deleteOutput(file);
-        }
-        output.delete();
-      } else {
-        output.delete();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/test/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormatTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormatTest.java b/processing/src/test/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormatTest.java
new file mode 100644
index 0000000..925701d
--- /dev/null
+++ b/processing/src/test/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormatTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.csvinput;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class CSVInputFormatTest extends TestCase {
+
+  /**
+   * Generates the compressed test files; it does not need to be run for normal test execution.
+   * @throws Exception
+   */
+  public void generateCompressFiles() throws Exception {
+    String pwd = new File("src/test/resources/csv").getCanonicalPath();
+    String inputFile = pwd + "/data.csv";
+    FileInputStream input = new FileInputStream(inputFile);
+    Configuration conf = new Configuration();
+
+    // .gz
+    String outputFile = pwd + "/data.csv.gz";
+    FileOutputStream output = new FileOutputStream(outputFile);
+    GzipCodec gzip = new GzipCodec();
+    gzip.setConf(conf);
+    CompressionOutputStream outputStream = gzip.createOutputStream(output);
+    int i = -1;
+    while ((i = input.read()) != -1) {
+      outputStream.write(i);
+    }
+    outputStream.close();
+    input.close();
+
+    // .bz2
+    input = new FileInputStream(inputFile);
+    outputFile = pwd + "/data.csv.bz2";
+    output = new FileOutputStream(outputFile);
+    BZip2Codec bzip2 = new BZip2Codec();
+    bzip2.setConf(conf);
+    outputStream = bzip2.createOutputStream(output);
+    i = -1;
+    while ((i = input.read()) != -1) {
+      outputStream.write(i);
+    }
+    outputStream.close();
+    input.close();
+
+    // .snappy
+    input = new FileInputStream(inputFile);
+    outputFile = pwd + "/data.csv.snappy";
+    output = new FileOutputStream(outputFile);
+    SnappyCodec snappy = new SnappyCodec();
+    snappy.setConf(conf);
+    outputStream = snappy.createOutputStream(output);
+    i = -1;
+    while ((i = input.read()) != -1) {
+      outputStream.write(i);
+    }
+    outputStream.close();
+    input.close();
+
+    //.lz4
+    input = new FileInputStream(inputFile);
+    outputFile = pwd + "/data.csv.lz4";
+    output = new FileOutputStream(outputFile);
+    Lz4Codec lz4 = new Lz4Codec();
+    lz4.setConf(conf);
+    outputStream = lz4.createOutputStream(output);
+    i = -1;
+    while ((i = input.read()) != -1) {
+      outputStream.write(i);
+    }
+    outputStream.close();
+    input.close();
+
+  }
+
+  /**
+   * CSVCheckMapper checks the content of the CSV files.
+   */
+  public static class CSVCheckMapper extends Mapper<NullWritable, StringArrayWritable, NullWritable,
+      NullWritable> {
+    @Override
+    protected void map(NullWritable key, StringArrayWritable value, Context context)
+        throws IOException, InterruptedException {
+      String[] columns = value.get();
+      int id = Integer.parseInt(columns[0]);
+      int salary = Integer.parseInt(columns[6]);
+      Assert.assertEquals(id - 1, salary - 15000);
+    }
+  }
+
+  /**
+   * Tests reading CSV files in plain and compressed formats.
+   * @throws Exception
+   */
+  @Test public void testReadCSVFiles() throws Exception {
+    Configuration conf = new Configuration();
+    prepareConf(conf);
+    Job job = Job.getInstance(conf, "CSVInputFormat_normal");
+    job.setJarByClass(CSVInputFormatTest.class);
+    job.setMapperClass(CSVCheckMapper.class);
+    job.setNumReduceTasks(0);
+    job.setInputFormatClass(CSVInputFormat.class);
+
+    String inputFolder = new File("src/test/resources/csv").getCanonicalPath();
+    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv"));
+    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.bz2"));
+    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.gz"));
+    // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.lz4"));
+    // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.snappy"));
+
+    File output = new File("target/output_CSVInputFormatTest");
+    deleteOutput(output);
+    FileOutputFormat.setOutputPath(job, new Path(output.getCanonicalPath()));
+
+    Assert.assertTrue(job.waitForCompletion(true));
+  }
+
+  private void prepareConf(Configuration conf) {
+    conf.setBoolean(CSVInputFormat.HEADER_PRESENT, true);
+    conf.set(CSVInputFormat.MAX_COLUMNS, "10");
+    conf.set(CSVInputFormat.NUMBER_OF_COLUMNS, "7");
+  }
+
+  private void deleteOutput(File output) {
+    if (output.exists()) {
+      if (output.isDirectory()) {
+        for (File file : output.listFiles()) {
+          deleteOutput(file);
+        }
+        output.delete();
+      } else {
+        output.delete();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java b/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java
new file mode 100644
index 0000000..88ff377
--- /dev/null
+++ b/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.dictionary;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.devapi.DictionaryGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InMemBiDictionaryTest {
+
+  /**
+   * Tests a dictionary pre-created from a map.
+   */
+  @Test public void testPreCreated() throws Exception {
+    Map<Integer, String> map = new HashMap<>();
+    map.put(1, "amy");
+    map.put(2, "bob");
+    BiDictionary<Integer, String> dict = new InMemBiDictionary<>(map);
+    Assert.assertEquals(1, dict.getKey("amy").intValue());
+    Assert.assertEquals(2, dict.getKey("bob").intValue());
+    Assert.assertEquals("amy", dict.getValue(1));
+    Assert.assertEquals("bob", dict.getValue(2));
+    Assert.assertEquals(2, dict.size());
+    try {
+      dict.getOrGenerateKey("cat");
+      Assert.fail("expected an exception when generating a key on a pre-created dictionary");
+    } catch (Exception e) {
+      // test pass
+    }
+  }
+
+  /**
+   * Tests generating dictionary keys on the fly.
+   */
+  @Test public void testGenerateDict() throws Exception {
+    BiDictionary<Integer, String> dict = new InMemBiDictionary<>(
+        new DictionaryGenerator<Integer, String>() {
+          int sequence = 1;
+          @Override
+          public Integer generateKey(String value) throws DictionaryGenerationException {
+            return sequence++;
+          }
+        });
+    Assert.assertEquals(1, dict.getOrGenerateKey("amy").intValue());
+    Assert.assertEquals(2, dict.getOrGenerateKey("bob").intValue());
+    Assert.assertEquals(1, dict.getKey("amy").intValue());
+    Assert.assertEquals(2, dict.getKey("bob").intValue());
+    Assert.assertEquals("amy", dict.getValue(1));
+    Assert.assertEquals("bob", dict.getValue(2));
+    Assert.assertEquals(2, dict.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java b/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
deleted file mode 100644
index 6d82cce..0000000
--- a/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.dictionary;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.core.devapi.BiDictionary;
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.devapi.DictionaryGenerator;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class InMemBiDictionaryTest {
-
-  /**
-   * test pre-created dictionary
-   */
-  @Test public void testPreCreated() throws Exception {
-    Map<Integer, String> map = new HashMap<>();
-    map.put(1, "amy");
-    map.put(2, "bob");
-    BiDictionary<Integer, String> dict = new InMemBiDictionary<>(map);
-    Assert.assertEquals(1, dict.getKey("amy").intValue());
-    Assert.assertEquals(2, dict.getKey("bob").intValue());
-    Assert.assertEquals("amy", dict.getValue(1));
-    Assert.assertEquals("bob", dict.getValue(2));
-    Assert.assertEquals(2, dict.size());
-    try {
-      dict.getOrGenerateKey("cat");
-      Assert.fail("add dictionary successfully");
-    } catch (Exception e) {
-      // test pass
-    }
-  }
-
-  /**
-   * test generating dictionary on the fly
-   */
-  @Test public void testGenerateDict() throws Exception {
-    BiDictionary<Integer, String> dict = new InMemBiDictionary<>(
-        new DictionaryGenerator<Integer, String>() {
-          int sequence = 1;
-          @Override
-          public Integer generateKey(String value) throws DictionaryGenerationException {
-            return sequence++;
-          }
-        });
-    Assert.assertEquals(1, dict.getOrGenerateKey("amy").intValue());
-    Assert.assertEquals(2, dict.getOrGenerateKey("bob").intValue());
-    Assert.assertEquals(1, dict.getKey("amy").intValue());
-    Assert.assertEquals(2, dict.getKey("bob").intValue());
-    Assert.assertEquals("amy", dict.getValue(1));
-    Assert.assertEquals("bob", dict.getValue(2));
-    Assert.assertEquals(2, dict.size());
-  }
-}