Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:27 UTC

[40/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
deleted file mode 100644
index 25c8071..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.DataPartitioner;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-
-public final class QueryPartitionHelper {
-  private static QueryPartitionHelper instance = new QueryPartitionHelper();
-  private Map<String, DataPartitioner> partitionerMap =
-      new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  private Map<String, DefaultLoadBalancer> loadBalancerMap =
-      new HashMap<String, DefaultLoadBalancer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  private QueryPartitionHelper() {
-
-  }
-
-  public static QueryPartitionHelper getInstance() {
-    return instance;
-  }
-
-  /**
-   * Get the partitions applicable to the query, based on the filters it applies
-   */
-  public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
-    String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
-
-    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
-    return dataPartitioner.getPartitions();
-  }
-
-  public List<Partition> getAllPartitions(String databaseName, String tableName) {
-    String tableUniqueName = databaseName + '_' + tableName;
-
-    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
-    return dataPartitioner.getAllPartitions();
-  }
-
-  /**
-   * Get the name of the node to which the partition is assigned.
-   */
-  public String getLocation(Partition partition, String databaseName, String tableName) {
-    String tableUniqueName = databaseName + '_' + tableName;
-
-    DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
-    return loadBalancer.getNodeForPartitions(partition);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
deleted file mode 100644
index 0762799..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.spark.partition.api.DataPartitioner;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-/**
- * Sample partitioner implementation.
- */
-public class SampleDataPartitionerImpl implements DataPartitioner {
-
-  @Override
-  public List<Partition> getAllPartitions() {
-    return null;
-  }
-
-  @Override
-  public List<Partition> getPartitions() {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
deleted file mode 100644
index 733aae9..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.splits;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-import org.apache.hadoop.io.Writable;
-
-
-/**
- * It represents one region server as one split.
- */
-public class TableSplit implements Serializable, Writable {
-  private static final long serialVersionUID = -8058151330863145575L;
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(TableSplit.class.getName());
-  private List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-  private Partition partition;
-
-  /**
-   * @return the locations
-   */
-  public List<String> getLocations() {
-    return locations;
-  }
-
-  /**
-   * @param locations the locations to set
-   */
-  public void setLocations(List<String> locations) {
-    this.locations = locations;
-  }
-
-  /**
-   * @return the partition.
-   */
-  public Partition getPartition() {
-    return partition;
-  }
-
-  /**
-   * @param partition the partition to set.
-   */
-  public void setPartition(Partition partition) {
-    this.partition = partition;
-  }
-
-  @Override public void readFields(DataInput in) throws IOException {
-
-    int sizeLoc = in.readInt();
-    for (int i = 0; i < sizeLoc; i++) {
-      byte[] b = new byte[in.readInt()];
-      in.readFully(b);
-      locations.add(new String(b, Charset.defaultCharset()));
-    }
-
-    byte[] buf = new byte[in.readInt()];
-    in.readFully(buf);
-    ByteArrayInputStream bis = new ByteArrayInputStream(buf);
-    ObjectInputStream ois = new ObjectInputStream(bis);
-    try {
-      partition = (Partition) ois.readObject();
-    } catch (ClassNotFoundException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    ois.close();
-  }
-
-  @Override public void write(DataOutput out) throws IOException {
-
-    int sizeLoc = locations.size();
-    out.writeInt(sizeLoc);
-    for (int i = 0; i < sizeLoc; i++) {
-      byte[] bytes = locations.get(i).getBytes(Charset.defaultCharset());
-      out.writeInt(bytes.length);
-      out.write(bytes);
-    }
-
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
-    ObjectOutputStream obs = new ObjectOutputStream(bos);
-    obs.writeObject(partition);
-    obs.close();
-    byte[] byteArray = bos.toByteArray();
-    out.writeInt(byteArray.length);
-    out.write(byteArray);
-  }
-
-  public String toString() {
-    return partition.getUniqueID() + ' ' + locations;
-  }
-}
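
The write/readFields pair above must stay symmetric: a count, then a length-prefixed byte array per location, then the object-serialized partition. A minimal Scala sketch of the locations round-trip, assuming only the JDK (TableSplitRoundTrip is a hypothetical stand-in, not a CarbonData class):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.Charset

object TableSplitRoundTrip {
  // Mirrors TableSplit.write: count, then length-prefixed bytes per location.
  def writeLocations(out: DataOutputStream, locations: Seq[String]): Unit = {
    out.writeInt(locations.size)
    locations.foreach { loc =>
      val bytes = loc.getBytes(Charset.defaultCharset())
      out.writeInt(bytes.length)
      out.write(bytes)
    }
  }

  // Mirrors the locations part of TableSplit.readFields.
  def readLocations(in: DataInputStream): Seq[String] = {
    val size = in.readInt()
    (0 until size).map { _ =>
      val bytes = new Array[Byte](in.readInt())
      in.readFully(bytes)
      new String(bytes, Charset.defaultCharset())
    }
  }

  def main(args: Array[String]): Unit = {
    val bos = new ByteArrayOutputStream()
    writeLocations(new DataOutputStream(bos), Seq("host-1", "host-2"))
    val in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
    assert(readLocations(in) == Seq("host-1", "host-2"))
  }
}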

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
deleted file mode 100644
index 1e39edf..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.spark.partition.api.Partition;
-import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
-import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
-import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
-import org.apache.carbondata.spark.splits.TableSplit;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * This utility parses the Carbon query plan into the actual query model object.
- */
-public class CarbonQueryUtil {
-
-  private CarbonQueryUtil() {
-
-  }
-
-  /**
-   * Creates one split for each region server.
-   */
-  public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
-      CarbonQueryPlan queryPlan) {
-
-    // Just create splits based on the locations of the region servers
-    List<Partition> allPartitions = null;
-    if (queryPlan == null) {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
-    } else {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
-    }
-    TableSplit[] splits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < splits.length; i++) {
-      splits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = QueryPartitionHelper.getInstance()
-          .getLocation(partition, databaseName, tableName);
-      locations.add(location);
-      splits[i].setPartition(partition);
-      splits[i].setLocations(locations);
-    }
-
-    return splits;
-  }
-
-  /**
-   * Creates one split for each region server.
-   */
-  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) {
-
-    // Just create splits based on the locations of the region servers
-    DefaultLoadBalancer loadBalancer = null;
-    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
-    loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
-    TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < tblSplits.length; i++) {
-      tblSplits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = loadBalancer.getNodeForPartitions(partition);
-      locations.add(location);
-      tblSplits[i].setPartition(partition);
-      tblSplits[i].setLocations(locations);
-    }
-    return tblSplits;
-  }
-
-  /**
-   * Split sourcePath on the given separator (typically a comma)
-   */
-  public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
-      String separator) {
-    if (StringUtils.isNotEmpty(sourcePath)) {
-      String[] files = sourcePath.split(separator);
-      Collections.addAll(partitionsFiles, files);
-    }
-  }
-
-  private static List<Partition> getAllFilesForDataLoad(String sourcePath) {
-    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
-    List<Partition> partitionList =
-        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
-
-    partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
-    partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
-
-    for (int i = 0; i < files.size(); i++) {
-      partitionFiles.get(0).add(files.get(i));
-    }
-    return partitionList;
-  }
-
-  public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
-    List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    if (null != details) {
-      for (LoadMetadataDetails oneLoad : details) {
-        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(oneLoad.getLoadStatus())) {
-          String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
-          slices.add(loadName);
-        }
-      }
-    }
-    return slices;
-  }
-
-}
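
The direct-load split logic removed here is easy to misread: splitFilePath tokenizes sourcePath on the separator, and getAllFilesForDataLoad then puts every file into a single partition with id "0". A compact sketch of the same behaviour in plain Scala (PartitionFiles is a hypothetical stand-in for PartitionMultiFileImpl):

object DirectLoadSplit {
  final case class PartitionFiles(id: String, files: List[String])

  // Same behaviour as splitFilePath + getAllFilesForDataLoad: split on ','
  // and collect everything under the single partition "0".
  def allFilesForDataLoad(sourcePath: String): PartitionFiles = {
    val files =
      if (sourcePath == null || sourcePath.isEmpty) Nil
      else sourcePath.split(",").toList
    PartitionFiles("0", files)
  }

  def main(args: Array[String]): Unit = {
    println(allFilesForDataLoad("/data/a.csv,/data/b.csv"))
    // prints: PartitionFiles(0,List(/data/a.csv, /data/b.csv))
  }
}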

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
deleted file mode 100644
index 91a9556..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-
-/**
- * Utility for data loading
- */
-public final class LoadMetadataUtil {
-  private LoadMetadataUtil() {
-
-  }
-
-  public static boolean isLoadDeletionRequired(String metaDataLocation) {
-    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    if (details != null && details.length != 0) {
-      for (LoadMetadataDetails oneRow : details) {
-        if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus())
-            || CarbonCommonConstants.COMPACTED.equalsIgnoreCase(oneRow.getLoadStatus()))
-            && oneRow.getVisibility().equalsIgnoreCase("true")) {
-          return true;
-        }
-      }
-    }
-
-    return false;
-
-  }
-}
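
In words, the check above says: a clean-up pass is required as soon as any visible segment is marked for delete or already compacted. A sketch of the same predicate in Scala (LoadRow is a hypothetical stand-in for LoadMetadataDetails, and the status literals are assumptions for the CarbonCommonConstants values):

object LoadDeletionCheck {
  final case class LoadRow(loadStatus: String, visibility: String)

  def isLoadDeletionRequired(details: Seq[LoadRow]): Boolean =
    details.exists { row =>
      // Assumed literals; the real code reads
      // CarbonCommonConstants.MARKED_FOR_DELETE and COMPACTED.
      (row.loadStatus.equalsIgnoreCase("Marked for Delete") ||
        row.loadStatus.equalsIgnoreCase("Compacted")) &&
        row.visibility.equalsIgnoreCase("true")
    }
}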

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
new file mode 100644
index 0000000..8c14cd3
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.util.Utils;
+
+public class Util {
+  /**
+   * Return the array of available local dirs
+   */
+  public static String[] getConfiguredLocalDirs(SparkConf conf) {
+    return Utils.getConfiguredLocalDirs(conf);
+  }
+}
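
Util.getConfiguredLocalDirs replaces the CarbonLoaderUtil helper of the same name that several Scala call sites below used to import; the call-site pattern itself is unchanged. A sketch of that pattern, assuming it runs inside a live Spark executor (SparkEnv.get non-null):

import scala.util.Random

import org.apache.spark.SparkEnv

import org.apache.carbondata.spark.util.Util

object StoreLocationChooser {
  // Pick one configured local dir at random, as the carbon.use.local.dir
  // branches in DataLoadProcessorStepOnSpark and CarbonMergerRDD do below.
  def chooseStoreLocation(): Option[String] = {
    val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
    if (storeLocations != null && storeLocations.nonEmpty) {
      Some(storeLocations(Random.nextInt(storeLocations.length)))
    } else {
      None // caller falls back to its own default temp location
    }
  }
}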

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 6ad10f3..0bc9285 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -37,7 +37,7 @@ class CarbonOption(options: Map[String, String]) {
 
   def partitionClass: String = {
     options.getOrElse("partitionClass",
-      "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
+      "org.apache.carbondata.processing.partition.impl.SampleDataPartitionerImpl")
   }
 
   def tempCSV: Boolean = options.getOrElse("tempCSV", "true").toBoolean

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala
index f7758a6..8cdccdc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala
@@ -22,7 +22,7 @@ import org.apache.spark.Partitioner
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.scan.partition.{HashPartitioner => JavaHashPartitioner, ListPartitioner => JavaListPartitioner, RangePartitioner => JavaRangePartitioner}
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 
 object PartitionFactory {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index fed8a96..bdf9a71 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -33,10 +33,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.csvload.{CSVInputFormat, StringArrayWritable}
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
-import org.apache.carbondata.processing.sortandgroupby.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
+import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
+import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark.util.CommonUtil
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 6943dcb..574fb8a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -27,17 +27,19 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.csvload.StringArrayWritable
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
-import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
-import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl
-import org.apache.carbondata.processing.newflow.sort.SortStepRowUtil
-import org.apache.carbondata.processing.newflow.steps.{DataConverterProcessorStepImpl, DataWriterProcessorStepImpl}
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters
+import org.apache.carbondata.processing.loading.DataLoadProcessBuilder
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl
+import org.apache.carbondata.processing.loading.sort.SortStepRowUtil
+import org.apache.carbondata.processing.loading.steps.{DataConverterProcessorStepImpl, DataWriterProcessorStepImpl}
+import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
+import org.apache.carbondata.spark.util.Util
 
 object DataLoadProcessorStepOnSpark {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -215,7 +217,7 @@ object DataLoadProcessorStepOnSpark {
     val carbonUseLocalDir = CarbonProperties.getInstance()
       .getProperty("carbon.use.local.dir", "false")
     if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-      val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+      val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
       if (null != storeLocations && storeLocations.nonEmpty) {
         storeLocation = storeLocations(Random.nextInt(storeLocations.length))
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
index 7880fee..a42680e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
@@ -21,8 +21,8 @@ import org.apache.spark.Accumulator
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.BadRecordsLogger
 
 object GlobalSortHelper {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
index a73b0df..8eb5101 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
@@ -21,8 +21,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 object ValidateUtil {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 6cf8a7a..5c6760a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -28,10 +28,10 @@ import org.apache.spark.util.PartitionUtils
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.spliter.RowResultProcessor
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.AlterPartitionResult
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.spark.util.Util
 
 class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     result: AlterPartitionResult[K, V],
@@ -78,7 +78,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
 
             if (carbonUseLocalDir.equalsIgnoreCase("true")) {
 
-                val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+                val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
                 if (null != storeLocations && storeLocations.nonEmpty) {
                     storeLocation = storeLocations(Random.nextInt(storeLocations.length))
                 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index b63fc48..32523d8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -24,8 +24,8 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
+import org.apache.carbondata.processing.util.CarbonQueryUtil
 import org.apache.carbondata.spark.Value
-import org.apache.carbondata.spark.util.CarbonQueryUtil
 
 class CarbonCleanFilesRDD[V: ClassTag](
     sc: SparkContext,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index da391cf..45271a7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -24,8 +24,8 @@ import org.apache.spark.rdd.RDD
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.util.CarbonQueryUtil
 import org.apache.carbondata.spark.DeletedLoadResult
-import org.apache.carbondata.spark.util.CarbonQueryUtil
 
 class CarbonDeleteLoadByDateRDD[K, V](
     sc: SparkContext,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index 9e43d0e..9a1ef33 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -24,8 +24,8 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
+import org.apache.carbondata.processing.util.CarbonQueryUtil
 import org.apache.carbondata.spark.Value
-import org.apache.carbondata.spark.util.CarbonQueryUtil
 
 class CarbonDeleteLoadRDD[V: ClassTag](
     sc: SparkContext,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
index d1d49b9..652720c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -22,8 +22,8 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
+import org.apache.carbondata.processing.util.CarbonQueryUtil
 import org.apache.carbondata.spark.Value
-import org.apache.carbondata.spark.util.CarbonQueryUtil
 
 class CarbonDropTableRDD[V: ClassTag](
     sc: SparkContext,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index ca607e1..b2e0c47 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -34,8 +34,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -44,9 +43,9 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.service.{CarbonCommonFactory, PathService}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.NoRetryException
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index da0d082..16d0b80 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -32,8 +32,8 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
-import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.MergeResult
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index c8a55af..1f88f25 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -45,13 +45,12 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUt
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.processing.splits.TableSplit
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
+import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
 
 class CarbonMergerRDD[K, V](
     sc: SparkContext,
@@ -91,7 +90,7 @@ class CarbonMergerRDD[K, V](
 
       if (carbonUseLocalDir.equalsIgnoreCase("true")) {
 
-        val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+        val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
         if (null != storeLocations && storeLocations.nonEmpty) {
           storeLocation = storeLocations(Random.nextInt(storeLocations.length))
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 86bc79f..d38be0a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.sql.hive.DistributionUtil
@@ -46,9 +46,8 @@ import org.apache.carbondata.core.util.{ByteUtil, DataTypeUtil}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.CarbonCompactionUtil
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.spliter.CarbonSplitExecutor
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.processing.partition.spliter.CarbonSplitExecutor
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 1c08307..fc34127 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -41,8 +41,8 @@ import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstant
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, TaskMetricsMap}
 import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index c2b7b74..c2029e5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -34,13 +34,13 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, LoadMetadataUtil}
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.compaction.CompactionCallable
-import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.partition.{DropPartitionCallable, SplitPartitionCallable}
-import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Common functions for data life cycle management

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 67c71a0..1d1b47a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -43,16 +43,14 @@ import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
-import org.apache.carbondata.processing.csvload.BlockDetails
-import org.apache.carbondata.processing.csvload.CSVInputFormat
-import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.DataLoadExecutor
-import org.apache.carbondata.processing.newflow.exception.NoRetryException
+import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
+import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.splits.TableSplit
+import org.apache.carbondata.processing.util.{CarbonLoaderUtil, CarbonQueryUtil}
 import org.apache.carbondata.spark.DataLoadResult
-import org.apache.carbondata.spark.load.{CarbonLoaderUtil, FailureCauses}
-import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
 
@@ -149,7 +147,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
 
     if (isCarbonUseLocalDir) {
-      val yarnStoreLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+      val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
 
       if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
         // use single dir

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index f45dc83..4d782c9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -26,9 +26,9 @@ import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.DataLoadExecutor
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.DataLoadExecutor
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 /**
  * Data load in case of update command .

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 5040e69..df25a37 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -50,12 +50,11 @@ import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentSta
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.comparator.Comparator
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.csvload.CSVInputFormat
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
 
 object CommonUtil {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 601c0c7..c121960 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -51,14 +51,13 @@ import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter
-import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
-import org.apache.carbondata.processing.csvload.CSVInputFormat
-import org.apache.carbondata.processing.csvload.StringArrayWritable
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.NoRetryException
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter
+import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.CarbonSparkFactory
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.rdd._
 import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 42070c4..7d4dd49 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index fc20108..ebd1c6c 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -39,12 +39,11 @@ import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CompactionType
-import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.CarbonSparkFactory
-import org.apache.carbondata.spark.load.FailureCauses
-import org.apache.carbondata.spark.rdd.AlterTableAddColumnRDD
-import org.apache.carbondata.spark.util.{DataTypeConverterUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.DataTypeConverterUtil
 
 case class TableModel(
     ifNotExistsSet: Boolean,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index afcd970..37b722f 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -28,7 +28,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 object DistributionUtil {
   @transient

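The same one-line migration applies wherever CarbonLoaderUtil was previously pulled from the spark module; call sites stay as they are:

// Before the clean-up:
//   import org.apache.carbondata.spark.load.CarbonLoaderUtil
// After, the utility lives with the other processing helpers:
import org.apache.carbondata.processing.util.CarbonLoaderUtil
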
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
index 3060ff8..7345d54 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -21,7 +21,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.exception.DataLoadingException
 
 object FileUtils {
   /**

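FileUtils keeps throwing the same exception from its new home. A sketch, assuming the single-message constructor is unchanged by the move:

import org.apache.carbondata.processing.exception.DataLoadingException

// Reject an empty input path, as FileUtils does for bad load requests.
def requirePath(path: String): Unit = {
  if (path == null || path.trim.isEmpty) {
    throw new DataLoadingException("Input file path cannot be empty")
  }
}
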
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 002ed27..dcda72b 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.util.CommonUtil
 
 object PartitionUtils {

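PartitionUtils now builds its load model against the relocated class. A minimal construction sketch; the setter names are assumptions carried over from the pre-move model:

import org.apache.carbondata.processing.loading.model.CarbonLoadModel

// Identify the target table and the fact file to load.
val loadModel = new CarbonLoadModel()
loadModel.setDatabaseName("default")
loadModel.setTableName("t1")
loadModel.setFactFilePath("/tmp/data.csv")
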
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
index 643002d..9c37640 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD}
 
-import org.apache.carbondata.processing.csvload.BlockDetails
+import org.apache.carbondata.processing.loading.csvinput.BlockDetails
 
 /*
  * this object is used to handle file splits

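SparkUtil maps Hadoop file splits into BlockDetails, which now sits in loading.csvinput. A sketch, assuming the (path, offset, length, hosts) constructor carries over from the old csvload package:

import org.apache.hadoop.fs.Path

import org.apache.carbondata.processing.loading.csvinput.BlockDetails

// Describe one CSV block: file, byte offset, length, preferred hosts.
val block = new BlockDetails(new Path("/data/part-0.csv"), 0L, 1024L, Array("host1"))
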
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 596cebf..cf14a79 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -36,7 +36,7 @@ import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, N
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.util.{FileUtils, SparkUtil}
+import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -53,18 +53,18 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
-import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
+import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
+import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException, NoRetryException}
-import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.processing.splits.TableSplit
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil, CarbonQueryUtil}
 import org.apache.carbondata.spark._
-import org.apache.carbondata.spark.load.{FailureCauses, _}
-import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
+import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 /**
  * This is the factory class which can create different RDDs depending on user needs.
@@ -353,7 +353,7 @@ object CarbonDataRDDFactory {
           isCompactionTriggerByDDl
         )
         var storeLocation = ""
-        val configuredStore = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+        val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
         if (null != configuredStore && configuredStore.nonEmpty) {
           storeLocation = configuredStore(Random.nextInt(configuredStore.length))
         }

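Note the swap in the last hunk: getConfiguredLocalDirs moves off CarbonLoaderUtil onto the spark-side Util object. The surrounding store-location logic is unchanged:

import scala.util.Random

import org.apache.spark.SparkEnv

import org.apache.carbondata.spark.util.Util

// Pick one of Spark's configured local dirs as the temporary store
// location, as CarbonDataRDDFactory does above.
var storeLocation = ""
val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
if (null != configuredStore && configuredStore.nonEmpty) {
  storeLocation = configuredStore(Random.nextInt(configuredStore.length))
}
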
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index c38f0e1..cb35960 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -43,9 +43,9 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.processing.exception.MultipleMatchingException
+import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
 import org.apache.carbondata.spark.DeleteDelataResultImpl
-import org.apache.carbondata.spark.load.FailureCauses
 import org.apache.carbondata.spark.util.QueryPlanUtil
 
 

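FailureCauses now ships with the load pipeline instead of the spark module. The IUD commands pair it with ExecutionErrors to report why a mutation pass failed; a sketch, assuming the NONE constant survives the move:

import org.apache.spark.sql.execution.command.ExecutionErrors

import org.apache.carbondata.processing.loading.FailureCauses

// NONE signals a clean run; mutation commands overwrite it on failure.
val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
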
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 01cd113..9814cc2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -46,16 +46,16 @@ import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.newflow.exception.NoRetryException
+import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.load.ValidateUtil
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
@@ -894,7 +894,7 @@ private[sql] case class DescribeCommandFormatted(
       (field.name, field.dataType.simpleString, comment)
     }
     val colPropStr = if (colProps.toString().trim().length() > 0) {
-      // drops additional comma at end
+      // drops additional comma at end
       colProps.toString().dropRight(1)
     } else {
       colProps.toString()

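For context, the logic around that comment is plain StringBuilder bookkeeping: column properties accumulate with a trailing comma that dropRight(1) strips. In isolation:

// "k1=v1,k2=v2," accumulates a dangling comma; drop it before display.
val colProps = new StringBuilder("sort_columns=a,dictionary_include=b,")
val cleaned =
  if (colProps.toString().trim().length() > 0) {
    colProps.toString().dropRight(1)  // "sort_columns=a,dictionary_include=b"
  } else {
    colProps.toString()
  }
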
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 6ed826f..b5b8261 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -21,8 +21,8 @@ import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
 import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 
 /**
   * Test Case for org.apache.carbondata.spark.util.GlobalDictionaryUtil

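The dictionary test cases reach TableOptionConstant at its new address. A sketch of how they compose the bad-records flag, assuming the enum keeps its BAD_RECORDS_LOGGER_ENABLE constant and getName accessor:

import org.apache.carbondata.processing.util.TableOptionConstant

// Serialize the bad-records logger setting as "<option name>,false".
val badRecordsFlag = TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + ",false"
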
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index e2b185e..fd2ab2a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -24,7 +24,7 @@ import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentif
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 /**
  * Utility for global dictionary test cases

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 63f8c1f..f0de47d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -21,11 +21,12 @@ package org.apache.carbondata.spark.util
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
index a126686..d1d31c1 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
@@ -31,8 +31,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 
 class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
index 405322b..8864d3e 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
@@ -24,8 +24,8 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 
 /**
   * Test Case for org.apache.carbondata.spark.util.GlobalDictionaryUtil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5335fe2..5b76c79 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -54,17 +54,21 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
-import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.loading.csvinput.BlockDetails
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.{CarbonDataLoadingException, NoRetryException}
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.processing.splits.TableSplit
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil, CarbonQueryUtil}
 import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
-import org.apache.carbondata.spark.load.{FailureCauses, _}
-import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 /**
  * This is the factory class which can create different RDDs depending on user needs.
@@ -528,7 +532,7 @@ object CarbonDataRDDFactory {
           isCompactionTriggerByDDl
         )
         var storeLocation = ""
-        val configuredStore = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+        val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
         if (null != configuredStore && configuredStore.nonEmpty) {
           storeLocation = configuredStore(Random.nextInt(configuredStore.length))
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
deleted file mode 100644
index e0a8b58..0000000
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
-
-/**
- * All the utility functions for carbon plan creation
- */
-object QueryPlanUtil {
-
-  /**
-   * createCarbonInputFormat from query model
-   */
-  def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
-  (CarbonTableInputFormat[Array[Object]], Job) = {
-    val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
-    val jobConf: JobConf = new JobConf(new Configuration)
-    val job: Job = new Job(jobConf)
-    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
-    (carbonInputFormat, job)
-  }
-
-  def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
-      conf: Configuration) : CarbonTableInputFormat[V] = {
-    val carbonInputFormat = new CarbonTableInputFormat[V]()
-    val job: Job = new Job(conf)
-    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
-    carbonInputFormat
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 28c53a1..52df2a4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 897895a..9018f7b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -38,11 +38,11 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.newflow.exception.NoRetryException
+import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.load.ValidateUtil
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 02c7023..23318f3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -20,7 +20,13 @@ package org.apache.spark.sql.execution.command.mutation
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -37,10 +43,10 @@ import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.hadoop.CarbonInputFormat
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.processing.exception.MultipleMatchingException
+import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.spark.DeleteDelataResultImpl
-import org.apache.carbondata.spark.load.FailureCauses
-import org.apache.carbondata.spark.util.QueryPlanUtil
 
 object DeleteExecution {
   val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
@@ -93,8 +99,7 @@ object DeleteExecution {
       dataRdd
     }
 
-    val (carbonInputFormat, job) =
-      QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
+    val (carbonInputFormat, job) = createCarbonInputFormat(absoluteTableIdentifier)
     CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
     val keyRdd = deleteRdd.map({ row =>
       val tupleId: String = row
@@ -319,4 +324,21 @@ object DeleteExecution {
 
     true
   }
+
+  private def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
+  (CarbonTableInputFormat[Array[Object]], Job) = {
+    val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
+    val jobConf: JobConf = new JobConf(new Configuration)
+    val job: Job = new Job(jobConf)
+    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
+    (carbonInputFormat, job)
+  }
+
+  private def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
+      conf: Configuration) : CarbonTableInputFormat[V] = {
+    val carbonInputFormat = new CarbonTableInputFormat[V]()
+    val job: Job = new Job(conf)
+    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
+    carbonInputFormat
+  }
 }
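
With QueryPlanUtil deleted, any module outside DeleteExecution that still needs this setup can inline the same few lines. A self-contained sketch mirroring the removed helper (the table path is hard-coded for illustration):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

import org.apache.carbondata.hadoop.api.CarbonTableInputFormat

// Bind a CarbonTableInputFormat to a Job rooted at the table path,
// exactly as the deleted QueryPlanUtil.createCarbonInputFormat did.
val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
val job: Job = new Job(new JobConf(new Configuration))
FileInputFormat.addInputPath(job, new Path("/store/default/t1"))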