You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/28 13:08:59 UTC

[1/2] incubator-carbondata git commit: remove partitioner in rdd package

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 23bae4c4a -> 879bfe742


remove partitioner in rdd package

remove partitioner in query


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f7b76fd6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f7b76fd6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f7b76fd6

Branch: refs/heads/master
Commit: f7b76fd650222dd2fc99c24e95d4648d1ef4d776
Parents: 23bae4c
Author: jackylk <ja...@huawei.com>
Authored: Mon Nov 28 20:31:36 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Mon Nov 28 21:08:08 2016 +0800

----------------------------------------------------------------------
 .../spark/load/DeleteLoadFolders.java           |  13 +-
 .../spark/merger/CarbonDataMergerUtil.java      |  46 ++----
 .../api/impl/QueryPartitionHelper.java          | 109 +--------------
 .../carbondata/spark/util/CarbonQueryUtil.java  | 139 ++-----------------
 .../spark/rdd/CarbonCleanFilesRDD.scala         |   2 +-
 .../spark/rdd/CarbonDataFrameRDD.scala          |  36 -----
 .../spark/rdd/CarbonDataLoadRDD.scala           |   5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  71 +++-------
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |   3 +-
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |   2 +-
 .../spark/rdd/CarbonDropTableRDD.scala          |   8 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   6 +-
 .../apache/carbondata/spark/rdd/Compactor.scala |   2 -
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   5 +-
 .../org/apache/spark/sql/CarbonContext.scala    |  28 +---
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  29 +++-
 .../execution/command/carbonTableSchema.scala   |   9 +-
 .../spark/sql/hive/DistributionUtil.scala       |   7 +-
 18 files changed, 93 insertions(+), 427 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
index d1ff644..2b3979f 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -224,16 +224,13 @@ public final class DeleteLoadFolders {
   /**
    * @param loadModel
    * @param storeLocation
-   * @param partitionCount
    * @param isForceDelete
    * @param details
    * @return
    *
    */
   public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
-      String storeLocation, int partitionCount, boolean isForceDelete,
-      LoadMetadataDetails[] details) {
-    String path = null;
+      String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
     List<LoadMetadataDetails> deletedLoads =
         new ArrayList<LoadMetadataDetails>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
@@ -242,12 +239,8 @@ public final class DeleteLoadFolders {
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneLoad : details) {
         if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          boolean deletionStatus = false;
-
-          for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
-            path = getSegmentPath(loadModel, storeLocation, partitionId, oneLoad);
-            deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
-          }
+          String path = getSegmentPath(loadModel, storeLocation, 0, oneLoad);
+          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
           if (deletionStatus) {
             isDeleted = true;
             oneLoad.setVisibility("false");

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
index 9be4d47..84e6c00 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -219,12 +219,11 @@ public final class CarbonDataMergerUtil {
    *
    * @param storeLocation
    * @param carbonLoadModel
-   * @param partitionCount
    * @param compactionSize
    * @return
    */
   public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
-      CarbonLoadModel carbonLoadModel, int partitionCount, long compactionSize,
+      CarbonLoadModel carbonLoadModel, long compactionSize,
       List<LoadMetadataDetails> segments, CompactionType compactionType) {
 
     List sortedSegments = new ArrayList(segments);
@@ -245,7 +244,7 @@ public final class CarbonDataMergerUtil {
     if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
 
       listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
-          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, partitionCount, storeLocation);
+          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation);
     } else {
 
       listOfSegmentsToBeMerged =
@@ -399,13 +398,12 @@ public final class CarbonDataMergerUtil {
    * @param compactionSize
    * @param listOfSegmentsAfterPreserve
    * @param carbonLoadModel
-   * @param partitionCount
    * @param storeLocation
    * @return
    */
   private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
       long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
-      CarbonLoadModel carbonLoadModel, int partitionCount, String storeLocation) {
+      CarbonLoadModel carbonLoadModel, String storeLocation) {
 
     List<LoadMetadataDetails> segmentsToBeMerged =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -423,7 +421,7 @@ public final class CarbonDataMergerUtil {
       String segId = segment.getLoadName();
       // variable to store one  segment size across partition.
       long sizeOfOneSegmentAcrossPartition =
-          getSizeOfOneSegmentAcrossPartition(partitionCount, storeLocation, tableIdentifier, segId);
+          getSizeOfSegment(storeLocation, tableIdentifier, segId);
 
       // if size of a segment is greater than the Major compaction size. then ignore it.
       if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
@@ -460,30 +458,19 @@ public final class CarbonDataMergerUtil {
   }
 
   /**
-   * For calculating the size of a segment across all partition.
-   * @param partitionCount
+   * For calculating the size of the specified segment
    * @param storeLocation
    * @param tableIdentifier
    * @param segId
    * @return
    */
-  private static long getSizeOfOneSegmentAcrossPartition(int partitionCount, String storeLocation,
+  private static long getSizeOfSegment(String storeLocation,
       CarbonTableIdentifier tableIdentifier, String segId) {
-    long sizeOfOneSegmentAcrossPartition = 0;
-    // calculate size across partitions
-    for (int partition = 0; partition < partitionCount; partition++) {
-
-      String loadPath = CarbonLoaderUtil
-          .getStoreLocation(storeLocation, tableIdentifier, segId, partition + "");
-
-      CarbonFile segmentFolder =
-          FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
-
-      long sizeOfEachSegment = getSizeOfFactFileInLoad(segmentFolder);
-
-      sizeOfOneSegmentAcrossPartition += sizeOfEachSegment;
-    }
-    return sizeOfOneSegmentAcrossPartition;
+    String loadPath = CarbonLoaderUtil
+        .getStoreLocation(storeLocation, tableIdentifier, segId, "0");
+    CarbonFile segmentFolder =
+        FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
+    return getSizeOfFactFileInLoad(segmentFolder);
   }
 
   /**
@@ -691,9 +678,6 @@ public final class CarbonDataMergerUtil {
 
   /**
    * Removing the already merged segments from list.
-   * @param segments
-   * @param loadsToMerge
-   * @return
    */
   public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
       List<LoadMetadataDetails> segments,
@@ -701,17 +685,11 @@ public final class CarbonDataMergerUtil {
 
     // take complete list of segments.
     List<LoadMetadataDetails> list = new ArrayList<>(segments);
-
-    List<LoadMetadataDetails> trimmedList =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
     // sort list
     CarbonDataMergerUtil.sortSegments(list);
 
     // first filter out newly added segments.
-    trimmedList = list.subList(0, list.indexOf(lastSeg) + 1);
-
-    return trimmedList;
+    return list.subList(0, list.indexOf(lastSeg) + 1);
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
index 72e7b08..e05be7d 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
@@ -19,30 +19,18 @@
 
 package org.apache.carbondata.spark.partition.api.impl;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.spark.partition.api.DataPartitioner;
 import org.apache.carbondata.spark.partition.api.Partition;
 
-import org.apache.spark.sql.execution.command.Partitioner;
 
 public final class QueryPartitionHelper {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(QueryPartitionHelper.class.getName());
   private static QueryPartitionHelper instance = new QueryPartitionHelper();
-  private Properties properties;
-  private String defaultPartitionerClass;
   private Map<String, DataPartitioner> partitionerMap =
       new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   private Map<String, DefaultLoadBalancer> loadBalancerMap =
@@ -57,85 +45,10 @@ public final class QueryPartitionHelper {
   }
 
   /**
-   * Read the properties from CSVFilePartitioner.properties
-   */
-  private static Properties loadProperties() {
-    Properties properties = new Properties();
-
-    File file = new File("DataPartitioner.properties");
-    FileInputStream fis = null;
-    try {
-      if (file.exists()) {
-        fis = new FileInputStream(file);
-
-        properties.load(fis);
-      }
-    } catch (Exception e) {
-      LOGGER
-          .error(e, e.getMessage());
-    } finally {
-      if (null != fis) {
-        try {
-          fis.close();
-        } catch (IOException e) {
-          LOGGER.error(e,
-              e.getMessage());
-        }
-      }
-    }
-
-    return properties;
-
-  }
-
-  private void checkInitialization(String tableUniqueName, Partitioner partitioner) {
-    //Initialise if not done earlier
-
-    //String nodeListString = null;
-    if (properties == null) {
-      properties = loadProperties();
-
-      // nodeListString = properties.getProperty("nodeList", "master,slave1,slave2,slave3");
-
-      defaultPartitionerClass = properties.getProperty("partitionerClass",
-          "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl");
-
-      LOGGER.info(this.getClass().getSimpleName() + " is using following configurations.");
-      LOGGER.info("partitionerClass : " + defaultPartitionerClass);
-      LOGGER.info("nodeList : " + Arrays.toString(partitioner.nodeList()));
-    }
-
-    if (partitionerMap.get(tableUniqueName) == null) {
-      DataPartitioner dataPartitioner;
-      try {
-        dataPartitioner =
-            (DataPartitioner) Class.forName(partitioner.partitionClass()).newInstance();
-        dataPartitioner.initialize("", new String[0], partitioner);
-
-        List<Partition> partitions = dataPartitioner.getAllPartitions();
-        DefaultLoadBalancer loadBalancer =
-            new DefaultLoadBalancer(Arrays.asList(partitioner.nodeList()), partitions);
-        partitionerMap.put(tableUniqueName, dataPartitioner);
-        loadBalancerMap.put(tableUniqueName, loadBalancer);
-      } catch (ClassNotFoundException e) {
-        LOGGER.error(e,
-            e.getMessage());
-      } catch (InstantiationException e) {
-        LOGGER.error(e,
-            e.getMessage());
-      } catch (IllegalAccessException e) {
-        LOGGER.error(e,
-            e.getMessage());
-      }
-    }
-  }
-
-  /**
    * Get partitions applicable for query based on filters applied in query
    */
-  public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan, Partitioner partitioner) {
+  public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
     String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
-    checkInitialization(tableUniqueName, partitioner);
 
     DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
 
@@ -143,38 +56,22 @@ public final class QueryPartitionHelper {
     return queryPartitions;
   }
 
-  public List<Partition> getAllPartitions(String databaseName, String tableName,
-      Partitioner partitioner) {
+  public List<Partition> getAllPartitions(String databaseName, String tableName) {
     String tableUniqueName = databaseName + '_' + tableName;
-    checkInitialization(tableUniqueName, partitioner);
 
     DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
 
     return dataPartitioner.getAllPartitions();
   }
 
-  public void removePartition(String databaseName, String tableName) {
-    String tableUniqueName = databaseName + '_' + tableName;
-    partitionerMap.remove(tableUniqueName);
-  }
-
   /**
    * Get the node name where the partition is assigned to.
    */
-  public String getLocation(Partition partition, String databaseName, String tableName,
-      Partitioner partitioner) {
+  public String getLocation(Partition partition, String databaseName, String tableName) {
     String tableUniqueName = databaseName + '_' + tableName;
-    checkInitialization(tableUniqueName, partitioner);
 
     DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
     return loadBalancer.getNodeForPartitions(partition);
   }
 
-  public String[] getPartitionedColumns(String databaseName, String tableName,
-      Partitioner partitioner) {
-    String tableUniqueName = databaseName + '_' + tableName;
-    checkInitialization(tableUniqueName, partitioner);
-    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-    return dataPartitioner.getPartitionedColumns();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
index 7393b67..d2e716f 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
@@ -21,19 +21,12 @@ package org.apache.carbondata.spark.util;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.spark.partition.api.Partition;
 import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
@@ -42,7 +35,7 @@ import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
 import org.apache.carbondata.spark.splits.TableSplit;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.spark.sql.execution.command.Partitioner;
+
 /**
  * This utilty parses the Carbon query plan to actual query model object.
  */
@@ -52,21 +45,20 @@ public final class CarbonQueryUtil {
 
   }
 
-
   /**
    * It creates the one split for each region server.
    */
   public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
-      CarbonQueryPlan queryPlan, Partitioner partitioner) throws IOException {
+      CarbonQueryPlan queryPlan) throws IOException {
 
     //Just create splits depends on locations of region servers
     List<Partition> allPartitions = null;
     if (queryPlan == null) {
       allPartitions =
-          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName, partitioner);
+          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
     } else {
       allPartitions =
-          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan, partitioner);
+          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
     }
     TableSplit[] splits = new TableSplit[allPartitions.size()];
     for (int i = 0; i < splits.length; i++) {
@@ -74,7 +66,7 @@ public final class CarbonQueryUtil {
       List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
       Partition partition = allPartitions.get(i);
       String location = QueryPartitionHelper.getInstance()
-          .getLocation(partition, databaseName, tableName, partitioner);
+          .getLocation(partition, databaseName, tableName);
       locations.add(location);
       splits[i].setPartition(partition);
       splits[i].setLocations(locations);
@@ -86,14 +78,12 @@ public final class CarbonQueryUtil {
   /**
    * It creates the one split for each region server.
    */
-  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath, String[] nodeList,
-      int partitionCount) throws Exception {
+  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
 
     //Just create splits depends on locations of region servers
-    FileType fileType = FileFactory.getFileType(sourcePath);
     DefaultLoadBalancer loadBalancer = null;
-    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath, fileType, partitionCount);
-    loadBalancer = new DefaultLoadBalancer(Arrays.asList(nodeList), allPartitions);
+    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
+    loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
     TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
     for (int i = 0; i < tblSplits.length; i++) {
       tblSplits[i] = new TableSplit();
@@ -108,55 +98,6 @@ public final class CarbonQueryUtil {
   }
 
   /**
-   * It creates the one split for each region server.
-   */
-  public static TableSplit[] getPartitionSplits(String sourcePath, String[] nodeList,
-      int partitionCount) throws Exception {
-
-    //Just create splits depends on locations of region servers
-    FileType fileType = FileFactory.getFileType(sourcePath);
-    DefaultLoadBalancer loadBalancer = null;
-    List<Partition> allPartitions = getAllPartitions(sourcePath, fileType, partitionCount);
-    loadBalancer = new DefaultLoadBalancer(Arrays.asList(nodeList), allPartitions);
-    TableSplit[] splits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < splits.length; i++) {
-      splits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = loadBalancer.getNodeForPartitions(partition);
-      locations.add(location);
-      splits[i].setPartition(partition);
-      splits[i].setLocations(locations);
-    }
-    return splits;
-  }
-
-  public static void getAllFiles(String sourcePath, List<String> partitionsFiles, FileType fileType)
-      throws Exception {
-
-    if (!FileFactory.isFileExist(sourcePath, fileType, false)) {
-      throw new Exception("Source file doesn't exist at path: " + sourcePath);
-    }
-
-    CarbonFile file = FileFactory.getCarbonFile(sourcePath, fileType);
-    if (file.isDirectory()) {
-      CarbonFile[] fileNames = file.listFiles(new CarbonFileFilter() {
-        @Override public boolean accept(CarbonFile pathname) {
-          return true;
-        }
-      });
-      for (int i = 0; i < fileNames.length; i++) {
-        getAllFiles(fileNames[i].getPath(), partitionsFiles, fileType);
-      }
-    } else {
-      // add only csv files
-      if (file.getName().endsWith("csv")) {
-        partitionsFiles.add(file.getPath());
-      }
-    }
-  }
-
-  /**
    * split sourcePath by comma
    */
   public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
@@ -169,64 +110,22 @@ public final class CarbonQueryUtil {
     }
   }
 
-  private static List<Partition> getAllFilesForDataLoad(String sourcePath, FileType fileType,
-      int partitionCount) throws Exception {
+  private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
     List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
     List<Partition> partitionList =
         new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
 
-    for (int i = 0; i < partitionCount; i++) {
-      partitionFiles.put(i, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
-      partitionList.add(new PartitionMultiFileImpl(i + "", partitionFiles.get(i)));
-    }
-    for (int i = 0; i < files.size(); i++) {
-      partitionFiles.get(i % partitionCount).add(files.get(i));
-    }
-    return partitionList;
-  }
+    partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
+    partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
 
-  private static List<Partition> getAllPartitions(String sourcePath, FileType fileType,
-      int partitionCount) throws Exception {
-    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
-    int[] numberOfFilesPerPartition = getNumberOfFilesPerPartition(files.size(), partitionCount);
-    int startIndex = 0;
-    int endIndex = 0;
-    List<Partition> partitionList =
-        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    if (numberOfFilesPerPartition != null) {
-      for (int i = 0; i < numberOfFilesPerPartition.length; i++) {
-        List<String> partitionFiles =
-            new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-        endIndex += numberOfFilesPerPartition[i];
-        for (int j = startIndex; j < endIndex; j++) {
-          partitionFiles.add(files.get(j));
-        }
-        startIndex += numberOfFilesPerPartition[i];
-        partitionList.add(new PartitionMultiFileImpl(i + "", partitionFiles));
-      }
+    for (int i = 0; i < files.size(); i++) {
+      partitionFiles.get(i % 1).add(files.get(i));
     }
     return partitionList;
   }
 
-  private static int[] getNumberOfFilesPerPartition(int numberOfFiles, int partitionCount) {
-    int div = numberOfFiles / partitionCount;
-    int mod = numberOfFiles % partitionCount;
-    int[] numberOfNodeToScan = null;
-    if (div > 0) {
-      numberOfNodeToScan = new int[partitionCount];
-      Arrays.fill(numberOfNodeToScan, div);
-    } else if (mod > 0) {
-      numberOfNodeToScan = new int[mod];
-    }
-    for (int i = 0; i < mod; i++) {
-      numberOfNodeToScan[i] = numberOfNodeToScan[i] + 1;
-    }
-    return numberOfNodeToScan;
-  }
-
   public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
     List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     if (null != details) {
@@ -240,16 +139,4 @@ public final class CarbonQueryUtil {
     return slices;
   }
 
-  /**
-   * This method will clear the dictionary cache for a given map of columns and dictionary cache
-   * mapping
-   *
-   * @param columnToDictionaryMap
-   */
-  public static void clearColumnDictionaryCache(Map<String, Dictionary> columnToDictionaryMap) {
-    for (Map.Entry<String, Dictionary> entry : columnToDictionaryMap.entrySet()) {
-      CarbonUtil.clearDictionaryCache(entry.getValue());
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 51d65e3..5a02bfd 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -39,7 +39,7 @@ class CarbonCleanFilesRDD[V: ClassTag](
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
   override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner)
+    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataFrameRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataFrameRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataFrameRDD.scala
deleted file mode 100644
index 6854893..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataFrameRDD.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import org.apache.spark.sql.{CarbonContext, DataFrame, Row}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-class CarbonDataFrameRDD(val cc: CarbonContext, logicalPlan: LogicalPlan)
-  extends DataFrame(cc, logicalPlan) {
-
-  override def collect(): Array[Row] = {
-
-    // executing the query
-    val rows: Array[Row] = super.collect()
-
-    // result
-    rows
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 0b23657..87b5673 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -200,11 +200,10 @@ class DataFileLoaderRDD[K, V](
       // for table split partition
       var splits = Array[TableSplit]()
       if (carbonLoadModel.isDirectLoad) {
-        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
-          partitioner.nodeList, partitioner.partitionCount)
+        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
       } else {
         splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
-          carbonLoadModel.getTableName, null, partitioner)
+          carbonLoadModel.getTableName, null)
       }
 
       splits.zipWithIndex.map { case (split, index) =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8445440..6c09607 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.carbondata.spark.rdd
 
 import java.util
@@ -30,23 +29,20 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{util => _, _}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel,
-CompactionModel, Partitioner}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.{FileUtils, SplitUtils}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable,
-CompactionType}
+import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, CompactionType}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.etl.DataLoadingException
@@ -55,6 +51,7 @@ import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingExcep
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
+import org.apache.carbondata.spark.partition.api.Partition
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
 
@@ -67,17 +64,6 @@ object CarbonDataRDDFactory {
 
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  def mergeCarbonData(
-      sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      storeLocation: String,
-      storePath: String,
-      partitioner: Partitioner) {
-    val table = CarbonMetadata.getInstance()
-      .getCarbonTable(carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName)
-    val metaDataPath: String = table.getMetaDataFilepath
-  }
-
   def deleteLoadByDate(
       sqlContext: SQLContext,
       schema: CarbonDataLoadSchema,
@@ -86,8 +72,7 @@ object CarbonDataRDDFactory {
       storePath: String,
       dateField: String,
       dateFieldActualName: String,
-      dateValue: String,
-      partitioner: Partitioner) {
+      dateValue: String) {
 
     val sc = sqlContext
     // Delete the records based on data
@@ -103,7 +88,6 @@ object CarbonDataRDDFactory {
       dateField,
       dateFieldActualName,
       dateValue,
-      partitioner,
       table.getFactTableName,
       tableName,
       storePath,
@@ -205,8 +189,10 @@ object CarbonDataRDDFactory {
 
   def alterTableForCompaction(sqlContext: SQLContext,
       alterTableModel: AlterTableModel,
-      carbonLoadModel: CarbonLoadModel, partitioner: Partitioner, storePath: String,
-      kettleHomePath: String, storeLocation: String): Unit = {
+      carbonLoadModel: CarbonLoadModel,
+      storePath: String,
+      kettleHomePath: String,
+      storeLocation: String): Unit = {
     var compactionSize: Long = 0
     var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
     if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
@@ -250,7 +236,6 @@ object CarbonDataRDDFactory {
       LOGGER.info("System level compaction lock is enabled.")
       handleCompactionForSystemLocking(sqlContext,
         carbonLoadModel,
-        partitioner,
         storePath,
         kettleHomePath,
         storeLocation,
@@ -271,7 +256,6 @@ object CarbonDataRDDFactory {
         try {
           startCompactionThreads(sqlContext,
             carbonLoadModel,
-            partitioner,
             storePath,
             kettleHomePath,
             storeLocation,
@@ -295,7 +279,6 @@ object CarbonDataRDDFactory {
 
   def handleCompactionForSystemLocking(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      partitioner: Partitioner,
       storePath: String,
       kettleHomePath: String,
       storeLocation: String,
@@ -312,7 +295,6 @@ object CarbonDataRDDFactory {
       try {
         startCompactionThreads(sqlContext,
           carbonLoadModel,
-          partitioner,
           storePath,
           kettleHomePath,
           storeLocation,
@@ -351,7 +333,6 @@ object CarbonDataRDDFactory {
   def executeCompaction(carbonLoadModel: CarbonLoadModel,
       storePath: String,
       compactionModel: CompactionModel,
-      partitioner: Partitioner,
       executor: ExecutorService,
       sqlContext: SQLContext,
       kettleHomePath: String,
@@ -365,7 +346,6 @@ object CarbonDataRDDFactory {
     var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
       storePath,
       carbonLoadModel,
-      partitioner.partitionCount,
       compactionModel.compactionSize,
       segList,
       compactionModel.compactionType
@@ -386,7 +366,6 @@ object CarbonDataRDDFactory {
         compactionModel,
         kettleHomePath,
         carbonLoadModel,
-        partitioner,
         storeLocation
       )
 
@@ -417,7 +396,6 @@ object CarbonDataRDDFactory {
       loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
         storePath,
         carbonLoadModel,
-        partitioner.partitionCount,
         compactionModel.compactionSize,
         segList,
         compactionModel.compactionType
@@ -439,7 +417,6 @@ object CarbonDataRDDFactory {
       compactionModel: CompactionModel,
       kettleHomePath: String,
       carbonLoadModel: CarbonLoadModel,
-      partitioner: Partitioner,
       storeLocation: String): Unit = {
 
     loadsToMerge.asScala.foreach(seg => {
@@ -449,7 +426,6 @@ object CarbonDataRDDFactory {
 
     val compactionCallableModel = CompactionCallableModel(storePath,
       carbonLoadModel,
-      partitioner,
       storeLocation,
       compactionModel.carbonTable,
       kettleHomePath,
@@ -468,7 +444,6 @@ object CarbonDataRDDFactory {
 
   def startCompactionThreads(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      partitioner: Partitioner,
       storePath: String,
       kettleHomePath: String,
       storeLocation: String,
@@ -499,7 +474,6 @@ object CarbonDataRDDFactory {
             executeCompaction(carbonLoadModel: CarbonLoadModel,
               storePath: String,
               compactionModel: CompactionModel,
-              partitioner: Partitioner,
               executor, sqlContext, kettleHomePath, storeLocation
             )
             triggeredCompactionStatus = true
@@ -550,7 +524,6 @@ object CarbonDataRDDFactory {
                 executeCompaction(newCarbonLoadModel,
                   newCarbonLoadModel.getStorePath,
                   newcompactionModel,
-                  partitioner,
                   executor, sqlContext, kettleHomePath, storeLocation
                 )
               } catch {
@@ -672,7 +645,6 @@ object CarbonDataRDDFactory {
 
           handleCompactionForSystemLocking(sqlContext,
             carbonLoadModel,
-            partitioner,
             storePath,
             kettleHomePath,
             storeLocation,
@@ -691,7 +663,6 @@ object CarbonDataRDDFactory {
             try {
               startCompactionThreads(sqlContext,
                 carbonLoadModel,
-                partitioner,
                 storePath,
                 kettleHomePath,
                 storeLocation,
@@ -728,7 +699,7 @@ object CarbonDataRDDFactory {
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
       // Check if any load need to be deleted before loading new data
-      deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, partitioner, storePath,
+      deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
         isForceDeletion = false)
       if (null == carbonLoadModel.getLoadMetadataDetails) {
         readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -793,9 +764,7 @@ object CarbonDataRDDFactory {
           var splits = Array[TableSplit]()
           if (carbonLoadModel.isDirectLoad) {
             // get all table Splits, this part means files were divide to different partitions
-            splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
-              partitioner.nodeList, partitioner.partitionCount
-            )
+            splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
             // get all partition blocks from file list
             blocksGroupBy = splits.map {
               split =>
@@ -813,8 +782,7 @@ object CarbonDataRDDFactory {
           } else {
             // get all table Splits,when come to this, means data have been partition
             splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
-              carbonLoadModel.getTableName, null, partitioner
-            )
+              carbonLoadModel.getTableName, null)
             // get all partition blocks from factFilePath/uniqueID/
             blocksGroupBy = splits.map {
               split =>
@@ -1060,7 +1028,7 @@ object CarbonDataRDDFactory {
 
   def deleteLoadsAndUpdateMetadata(
       carbonLoadModel: CarbonLoadModel,
-      table: CarbonTable, partitioner: Partitioner,
+      table: CarbonTable,
       storePath: String,
       isForceDeletion: Boolean) {
     if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
@@ -1073,8 +1041,7 @@ object CarbonDataRDDFactory {
 
       // Delete marked loads
       val isUpdationRequired = DeleteLoadFolders
-        .deleteLoadFoldersFromFileSystem(carbonLoadModel, storePath,
-          partitioner.partitionCount, isForceDeletion, details)
+        .deleteLoadFoldersFromFileSystem(carbonLoadModel, storePath, isForceDeletion, details)
 
       if (isUpdationRequired) {
         try {
@@ -1113,20 +1080,17 @@ object CarbonDataRDDFactory {
   def dropTable(
       sc: SparkContext,
       schema: String,
-      table: String,
-      partitioner: Partitioner) {
+      table: String) {
     val v: Value[Array[Object]] = new ValueImpl()
-    new CarbonDropTableRDD(sc, v, schema, table, partitioner).collect
+    new CarbonDropTableRDD(sc, v, schema, table).collect
   }
 
   def cleanFiles(
       sc: SparkContext,
       carbonLoadModel: CarbonLoadModel,
-      storePath: String,
-      partitioner: Partitioner) {
+      storePath: String) {
     val table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
       .getCarbonTable(carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName)
-    val metaDataPath: String = table.getMetaDataFilepath
     val carbonCleanFilesLock = CarbonLockFactory
       .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
         LockUsage.CLEAN_FILES_LOCK
@@ -1136,7 +1100,6 @@ object CarbonDataRDDFactory {
         LOGGER.info("Clean files lock has been successfully acquired.")
         deleteLoadsAndUpdateMetadata(carbonLoadModel,
           table,
-          partitioner,
           storePath,
           isForceDeletion = true)
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index 15f9b44..343a602 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -37,7 +37,6 @@ class CarbonDeleteLoadByDateRDD[K, V](
     dateField: String,
     dateFieldActualName: String,
     dateValue: String,
-    partitioner: Partitioner,
     factTableName: String,
     dimTableName: String,
     storePath: String,
@@ -47,7 +46,7 @@ class CarbonDeleteLoadByDateRDD[K, V](
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
   override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner)
+    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map {s =>
       new CarbonLoadPartition(id, s._2, s._1)
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index a78c67b..26e1abc 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -38,7 +38,7 @@ class CarbonDeleteLoadRDD[V: ClassTag](
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
   override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner)
+    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map {f =>
       new CarbonLoadPartition(id, f._2, f._1)
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
index abdeaf5..47689bd 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -30,14 +30,13 @@ class CarbonDropTableRDD[V: ClassTag](
     sc: SparkContext,
     valueClass: Value[V],
     databaseName: String,
-    tableName: String,
-    partitioner: Partitioner)
+    tableName: String)
   extends RDD[V](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
   override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner)
+    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map { s =>
       new CarbonLoadPartition(id, s._2, s._1)
     }
@@ -46,9 +45,6 @@ class CarbonDropTableRDD[V: ClassTag](
   override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
 
     val iter = new Iterator[V] {
-      val split = theSplit.asInstanceOf[CarbonLoadPartition]
-
-      val partitionCount = partitioner.partitionCount
       // TODO: Clear Btree from memory
 
       var havePair = false

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 4e820c6..249f2cd 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -29,13 +29,13 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.CarbonContext
+import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
 import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TableBlockInfo, TaskBlockInfo}
+import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
@@ -273,7 +273,7 @@ class CarbonMergerRDD[K, V](
     val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
       confExecutors
     } else { nodeBlockMapping.size() }
-    CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
+    CarbonEnv.ensureExecutors(sparkContext, requiredExecutors)
     logInfo("No.of Executors required=" + requiredExecutors +
             " , spark.executor.instances=" + confExecutors +
             ", no.of.nodes where data present=" + nodeBlockMapping.size())

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 5fdbc5d..6c6076e 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -40,7 +40,6 @@ object Compactor {
   def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
 
     val storePath = compactionCallableModel.storePath
-    val partitioner = compactionCallableModel.partitioner
     val storeLocation = compactionCallableModel.storeLocation
     val carbonTable = compactionCallableModel.carbonTable
     val kettleHomePath = compactionCallableModel.kettleHomePath
@@ -60,7 +59,6 @@ object Compactor {
     val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime()
     val carbonMergerMapping = CarbonMergerMapping(storeLocation,
       storePath,
-      partitioner,
       carbonTable.getMetaDataFilepath,
       mergedLoadName,
       kettleHomePath,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 86c12f8..3a393ed 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -74,11 +74,10 @@ class NewCarbonDataLoadRDD[K, V](
       var splits: Array[TableSplit] = null
 
       if (carbonLoadModel.isDirectLoad) {
-        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
-          partitioner.nodeList, partitioner.partitionCount)
+        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
       } else {
         splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
-          carbonLoadModel.getTableName, null, partitioner)
+          carbonLoadModel.getTableName, null)
       }
 
       splits.zipWithIndex.map { s =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 5d2221d..ba97083 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -36,7 +36,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
-import org.apache.carbondata.spark.rdd.CarbonDataFrameRDD
 
 class CarbonContext(
     val sc: SparkContext,
@@ -134,7 +133,7 @@ class CarbonContext(
     val logicPlan: LogicalPlan = parseSql(sql)
     statistic.addStatistics(QueryStatisticsConstants.SQL_PARSE, System.currentTimeMillis())
     recorder.recordStatisticsForDriver(statistic, queryId)
-    val result = new CarbonDataFrameRDD(this, logicPlan)
+    val result = new DataFrame(this, logicPlan)
 
     // We force query optimization to happen right away instead of letting it happen lazily like
     // when using the query DSL.  This is so DDL commands behave as expected.  This is only
@@ -183,29 +182,4 @@ object CarbonContext {
     cache(sc) = cc
   }
 
-  /**
-   *
-   * Requesting the extra executors other than the existing ones.
-   *
-   * @param sc
-   * @param numExecutors
-   * @return
-   */
-  final def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
-    sc.schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend =>
-        val requiredExecutors = numExecutors - b.numExistingExecutors
-        LOGGER
-          .info(s"number of executors is =$numExecutors existing executors are =" +
-                s"${ b.numExistingExecutors }"
-          )
-        if (requiredExecutors > 0) {
-          b.requestExecutors(requiredExecutors)
-        }
-        true
-      case _ =>
-        false
-    }
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 9f6b0b4..be77954 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,15 +17,19 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.sql.hive.{CarbonMetastoreCatalog, HiveContext}
 
+import org.apache.carbondata.common.logging.LogServiceFactory
+
 /**
  * Carbon Environment for unified context
  */
 case class CarbonEnv(hiveContext: HiveContext, carbonCatalog: CarbonMetastoreCatalog)
 
 object CarbonEnv {
-  val className = classOf[CarbonEnv].getCanonicalName
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   var carbonEnv: CarbonEnv = _
 
   def getInstance(sqlContext: SQLContext): CarbonEnv = {
@@ -36,6 +40,29 @@ object CarbonEnv {
     }
     carbonEnv
   }
+
+  /**
+   *
+   * Requesting the extra executors other than the existing ones.
+   *
+   * @param sc
+   * @param numExecutors
+   * @return
+   */
+  final def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
+    sc.schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        val requiredExecutors = numExecutors - b.numExistingExecutors
+        LOGGER.info(s"number of executors is =$numExecutors existing executors are =" +
+                s"${ b.numExistingExecutors }")
+        if (requiredExecutors > 0) {
+          b.requestExecutors(requiredExecutors)
+        }
+        true
+      case _ =>
+        false
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 22cc548..b7673db 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -99,7 +99,6 @@ case class DataLoadTableFileMapping(table: String, loadPath: String)
 
 case class CarbonMergerMapping(storeLocation: String,
     storePath: String,
-    partitioner: Partitioner,
     metadataFilePath: String,
     mergedLoadName: String,
     kettleHomePath: String,
@@ -126,7 +125,6 @@ case class CompactionModel(compactionSize: Long,
 
 case class CompactionCallableModel(storePath: String,
     carbonLoadModel: CarbonLoadModel,
-    partitioner: Partitioner,
     storeLocation: String,
     carbonTable: CarbonTable,
     kettleHomePath: String,
@@ -493,7 +491,6 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
         .alterTableForCompaction(sqlContext,
           alterTableModel,
           carbonLoadModel,
-          partitioner,
           relation.tableMeta.storePath,
           kettleHomePath,
           storeLocation
@@ -1162,8 +1159,7 @@ private[sql] case class DeleteLoadByDate(
       CarbonEnv.getInstance(sqlContext).carbonCatalog.storePath,
       level,
       actualColName,
-      dateValue,
-      relation.tableMeta.partitioner)
+      dateValue)
     LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
     Seq.empty
   }
@@ -1201,8 +1197,7 @@ private[sql] case class CleanFiles(
       CarbonDataRDDFactory.cleanFiles(
         sqlContext.sparkContext,
         carbonLoadModel,
-        relation.tableMeta.storePath,
-        relation.tableMeta.partitioner)
+        relation.tableMeta.storePath)
       LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.")
     } catch {
       case ex: Exception =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 25c36c5..02453bd 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -21,15 +21,12 @@ import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.CarbonContext
+import org.apache.spark.sql.{CarbonContext, CarbonEnv}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.datastore.block.Distributable
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 
-/**
- *
- */
 object DistributionUtil {
   @transient
   val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
@@ -131,7 +128,7 @@ object DistributionUtil {
     }
 
     val startTime = System.currentTimeMillis()
-    CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
+    CarbonEnv.ensureExecutors(sparkContext, requiredExecutors)
     var nodes = DistributionUtil.getNodeList(sparkContext)
     var maxTimes = 30
     while (nodes.length < requiredExecutors && maxTimes > 0) {


[2/2] incubator-carbondata git commit: [CARBONDATA-461] clean partitioner in carbon RDD package This closes #363

Posted by ja...@apache.org.
[CARBONDATA-461] clean partitioner in carbon RDD package. This closes #363.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/879bfe74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/879bfe74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/879bfe74

Branch: refs/heads/master
Commit: 879bfe742def18923558c1bcd114addeff3db0bf
Parents: 23bae4c f7b76fd
Author: jackylk <ja...@huawei.com>
Authored: Mon Nov 28 21:08:45 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Mon Nov 28 21:08:45 2016 +0800

----------------------------------------------------------------------
 .../spark/load/DeleteLoadFolders.java           |  13 +-
 .../spark/merger/CarbonDataMergerUtil.java      |  46 ++----
 .../api/impl/QueryPartitionHelper.java          | 109 +--------------
 .../carbondata/spark/util/CarbonQueryUtil.java  | 139 ++-----------------
 .../spark/rdd/CarbonCleanFilesRDD.scala         |   2 +-
 .../spark/rdd/CarbonDataFrameRDD.scala          |  36 -----
 .../spark/rdd/CarbonDataLoadRDD.scala           |   5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  71 +++-------
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |   3 +-
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |   2 +-
 .../spark/rdd/CarbonDropTableRDD.scala          |   8 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   6 +-
 .../apache/carbondata/spark/rdd/Compactor.scala |   2 -
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   5 +-
 .../org/apache/spark/sql/CarbonContext.scala    |  28 +---
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  29 +++-
 .../execution/command/carbonTableSchema.scala   |   9 +-
 .../spark/sql/hive/DistributionUtil.scala       |   7 +-
 18 files changed, 93 insertions(+), 427 deletions(-)
----------------------------------------------------------------------