Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/28 13:08:59 UTC
[1/2] incubator-carbondata git commit: remove partitioner in rdd package
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 23bae4c4a -> 879bfe742
remove partitioner in rdd package
remove partitioner in query
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f7b76fd6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f7b76fd6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f7b76fd6
Branch: refs/heads/master
Commit: f7b76fd650222dd2fc99c24e95d4648d1ef4d776
Parents: 23bae4c
Author: jackylk <ja...@huawei.com>
Authored: Mon Nov 28 20:31:36 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Mon Nov 28 21:08:08 2016 +0800
----------------------------------------------------------------------
.../spark/load/DeleteLoadFolders.java | 13 +-
.../spark/merger/CarbonDataMergerUtil.java | 46 ++----
.../api/impl/QueryPartitionHelper.java | 109 +--------------
.../carbondata/spark/util/CarbonQueryUtil.java | 139 ++-----------------
.../spark/rdd/CarbonCleanFilesRDD.scala | 2 +-
.../spark/rdd/CarbonDataFrameRDD.scala | 36 -----
.../spark/rdd/CarbonDataLoadRDD.scala | 5 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 71 +++-------
.../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 3 +-
.../spark/rdd/CarbonDeleteLoadRDD.scala | 2 +-
.../spark/rdd/CarbonDropTableRDD.scala | 8 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 6 +-
.../apache/carbondata/spark/rdd/Compactor.scala | 2 -
.../spark/rdd/NewCarbonDataLoadRDD.scala | 5 +-
.../org/apache/spark/sql/CarbonContext.scala | 28 +---
.../scala/org/apache/spark/sql/CarbonEnv.scala | 29 +++-
.../execution/command/carbonTableSchema.scala | 9 +-
.../spark/sql/hive/DistributionUtil.scala | 7 +-
18 files changed, 93 insertions(+), 427 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
index d1ff644..2b3979f 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -224,16 +224,13 @@ public final class DeleteLoadFolders {
/**
* @param loadModel
* @param storeLocation
- * @param partitionCount
* @param isForceDelete
* @param details
* @return
*
*/
public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
- String storeLocation, int partitionCount, boolean isForceDelete,
- LoadMetadataDetails[] details) {
- String path = null;
+ String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
List<LoadMetadataDetails> deletedLoads =
new ArrayList<LoadMetadataDetails>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -242,12 +239,8 @@ public final class DeleteLoadFolders {
if (details != null && details.length != 0) {
for (LoadMetadataDetails oneLoad : details) {
if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
- boolean deletionStatus = false;
-
- for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
- path = getSegmentPath(loadModel, storeLocation, partitionId, oneLoad);
- deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
- }
+ String path = getSegmentPath(loadModel, storeLocation, 0, oneLoad);
+ boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
if (deletionStatus) {
isDeleted = true;
oneLoad.setVisibility("false");
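Reviewer note: after this hunk, callers no longer loop over partition ids; only partition "0" exists. A minimal Scala caller sketch (illustrative only; cleanExpiredLoads is not part of this patch, the other names follow the new signature above):

    import org.apache.carbondata.core.load.LoadMetadataDetails
    import org.apache.carbondata.spark.load.{CarbonLoadModel, DeleteLoadFolders}

    // Illustrative helper (not in this patch): delete eligible load folders
    // for one table. With partitionCount gone, the segment path is resolved
    // once, for the single partition "0".
    def cleanExpiredLoads(
        loadModel: CarbonLoadModel,
        storeLocation: String,
        details: Array[LoadMetadataDetails]): Boolean = {
      DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
        loadModel, storeLocation, /* isForceDelete */ false, details)
    }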
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
index 9be4d47..84e6c00 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -219,12 +219,11 @@ public final class CarbonDataMergerUtil {
*
* @param storeLocation
* @param carbonLoadModel
- * @param partitionCount
* @param compactionSize
* @return
*/
public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
- CarbonLoadModel carbonLoadModel, int partitionCount, long compactionSize,
+ CarbonLoadModel carbonLoadModel, long compactionSize,
List<LoadMetadataDetails> segments, CompactionType compactionType) {
List sortedSegments = new ArrayList(segments);
@@ -245,7 +244,7 @@ public final class CarbonDataMergerUtil {
if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
- listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, partitionCount, storeLocation);
+ listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation);
} else {
listOfSegmentsToBeMerged =
@@ -399,13 +398,12 @@ public final class CarbonDataMergerUtil {
* @param compactionSize
* @param listOfSegmentsAfterPreserve
* @param carbonLoadModel
- * @param partitionCount
* @param storeLocation
* @return
*/
private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
- CarbonLoadModel carbonLoadModel, int partitionCount, String storeLocation) {
+ CarbonLoadModel carbonLoadModel, String storeLocation) {
List<LoadMetadataDetails> segmentsToBeMerged =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -423,7 +421,7 @@ public final class CarbonDataMergerUtil {
String segId = segment.getLoadName();
// variable to store one segment size across partition.
long sizeOfOneSegmentAcrossPartition =
- getSizeOfOneSegmentAcrossPartition(partitionCount, storeLocation, tableIdentifier, segId);
+ getSizeOfSegment(storeLocation, tableIdentifier, segId);
// if size of a segment is greater than the Major compaction size. then ignore it.
if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
@@ -460,30 +458,19 @@ public final class CarbonDataMergerUtil {
}
/**
- * For calculating the size of a segment across all partition.
- * @param partitionCount
+ * For calculating the size of the specified segment
* @param storeLocation
* @param tableIdentifier
* @param segId
* @return
*/
- private static long getSizeOfOneSegmentAcrossPartition(int partitionCount, String storeLocation,
+ private static long getSizeOfSegment(String storeLocation,
CarbonTableIdentifier tableIdentifier, String segId) {
- long sizeOfOneSegmentAcrossPartition = 0;
- // calculate size across partitions
- for (int partition = 0; partition < partitionCount; partition++) {
-
- String loadPath = CarbonLoaderUtil
- .getStoreLocation(storeLocation, tableIdentifier, segId, partition + "");
-
- CarbonFile segmentFolder =
- FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
-
- long sizeOfEachSegment = getSizeOfFactFileInLoad(segmentFolder);
-
- sizeOfOneSegmentAcrossPartition += sizeOfEachSegment;
- }
- return sizeOfOneSegmentAcrossPartition;
+ String loadPath = CarbonLoaderUtil
+ .getStoreLocation(storeLocation, tableIdentifier, segId, "0");
+ CarbonFile segmentFolder =
+ FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
+ return getSizeOfFactFileInLoad(segmentFolder);
}
/**
@@ -691,9 +678,6 @@ public final class CarbonDataMergerUtil {
/**
* Removing the already merged segments from list.
- * @param segments
- * @param loadsToMerge
- * @return
*/
public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
List<LoadMetadataDetails> segments,
@@ -701,17 +685,11 @@ public final class CarbonDataMergerUtil {
// take complete list of segments.
List<LoadMetadataDetails> list = new ArrayList<>(segments);
-
- List<LoadMetadataDetails> trimmedList =
- new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
// sort list
CarbonDataMergerUtil.sortSegments(list);
// first filter out newly added segments.
- trimmedList = list.subList(0, list.indexOf(lastSeg) + 1);
-
- return trimmedList;
+ return list.subList(0, list.indexOf(lastSeg) + 1);
}
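Reviewer note: the public entry point keeps its argument order minus the partition count. A caller sketch (illustrative only; compactionSizeMB is in megabytes, since identifySegmentsToBeMergedBasedOnSize compares against compactionSize * 1024 * 1024):

    import java.util.{List => JList}

    import org.apache.carbondata.core.load.LoadMetadataDetails
    import org.apache.carbondata.integration.spark.merger.CompactionType
    import org.apache.carbondata.spark.load.CarbonLoadModel
    import org.apache.carbondata.spark.merger.CarbonDataMergerUtil

    // Illustrative helper (not in this patch): choose major-compaction
    // candidates; the partitionCount argument is gone.
    def segmentsToMerge(
        storePath: String,
        loadModel: CarbonLoadModel,
        compactionSizeMB: Long,
        segments: JList[LoadMetadataDetails]): JList[LoadMetadataDetails] = {
      CarbonDataMergerUtil.identifySegmentsToBeMerged(
        storePath, loadModel, compactionSizeMB, segments,
        CompactionType.MAJOR_COMPACTION)
    }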
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
index 72e7b08..e05be7d 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
@@ -19,30 +19,18 @@
package org.apache.carbondata.spark.partition.api.impl;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.scan.model.CarbonQueryPlan;
import org.apache.carbondata.spark.partition.api.DataPartitioner;
import org.apache.carbondata.spark.partition.api.Partition;
-import org.apache.spark.sql.execution.command.Partitioner;
public final class QueryPartitionHelper {
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(QueryPartitionHelper.class.getName());
private static QueryPartitionHelper instance = new QueryPartitionHelper();
- private Properties properties;
- private String defaultPartitionerClass;
private Map<String, DataPartitioner> partitionerMap =
new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
private Map<String, DefaultLoadBalancer> loadBalancerMap =
@@ -57,85 +45,10 @@ public final class QueryPartitionHelper {
}
/**
- * Read the properties from CSVFilePartitioner.properties
- */
- private static Properties loadProperties() {
- Properties properties = new Properties();
-
- File file = new File("DataPartitioner.properties");
- FileInputStream fis = null;
- try {
- if (file.exists()) {
- fis = new FileInputStream(file);
-
- properties.load(fis);
- }
- } catch (Exception e) {
- LOGGER
- .error(e, e.getMessage());
- } finally {
- if (null != fis) {
- try {
- fis.close();
- } catch (IOException e) {
- LOGGER.error(e,
- e.getMessage());
- }
- }
- }
-
- return properties;
-
- }
-
- private void checkInitialization(String tableUniqueName, Partitioner partitioner) {
- //Initialise if not done earlier
-
- //String nodeListString = null;
- if (properties == null) {
- properties = loadProperties();
-
- // nodeListString = properties.getProperty("nodeList", "master,slave1,slave2,slave3");
-
- defaultPartitionerClass = properties.getProperty("partitionerClass",
- "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl");
-
- LOGGER.info(this.getClass().getSimpleName() + " is using following configurations.");
- LOGGER.info("partitionerClass : " + defaultPartitionerClass);
- LOGGER.info("nodeList : " + Arrays.toString(partitioner.nodeList()));
- }
-
- if (partitionerMap.get(tableUniqueName) == null) {
- DataPartitioner dataPartitioner;
- try {
- dataPartitioner =
- (DataPartitioner) Class.forName(partitioner.partitionClass()).newInstance();
- dataPartitioner.initialize("", new String[0], partitioner);
-
- List<Partition> partitions = dataPartitioner.getAllPartitions();
- DefaultLoadBalancer loadBalancer =
- new DefaultLoadBalancer(Arrays.asList(partitioner.nodeList()), partitions);
- partitionerMap.put(tableUniqueName, dataPartitioner);
- loadBalancerMap.put(tableUniqueName, loadBalancer);
- } catch (ClassNotFoundException e) {
- LOGGER.error(e,
- e.getMessage());
- } catch (InstantiationException e) {
- LOGGER.error(e,
- e.getMessage());
- } catch (IllegalAccessException e) {
- LOGGER.error(e,
- e.getMessage());
- }
- }
- }
-
- /**
* Get partitions applicable for query based on filters applied in query
*/
- public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan, Partitioner partitioner) {
+ public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
- checkInitialization(tableUniqueName, partitioner);
DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
@@ -143,38 +56,22 @@ public final class QueryPartitionHelper {
return queryPartitions;
}
- public List<Partition> getAllPartitions(String databaseName, String tableName,
- Partitioner partitioner) {
+ public List<Partition> getAllPartitions(String databaseName, String tableName) {
String tableUniqueName = databaseName + '_' + tableName;
- checkInitialization(tableUniqueName, partitioner);
DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
return dataPartitioner.getAllPartitions();
}
- public void removePartition(String databaseName, String tableName) {
- String tableUniqueName = databaseName + '_' + tableName;
- partitionerMap.remove(tableUniqueName);
- }
-
/**
* Get the node name where the partition is assigned to.
*/
- public String getLocation(Partition partition, String databaseName, String tableName,
- Partitioner partitioner) {
+ public String getLocation(Partition partition, String databaseName, String tableName) {
String tableUniqueName = databaseName + '_' + tableName;
- checkInitialization(tableUniqueName, partitioner);
DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
return loadBalancer.getNodeForPartitions(partition);
}
- public String[] getPartitionedColumns(String databaseName, String tableName,
- Partitioner partitioner) {
- String tableUniqueName = databaseName + '_' + tableName;
- checkInitialization(tableUniqueName, partitioner);
- DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
- return dataPartitioner.getPartitionedColumns();
- }
}
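Reviewer note: the helper's public surface is now Partitioner-free. A usage sketch (illustrative only; it assumes partitionerMap and loadBalancerMap were populated elsewhere, since checkInitialization is removed by this patch):

    import scala.collection.JavaConverters._

    import org.apache.carbondata.scan.model.CarbonQueryPlan
    import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper

    // Illustrative helper (not in this patch): map each partition the
    // query touches to the node it is assigned to.
    def partitionLocations(plan: CarbonQueryPlan): Seq[String] = {
      val helper = QueryPartitionHelper.getInstance()
      helper.getPartitionsForQuery(plan).asScala.map { partition =>
        helper.getLocation(partition, plan.getDatabaseName, plan.getTableName)
      }
    }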
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
index 7393b67..d2e716f 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
@@ -21,19 +21,12 @@ package org.apache.carbondata.spark.util;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.scan.model.CarbonQueryPlan;
import org.apache.carbondata.spark.partition.api.Partition;
import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
@@ -42,7 +35,7 @@ import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
import org.apache.carbondata.spark.splits.TableSplit;
import org.apache.commons.lang3.StringUtils;
-import org.apache.spark.sql.execution.command.Partitioner;
+
/**
* This utilty parses the Carbon query plan to actual query model object.
*/
@@ -52,21 +45,20 @@ public final class CarbonQueryUtil {
}
-
/**
* It creates the one split for each region server.
*/
public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
- CarbonQueryPlan queryPlan, Partitioner partitioner) throws IOException {
+ CarbonQueryPlan queryPlan) throws IOException {
//Just create splits depends on locations of region servers
List<Partition> allPartitions = null;
if (queryPlan == null) {
allPartitions =
- QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName, partitioner);
+ QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
} else {
allPartitions =
- QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan, partitioner);
+ QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
}
TableSplit[] splits = new TableSplit[allPartitions.size()];
for (int i = 0; i < splits.length; i++) {
@@ -74,7 +66,7 @@ public final class CarbonQueryUtil {
List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
Partition partition = allPartitions.get(i);
String location = QueryPartitionHelper.getInstance()
- .getLocation(partition, databaseName, tableName, partitioner);
+ .getLocation(partition, databaseName, tableName);
locations.add(location);
splits[i].setPartition(partition);
splits[i].setLocations(locations);
@@ -86,14 +78,12 @@ public final class CarbonQueryUtil {
/**
* It creates the one split for each region server.
*/
- public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath, String[] nodeList,
- int partitionCount) throws Exception {
+ public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
//Just create splits depends on locations of region servers
- FileType fileType = FileFactory.getFileType(sourcePath);
DefaultLoadBalancer loadBalancer = null;
- List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath, fileType, partitionCount);
- loadBalancer = new DefaultLoadBalancer(Arrays.asList(nodeList), allPartitions);
+ List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
+ loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
for (int i = 0; i < tblSplits.length; i++) {
tblSplits[i] = new TableSplit();
@@ -108,55 +98,6 @@ public final class CarbonQueryUtil {
}
/**
- * It creates the one split for each region server.
- */
- public static TableSplit[] getPartitionSplits(String sourcePath, String[] nodeList,
- int partitionCount) throws Exception {
-
- //Just create splits depends on locations of region servers
- FileType fileType = FileFactory.getFileType(sourcePath);
- DefaultLoadBalancer loadBalancer = null;
- List<Partition> allPartitions = getAllPartitions(sourcePath, fileType, partitionCount);
- loadBalancer = new DefaultLoadBalancer(Arrays.asList(nodeList), allPartitions);
- TableSplit[] splits = new TableSplit[allPartitions.size()];
- for (int i = 0; i < splits.length; i++) {
- splits[i] = new TableSplit();
- List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- Partition partition = allPartitions.get(i);
- String location = loadBalancer.getNodeForPartitions(partition);
- locations.add(location);
- splits[i].setPartition(partition);
- splits[i].setLocations(locations);
- }
- return splits;
- }
-
- public static void getAllFiles(String sourcePath, List<String> partitionsFiles, FileType fileType)
- throws Exception {
-
- if (!FileFactory.isFileExist(sourcePath, fileType, false)) {
- throw new Exception("Source file doesn't exist at path: " + sourcePath);
- }
-
- CarbonFile file = FileFactory.getCarbonFile(sourcePath, fileType);
- if (file.isDirectory()) {
- CarbonFile[] fileNames = file.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile pathname) {
- return true;
- }
- });
- for (int i = 0; i < fileNames.length; i++) {
- getAllFiles(fileNames[i].getPath(), partitionsFiles, fileType);
- }
- } else {
- // add only csv files
- if (file.getName().endsWith("csv")) {
- partitionsFiles.add(file.getPath());
- }
- }
- }
-
- /**
* split sourcePath by comma
*/
public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
@@ -169,64 +110,22 @@ public final class CarbonQueryUtil {
}
}
- private static List<Partition> getAllFilesForDataLoad(String sourcePath, FileType fileType,
- int partitionCount) throws Exception {
+ private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
List<Partition> partitionList =
new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
- for (int i = 0; i < partitionCount; i++) {
- partitionFiles.put(i, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
- partitionList.add(new PartitionMultiFileImpl(i + "", partitionFiles.get(i)));
- }
- for (int i = 0; i < files.size(); i++) {
- partitionFiles.get(i % partitionCount).add(files.get(i));
- }
- return partitionList;
- }
+ partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
+ partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
- private static List<Partition> getAllPartitions(String sourcePath, FileType fileType,
- int partitionCount) throws Exception {
- List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
- int[] numberOfFilesPerPartition = getNumberOfFilesPerPartition(files.size(), partitionCount);
- int startIndex = 0;
- int endIndex = 0;
- List<Partition> partitionList =
- new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- if (numberOfFilesPerPartition != null) {
- for (int i = 0; i < numberOfFilesPerPartition.length; i++) {
- List<String> partitionFiles =
- new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- endIndex += numberOfFilesPerPartition[i];
- for (int j = startIndex; j < endIndex; j++) {
- partitionFiles.add(files.get(j));
- }
- startIndex += numberOfFilesPerPartition[i];
- partitionList.add(new PartitionMultiFileImpl(i + "", partitionFiles));
- }
+ for (int i = 0; i < files.size(); i++) {
+ partitionFiles.get(i % 1).add(files.get(i));
}
return partitionList;
}
- private static int[] getNumberOfFilesPerPartition(int numberOfFiles, int partitionCount) {
- int div = numberOfFiles / partitionCount;
- int mod = numberOfFiles % partitionCount;
- int[] numberOfNodeToScan = null;
- if (div > 0) {
- numberOfNodeToScan = new int[partitionCount];
- Arrays.fill(numberOfNodeToScan, div);
- } else if (mod > 0) {
- numberOfNodeToScan = new int[mod];
- }
- for (int i = 0; i < mod; i++) {
- numberOfNodeToScan[i] = numberOfNodeToScan[i] + 1;
- }
- return numberOfNodeToScan;
- }
-
public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
if (null != details) {
@@ -240,16 +139,4 @@ public final class CarbonQueryUtil {
return slices;
}
- /**
- * This method will clear the dictionary cache for a given map of columns and dictionary cache
- * mapping
- *
- * @param columnToDictionaryMap
- */
- public static void clearColumnDictionaryCache(Map<String, Dictionary> columnToDictionaryMap) {
- for (Map.Entry<String, Dictionary> entry : columnToDictionaryMap.entrySet()) {
- CarbonUtil.clearDictionaryCache(entry.getValue());
- }
- }
-
}
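Reviewer note: both split builders survive with smaller signatures; the RDD call sites further down invoke them exactly as in this sketch (illustrative wrapper, not in the patch):

    import org.apache.carbondata.spark.splits.TableSplit
    import org.apache.carbondata.spark.util.CarbonQueryUtil

    // Illustrative helper (not in this patch), mirroring the RDD call
    // sites below: direct load now takes only the fact file path, and
    // getTableSplits drops its trailing Partitioner argument.
    def computeSplits(
        isDirectLoad: Boolean,
        factFilePath: String,
        databaseName: String,
        tableName: String): Array[TableSplit] = {
      if (isDirectLoad) {
        CarbonQueryUtil.getTableSplitsForDirectLoad(factFilePath)
      } else {
        // null query plan selects all partitions, as in CarbonCleanFilesRDD
        CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
      }
    }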
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 51d65e3..5a02bfd 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -39,7 +39,7 @@ class CarbonCleanFilesRDD[V: ClassTag](
sc.setLocalProperty("spark.scheduler.pool", "DDL")
override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner)
+ val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataFrameRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataFrameRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataFrameRDD.scala
deleted file mode 100644
index 6854893..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataFrameRDD.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import org.apache.spark.sql.{CarbonContext, DataFrame, Row}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-class CarbonDataFrameRDD(val cc: CarbonContext, logicalPlan: LogicalPlan)
- extends DataFrame(cc, logicalPlan) {
-
- override def collect(): Array[Row] = {
-
- // executing the query
- val rows: Array[Row] = super.collect()
-
- // result
- rows
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 0b23657..87b5673 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -200,11 +200,10 @@ class DataFileLoaderRDD[K, V](
// for table split partition
var splits = Array[TableSplit]()
if (carbonLoadModel.isDirectLoad) {
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
- partitioner.nodeList, partitioner.partitionCount)
+ splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
} else {
splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, null, partitioner)
+ carbonLoadModel.getTableName, null)
}
splits.zipWithIndex.map { case (split, index) =>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8445440..6c09607 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-
package org.apache.carbondata.spark.rdd
import java.util
@@ -30,23 +29,20 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{util => _, _}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel,
-CompactionModel, Partitioner}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.util.{FileUtils, SplitUtils}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
-import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable,
-CompactionType}
+import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, CompactionType}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.processing.etl.DataLoadingException
@@ -55,6 +51,7 @@ import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingExcep
import org.apache.carbondata.spark._
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
+import org.apache.carbondata.spark.partition.api.Partition
import org.apache.carbondata.spark.splits.TableSplit
import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
@@ -67,17 +64,6 @@ object CarbonDataRDDFactory {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- def mergeCarbonData(
- sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- storeLocation: String,
- storePath: String,
- partitioner: Partitioner) {
- val table = CarbonMetadata.getInstance()
- .getCarbonTable(carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName)
- val metaDataPath: String = table.getMetaDataFilepath
- }
-
def deleteLoadByDate(
sqlContext: SQLContext,
schema: CarbonDataLoadSchema,
@@ -86,8 +72,7 @@ object CarbonDataRDDFactory {
storePath: String,
dateField: String,
dateFieldActualName: String,
- dateValue: String,
- partitioner: Partitioner) {
+ dateValue: String) {
val sc = sqlContext
// Delete the records based on data
@@ -103,7 +88,6 @@ object CarbonDataRDDFactory {
dateField,
dateFieldActualName,
dateValue,
- partitioner,
table.getFactTableName,
tableName,
storePath,
@@ -205,8 +189,10 @@ object CarbonDataRDDFactory {
def alterTableForCompaction(sqlContext: SQLContext,
alterTableModel: AlterTableModel,
- carbonLoadModel: CarbonLoadModel, partitioner: Partitioner, storePath: String,
- kettleHomePath: String, storeLocation: String): Unit = {
+ carbonLoadModel: CarbonLoadModel,
+ storePath: String,
+ kettleHomePath: String,
+ storeLocation: String): Unit = {
var compactionSize: Long = 0
var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
@@ -250,7 +236,6 @@ object CarbonDataRDDFactory {
LOGGER.info("System level compaction lock is enabled.")
handleCompactionForSystemLocking(sqlContext,
carbonLoadModel,
- partitioner,
storePath,
kettleHomePath,
storeLocation,
@@ -271,7 +256,6 @@ object CarbonDataRDDFactory {
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
- partitioner,
storePath,
kettleHomePath,
storeLocation,
@@ -295,7 +279,6 @@ object CarbonDataRDDFactory {
def handleCompactionForSystemLocking(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
storePath: String,
kettleHomePath: String,
storeLocation: String,
@@ -312,7 +295,6 @@ object CarbonDataRDDFactory {
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
- partitioner,
storePath,
kettleHomePath,
storeLocation,
@@ -351,7 +333,6 @@ object CarbonDataRDDFactory {
def executeCompaction(carbonLoadModel: CarbonLoadModel,
storePath: String,
compactionModel: CompactionModel,
- partitioner: Partitioner,
executor: ExecutorService,
sqlContext: SQLContext,
kettleHomePath: String,
@@ -365,7 +346,6 @@ object CarbonDataRDDFactory {
var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
storePath,
carbonLoadModel,
- partitioner.partitionCount,
compactionModel.compactionSize,
segList,
compactionModel.compactionType
@@ -386,7 +366,6 @@ object CarbonDataRDDFactory {
compactionModel,
kettleHomePath,
carbonLoadModel,
- partitioner,
storeLocation
)
@@ -417,7 +396,6 @@ object CarbonDataRDDFactory {
loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
storePath,
carbonLoadModel,
- partitioner.partitionCount,
compactionModel.compactionSize,
segList,
compactionModel.compactionType
@@ -439,7 +417,6 @@ object CarbonDataRDDFactory {
compactionModel: CompactionModel,
kettleHomePath: String,
carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
storeLocation: String): Unit = {
loadsToMerge.asScala.foreach(seg => {
@@ -449,7 +426,6 @@ object CarbonDataRDDFactory {
val compactionCallableModel = CompactionCallableModel(storePath,
carbonLoadModel,
- partitioner,
storeLocation,
compactionModel.carbonTable,
kettleHomePath,
@@ -468,7 +444,6 @@ object CarbonDataRDDFactory {
def startCompactionThreads(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
storePath: String,
kettleHomePath: String,
storeLocation: String,
@@ -499,7 +474,6 @@ object CarbonDataRDDFactory {
executeCompaction(carbonLoadModel: CarbonLoadModel,
storePath: String,
compactionModel: CompactionModel,
- partitioner: Partitioner,
executor, sqlContext, kettleHomePath, storeLocation
)
triggeredCompactionStatus = true
@@ -550,7 +524,6 @@ object CarbonDataRDDFactory {
executeCompaction(newCarbonLoadModel,
newCarbonLoadModel.getStorePath,
newcompactionModel,
- partitioner,
executor, sqlContext, kettleHomePath, storeLocation
)
} catch {
@@ -672,7 +645,6 @@ object CarbonDataRDDFactory {
handleCompactionForSystemLocking(sqlContext,
carbonLoadModel,
- partitioner,
storePath,
kettleHomePath,
storeLocation,
@@ -691,7 +663,6 @@ object CarbonDataRDDFactory {
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
- partitioner,
storePath,
kettleHomePath,
storeLocation,
@@ -728,7 +699,7 @@ object CarbonDataRDDFactory {
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
// Check if any load need to be deleted before loading new data
- deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, partitioner, storePath,
+ deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
isForceDeletion = false)
if (null == carbonLoadModel.getLoadMetadataDetails) {
readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -793,9 +764,7 @@ object CarbonDataRDDFactory {
var splits = Array[TableSplit]()
if (carbonLoadModel.isDirectLoad) {
// get all table Splits, this part means files were divide to different partitions
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
- partitioner.nodeList, partitioner.partitionCount
- )
+ splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
// get all partition blocks from file list
blocksGroupBy = splits.map {
split =>
@@ -813,8 +782,7 @@ object CarbonDataRDDFactory {
} else {
// get all table Splits,when come to this, means data have been partition
splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, null, partitioner
- )
+ carbonLoadModel.getTableName, null)
// get all partition blocks from factFilePath/uniqueID/
blocksGroupBy = splits.map {
split =>
@@ -1060,7 +1028,7 @@ object CarbonDataRDDFactory {
def deleteLoadsAndUpdateMetadata(
carbonLoadModel: CarbonLoadModel,
- table: CarbonTable, partitioner: Partitioner,
+ table: CarbonTable,
storePath: String,
isForceDeletion: Boolean) {
if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
@@ -1073,8 +1041,7 @@ object CarbonDataRDDFactory {
// Delete marked loads
val isUpdationRequired = DeleteLoadFolders
- .deleteLoadFoldersFromFileSystem(carbonLoadModel, storePath,
- partitioner.partitionCount, isForceDeletion, details)
+ .deleteLoadFoldersFromFileSystem(carbonLoadModel, storePath, isForceDeletion, details)
if (isUpdationRequired) {
try {
@@ -1113,20 +1080,17 @@ object CarbonDataRDDFactory {
def dropTable(
sc: SparkContext,
schema: String,
- table: String,
- partitioner: Partitioner) {
+ table: String) {
val v: Value[Array[Object]] = new ValueImpl()
- new CarbonDropTableRDD(sc, v, schema, table, partitioner).collect
+ new CarbonDropTableRDD(sc, v, schema, table).collect
}
def cleanFiles(
sc: SparkContext,
carbonLoadModel: CarbonLoadModel,
- storePath: String,
- partitioner: Partitioner) {
+ storePath: String) {
val table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
.getCarbonTable(carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName)
- val metaDataPath: String = table.getMetaDataFilepath
val carbonCleanFilesLock = CarbonLockFactory
.getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
LockUsage.CLEAN_FILES_LOCK
@@ -1136,7 +1100,6 @@ object CarbonDataRDDFactory {
LOGGER.info("Clean files lock has been successfully acquired.")
deleteLoadsAndUpdateMetadata(carbonLoadModel,
table,
- partitioner,
storePath,
isForceDeletion = true)
} else {
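Reviewer note: taken together, the factory's DDL helpers lose every Partitioner parameter. A compact caller sketch (illustrative only; housekeeping is not part of the patch):

    import org.apache.spark.SparkContext

    import org.apache.carbondata.spark.load.CarbonLoadModel
    import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory

    // Illustrative helper (not in this patch): the trimmed entry points.
    def housekeeping(
        sc: SparkContext,
        loadModel: CarbonLoadModel,
        storePath: String): Unit = {
      // cleanFiles acquires the CLEAN_FILES_LOCK and force-deletes marked
      // loads; no Partitioner is threaded through any more.
      CarbonDataRDDFactory.cleanFiles(sc, loadModel, storePath)
      CarbonDataRDDFactory.dropTable(
        sc, loadModel.getDatabaseName, loadModel.getTableName)
    }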
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index 15f9b44..343a602 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -37,7 +37,6 @@ class CarbonDeleteLoadByDateRDD[K, V](
dateField: String,
dateFieldActualName: String,
dateValue: String,
- partitioner: Partitioner,
factTableName: String,
dimTableName: String,
storePath: String,
@@ -47,7 +46,7 @@ class CarbonDeleteLoadByDateRDD[K, V](
sc.setLocalProperty("spark.scheduler.pool", "DDL")
override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner)
+ val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
splits.zipWithIndex.map {s =>
new CarbonLoadPartition(id, s._2, s._1)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index a78c67b..26e1abc 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -38,7 +38,7 @@ class CarbonDeleteLoadRDD[V: ClassTag](
sc.setLocalProperty("spark.scheduler.pool", "DDL")
override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner)
+ val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
splits.zipWithIndex.map {f =>
new CarbonLoadPartition(id, f._2, f._1)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
index abdeaf5..47689bd 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -30,14 +30,13 @@ class CarbonDropTableRDD[V: ClassTag](
sc: SparkContext,
valueClass: Value[V],
databaseName: String,
- tableName: String,
- partitioner: Partitioner)
+ tableName: String)
extends RDD[V](sc, Nil) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner)
+ val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
splits.zipWithIndex.map { s =>
new CarbonLoadPartition(id, s._2, s._1)
}
@@ -46,9 +45,6 @@ class CarbonDropTableRDD[V: ClassTag](
override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
val iter = new Iterator[V] {
- val split = theSplit.asInstanceOf[CarbonLoadPartition]
-
- val partitionCount = partitioner.partitionCount
// TODO: Clear Btree from memory
var havePair = false
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 4e820c6..249f2cd 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -29,13 +29,13 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.CarbonContext
+import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TableBlockInfo, TaskBlockInfo}
+import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
@@ -273,7 +273,7 @@ class CarbonMergerRDD[K, V](
val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
confExecutors
} else { nodeBlockMapping.size() }
- CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
+ CarbonEnv.ensureExecutors(sparkContext, requiredExecutors)
logInfo("No.of Executors required=" + requiredExecutors +
" , spark.executor.instances=" + confExecutors +
", no.of.nodes where data present=" + nodeBlockMapping.size())
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 5fdbc5d..6c6076e 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -40,7 +40,6 @@ object Compactor {
def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
val storePath = compactionCallableModel.storePath
- val partitioner = compactionCallableModel.partitioner
val storeLocation = compactionCallableModel.storeLocation
val carbonTable = compactionCallableModel.carbonTable
val kettleHomePath = compactionCallableModel.kettleHomePath
@@ -60,7 +59,6 @@ object Compactor {
val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime()
val carbonMergerMapping = CarbonMergerMapping(storeLocation,
storePath,
- partitioner,
carbonTable.getMetaDataFilepath,
mergedLoadName,
kettleHomePath,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 86c12f8..3a393ed 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -74,11 +74,10 @@ class NewCarbonDataLoadRDD[K, V](
var splits: Array[TableSplit] = null
if (carbonLoadModel.isDirectLoad) {
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
- partitioner.nodeList, partitioner.partitionCount)
+ splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
} else {
splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, null, partitioner)
+ carbonLoadModel.getTableName, null)
}
splits.zipWithIndex.map { s =>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 5d2221d..ba97083 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -36,7 +36,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
-import org.apache.carbondata.spark.rdd.CarbonDataFrameRDD
class CarbonContext(
val sc: SparkContext,
@@ -134,7 +133,7 @@ class CarbonContext(
val logicPlan: LogicalPlan = parseSql(sql)
statistic.addStatistics(QueryStatisticsConstants.SQL_PARSE, System.currentTimeMillis())
recorder.recordStatisticsForDriver(statistic, queryId)
- val result = new CarbonDataFrameRDD(this, logicPlan)
+ val result = new DataFrame(this, logicPlan)
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
@@ -183,29 +182,4 @@ object CarbonContext {
cache(sc) = cc
}
- /**
- *
- * Requesting the extra executors other than the existing ones.
- *
- * @param sc
- * @param numExecutors
- * @return
- */
- final def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
- sc.schedulerBackend match {
- case b: CoarseGrainedSchedulerBackend =>
- val requiredExecutors = numExecutors - b.numExistingExecutors
- LOGGER
- .info(s"number of executors is =$numExecutors existing executors are =" +
- s"${ b.numExistingExecutors }"
- )
- if (requiredExecutors > 0) {
- b.requestExecutors(requiredExecutors)
- }
- true
- case _ =>
- false
- }
-
- }
}
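Reviewer note: with CarbonDataFrameRDD deleted above, sql() hands back a plain DataFrame; behaviour is unchanged because the removed subclass only delegated collect() to super. Sketch (illustrative only; table name t1 is hypothetical):

    import org.apache.spark.sql.{CarbonContext, DataFrame, Row}

    // Illustrative helper (not in this patch): collect a query result
    // through the plain DataFrame now returned by CarbonContext.sql.
    def runQuery(cc: CarbonContext): Array[Row] = {
      val df: DataFrame = cc.sql("SELECT * FROM t1") // t1: hypothetical table
      df.collect()
    }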
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 9f6b0b4..be77954 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,15 +17,19 @@
package org.apache.spark.sql
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.sql.hive.{CarbonMetastoreCatalog, HiveContext}
+import org.apache.carbondata.common.logging.LogServiceFactory
+
/**
* Carbon Environment for unified context
*/
case class CarbonEnv(hiveContext: HiveContext, carbonCatalog: CarbonMetastoreCatalog)
object CarbonEnv {
- val className = classOf[CarbonEnv].getCanonicalName
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
var carbonEnv: CarbonEnv = _
def getInstance(sqlContext: SQLContext): CarbonEnv = {
@@ -36,6 +40,29 @@ object CarbonEnv {
}
carbonEnv
}
+
+ /**
+ *
+ * Requesting the extra executors other than the existing ones.
+ *
+ * @param sc
+ * @param numExecutors
+ * @return
+ */
+ final def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
+ sc.schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend =>
+ val requiredExecutors = numExecutors - b.numExistingExecutors
+ LOGGER.info(s"number of executors is =$numExecutors existing executors are =" +
+ s"${ b.numExistingExecutors }")
+ if (requiredExecutors > 0) {
+ b.requestExecutors(requiredExecutors)
+ }
+ true
+ case _ =>
+ false
+ }
+ }
}
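Reviewer note: ensureExecutors now lives on CarbonEnv, moved verbatim from CarbonContext. A usage sketch (illustrative only):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.CarbonEnv

    // Illustrative helper (not in this patch): request executors before a
    // distributed job. ensureExecutors returns false when the scheduler
    // backend is not coarse-grained (e.g. local mode), in which case no
    // scaling is possible.
    def scaleOut(sc: SparkContext, wanted: Int): Unit = {
      if (!CarbonEnv.ensureExecutors(sc, wanted)) {
        println(s"scheduler backend cannot request executors; wanted=$wanted")
      }
    }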
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 22cc548..b7673db 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -99,7 +99,6 @@ case class DataLoadTableFileMapping(table: String, loadPath: String)
case class CarbonMergerMapping(storeLocation: String,
storePath: String,
- partitioner: Partitioner,
metadataFilePath: String,
mergedLoadName: String,
kettleHomePath: String,
@@ -126,7 +125,6 @@ case class CompactionModel(compactionSize: Long,
case class CompactionCallableModel(storePath: String,
carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
storeLocation: String,
carbonTable: CarbonTable,
kettleHomePath: String,
@@ -493,7 +491,6 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
.alterTableForCompaction(sqlContext,
alterTableModel,
carbonLoadModel,
- partitioner,
relation.tableMeta.storePath,
kettleHomePath,
storeLocation
@@ -1162,8 +1159,7 @@ private[sql] case class DeleteLoadByDate(
CarbonEnv.getInstance(sqlContext).carbonCatalog.storePath,
level,
actualColName,
- dateValue,
- relation.tableMeta.partitioner)
+ dateValue)
LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
Seq.empty
}
@@ -1201,8 +1197,7 @@ private[sql] case class CleanFiles(
CarbonDataRDDFactory.cleanFiles(
sqlContext.sparkContext,
carbonLoadModel,
- relation.tableMeta.storePath,
- relation.tableMeta.partitioner)
+ relation.tableMeta.storePath)
LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.")
} catch {
case ex: Exception =>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f7b76fd6/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 25c36c5..02453bd 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -21,15 +21,12 @@ import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
import scala.collection.JavaConverters._
import org.apache.spark.SparkContext
-import org.apache.spark.sql.CarbonContext
+import org.apache.spark.sql.{CarbonContext, CarbonEnv}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.carbon.datastore.block.Distributable
import org.apache.carbondata.spark.load.CarbonLoaderUtil
-/**
- *
- */
object DistributionUtil {
@transient
val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
@@ -131,7 +128,7 @@ object DistributionUtil {
}
val startTime = System.currentTimeMillis()
- CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
+ CarbonEnv.ensureExecutors(sparkContext, requiredExecutors)
var nodes = DistributionUtil.getNodeList(sparkContext)
var maxTimes = 30
while (nodes.length < requiredExecutors && maxTimes > 0) {
[2/2] incubator-carbondata git commit: [CARBONDATA-461] clean partitioner in carbon RDD package This closes #363
Posted by ja...@apache.org.
[CARBONDATA-461] clean partitioner in carbon RDD package This closes #363
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/879bfe74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/879bfe74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/879bfe74
Branch: refs/heads/master
Commit: 879bfe742def18923558c1bcd114addeff3db0bf
Parents: 23bae4c f7b76fd
Author: jackylk <ja...@huawei.com>
Authored: Mon Nov 28 21:08:45 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Mon Nov 28 21:08:45 2016 +0800
----------------------------------------------------------------------
.../spark/load/DeleteLoadFolders.java | 13 +-
.../spark/merger/CarbonDataMergerUtil.java | 46 ++----
.../api/impl/QueryPartitionHelper.java | 109 +--------------
.../carbondata/spark/util/CarbonQueryUtil.java | 139 ++-----------------
.../spark/rdd/CarbonCleanFilesRDD.scala | 2 +-
.../spark/rdd/CarbonDataFrameRDD.scala | 36 -----
.../spark/rdd/CarbonDataLoadRDD.scala | 5 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 71 +++-------
.../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 3 +-
.../spark/rdd/CarbonDeleteLoadRDD.scala | 2 +-
.../spark/rdd/CarbonDropTableRDD.scala | 8 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 6 +-
.../apache/carbondata/spark/rdd/Compactor.scala | 2 -
.../spark/rdd/NewCarbonDataLoadRDD.scala | 5 +-
.../org/apache/spark/sql/CarbonContext.scala | 28 +---
.../scala/org/apache/spark/sql/CarbonEnv.scala | 29 +++-
.../execution/command/carbonTableSchema.scala | 9 +-
.../spark/sql/hive/DistributionUtil.scala | 7 +-
18 files changed, 93 insertions(+), 427 deletions(-)
----------------------------------------------------------------------