Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/25 10:21:01 UTC
[1/4] incubator-carbondata git commit: change ScanRdd to use RecordReader
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 5c697e942 -> e05c0d5da
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
index e3b74ad..4036a8b 100644
--- a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
@@ -60,29 +60,23 @@ public class SegmentStatusManager {
private static final LogService LOG =
LogServiceFactory.getLogService(SegmentStatusManager.class.getName());
- private AbsoluteTableIdentifier absoluteTableIdentifier;
-
- public SegmentStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier) {
- this.absoluteTableIdentifier = absoluteTableIdentifier;
- }
-
/**
* This will return the lock object used to lock the table status file before updation.
*
* @return
*/
- public ICarbonLock getTableStatusLock() {
- return CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
+ public static ICarbonLock getTableStatusLock(AbsoluteTableIdentifier identifier) {
+ return CarbonLockFactory.getCarbonLockObj(identifier.getCarbonTableIdentifier(),
LockUsage.TABLE_STATUS_LOCK);
}
/**
* This method will return last modified time of tablestatus file
*/
- public long getTableStatusLastModifiedTime() throws IOException {
- String tableStatusPath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier()).getTableStatusFilePath();
+ public static long getTableStatusLastModifiedTime(AbsoluteTableIdentifier identifier)
+ throws IOException {
+ String tableStatusPath = CarbonStorePath.getCarbonTablePath(identifier.getStorePath(),
+ identifier.getCarbonTableIdentifier()).getTableStatusFilePath();
if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) {
return 0L;
} else {
@@ -97,15 +91,15 @@ public class SegmentStatusManager {
* @return
* @throws IOException
*/
- public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
+ public static SegmentStatus getSegmentStatus(AbsoluteTableIdentifier identifier)
+ throws IOException {
// @TODO: move reading LoadStatus file to separate class
- List<String> listOfValidSegments = new ArrayList<String>(10);
- List<String> listOfValidUpdatedSegments = new ArrayList<String>(10);
- List<String> listOfInvalidSegments = new ArrayList<String>(10);
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ List<String> validSegments = new ArrayList<String>(10);
+ List<String> validUpdatedSegments = new ArrayList<String>(10);
+ List<String> invalidSegments = new ArrayList<String>(10);
+ CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier.getStorePath(),
+ identifier.getCarbonTableIdentifier());
String dataPath = carbonTablePath.getTableStatusFilePath();
DataInputStream dataInputStream = null;
Gson gsonObjectToRead = new Gson();
@@ -114,9 +108,7 @@ public class SegmentStatusManager {
LoadMetadataDetails[] loadFolderDetailsArray;
try {
if (FileFactory.isFileExist(dataPath, FileFactory.getFileType(dataPath))) {
-
dataInputStream = fileOperation.openForRead();
-
BufferedReader buffReader =
new BufferedReader(
new InputStreamReader(dataInputStream, CarbonCommonConstants.DEFAULT_CHARSET));
@@ -126,40 +118,31 @@ public class SegmentStatusManager {
List<LoadMetadataDetails> loadFolderDetails = Arrays.asList(loadFolderDetailsArray);
for (LoadMetadataDetails loadMetadataDetails : loadFolderDetails) {
- if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
- || CarbonCommonConstants.MARKED_FOR_UPDATE
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
- || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
+ String loadStatus = loadMetadataDetails.getLoadStatus();
+ if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS.equalsIgnoreCase(loadStatus)
+ || CarbonCommonConstants.MARKED_FOR_UPDATE.equalsIgnoreCase(loadStatus)
+ || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equalsIgnoreCase(
+ loadStatus)) {
// check for merged loads.
if (null != loadMetadataDetails.getMergedLoadName()) {
- if (!listOfValidSegments.contains(loadMetadataDetails.getMergedLoadName())) {
- listOfValidSegments.add(loadMetadataDetails.getMergedLoadName());
+ if (!validSegments.contains(loadMetadataDetails.getMergedLoadName())) {
+ validSegments.add(loadMetadataDetails.getMergedLoadName());
}
// if merged load is updated then put it in updated list
- if (CarbonCommonConstants.MARKED_FOR_UPDATE
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
- listOfValidUpdatedSegments.add(loadMetadataDetails.getMergedLoadName());
+ if (CarbonCommonConstants.MARKED_FOR_UPDATE.equalsIgnoreCase(loadStatus)) {
+ validUpdatedSegments.add(loadMetadataDetails.getMergedLoadName());
}
continue;
}
-
- if (CarbonCommonConstants.MARKED_FOR_UPDATE
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
-
- listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName());
+ if (CarbonCommonConstants.MARKED_FOR_UPDATE.equalsIgnoreCase(loadStatus)) {
+ validUpdatedSegments.add(loadMetadataDetails.getLoadName());
}
- listOfValidSegments.add(loadMetadataDetails.getLoadName());
- } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
- || CarbonCommonConstants.SEGMENT_COMPACTED
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
- || CarbonCommonConstants.MARKED_FOR_DELETE
- .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) {
- listOfInvalidSegments.add(loadMetadataDetails.getLoadName());
+ validSegments.add(loadMetadataDetails.getLoadName());
+ } else if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equalsIgnoreCase(loadStatus)
+ || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(loadStatus)
+ || CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadStatus)) {
+ invalidSegments.add(loadMetadataDetails.getLoadName());
}
-
}
}
} catch (IOException e) {
@@ -167,7 +150,6 @@ public class SegmentStatusManager {
throw e;
} finally {
try {
-
if (null != dataInputStream) {
dataInputStream.close();
}
@@ -175,10 +157,9 @@ public class SegmentStatusManager {
LOG.error(e);
throw e;
}
-
}
- return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
- listOfInvalidSegments);
+
+ return new SegmentStatus(validSegments, validUpdatedSegments, invalidSegments);
}
/**
@@ -187,7 +168,7 @@ public class SegmentStatusManager {
* @param tableFolderPath
* @return
*/
- public LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) {
+ public static LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) {
Gson gsonObjectToRead = new Gson();
DataInputStream dataInputStream = null;
BufferedReader buffReader = null;
@@ -223,13 +204,9 @@ public class SegmentStatusManager {
*
* @return
*/
- private String readCurrentTime() {
+ private static String readCurrentTime() {
SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
- String date = null;
-
- date = sdf.format(new Date());
-
- return date;
+ return sdf.format(new Date());
}
/**
@@ -240,8 +217,7 @@ public class SegmentStatusManager {
* @return -1 if first arg is less than second arg, 1 if first arg is greater than second arg,
* 0 otherwise
*/
- private Integer compareDateValues(Long loadValue, Long userValue) {
-
+ private static Integer compareDateValues(Long loadValue, Long userValue) {
return loadValue.compareTo(userValue);
}
@@ -252,10 +228,9 @@ public class SegmentStatusManager {
* @param tableFolderPath
* @return
*/
- public List<String> updateDeletionStatus(List<String> loadIds, String tableFolderPath)
- throws Exception {
- CarbonTableIdentifier carbonTableIdentifier =
- absoluteTableIdentifier.getCarbonTableIdentifier();
+ public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
+ List<String> loadIds, String tableFolderPath) throws Exception {
+ CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
ICarbonLock carbonDeleteSegmentLock =
CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK);
ICarbonLock carbonTableStatusLock =
@@ -267,9 +242,8 @@ public class SegmentStatusManager {
if (carbonDeleteSegmentLock.lockWithRetries()) {
LOG.info("Delete segment lock has been successfully acquired");
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+ identifier.getStorePath(), identifier.getCarbonTableIdentifier());
String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
if (!FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) {
@@ -335,10 +309,9 @@ public class SegmentStatusManager {
* @param tableFolderPath
* @return
*/
- public List<String> updateDeletionStatus(String loadDate, String tableFolderPath,
- Long loadStartTime) throws Exception {
- CarbonTableIdentifier carbonTableIdentifier =
- absoluteTableIdentifier.getCarbonTableIdentifier();
+ public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
+ String loadDate, String tableFolderPath, Long loadStartTime) throws Exception {
+ CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
ICarbonLock carbonDeleteSegmentLock =
CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK);
ICarbonLock carbonTableStatusLock =
@@ -350,9 +323,8 @@ public class SegmentStatusManager {
if (carbonDeleteSegmentLock.lockWithRetries()) {
LOG.info("Delete segment lock has been successfully acquired");
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+ identifier.getStorePath(), identifier.getCarbonTableIdentifier());
String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
@@ -422,7 +394,7 @@ public class SegmentStatusManager {
* @param listOfLoadFolderDetailsArray
* @throws IOException
*/
- public void writeLoadDetailsIntoFile(String dataLoadLocation,
+ public static void writeLoadDetailsIntoFile(String dataLoadLocation,
LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException {
AtomicFileOperations fileWrite =
new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
@@ -458,7 +430,7 @@ public class SegmentStatusManager {
* @param invalidLoadIds
* @return invalidLoadIds
*/
- public List<String> updateDeletionStatus(List<String> loadIds,
+ public static List<String> updateDeletionStatus(List<String> loadIds,
LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadIds) {
for (String loadId : loadIds) {
boolean loadFound = false;
@@ -502,7 +474,7 @@ public class SegmentStatusManager {
* @param invalidLoadTimestamps
* @return invalidLoadTimestamps
*/
- public List<String> updateDeletionStatus(String loadDate,
+ public static List<String> updateDeletionStatus(String loadDate,
LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadTimestamps,
Long loadStartTime) {
// For each load timestamp loop through data and if the
@@ -543,7 +515,7 @@ public class SegmentStatusManager {
*
* @param streams - streams to close.
*/
- private void closeStreams(Closeable... streams) {
+ private static void closeStreams(Closeable... streams) {
// Added if to avoid NullPointerException in case one stream is being passed as null
if (null != streams) {
for (Closeable stream : streams) {
@@ -566,7 +538,7 @@ public class SegmentStatusManager {
* @return
*/
- public List<LoadMetadataDetails> updateLatestTableStatusDetails(
+ public static List<LoadMetadataDetails> updateLatestTableStatusDetails(
LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) {
List<LoadMetadataDetails> newListMetadata =
@@ -584,7 +556,7 @@ public class SegmentStatusManager {
*
* @param loadMetadata
*/
- public void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
+ public static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
// update status only if the segment is not marked for delete
if (!CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadMetadata.getLoadStatus())) {
loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
@@ -593,28 +565,28 @@ public class SegmentStatusManager {
}
- public static class ValidAndInvalidSegmentsInfo {
- private final List<String> listOfValidSegments;
- private final List<String> listOfValidUpdatedSegments;
- private final List<String> listOfInvalidSegments;
-
- private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments,
- List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments) {
- this.listOfValidSegments = listOfValidSegments;
- this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
- this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
- }
+ public static class SegmentStatus {
+ private final List<String> validSegments;
+ private final List<String> validUpdatedSegments;
+ private final List<String> invalidSegments;
- public List<String> getInvalidSegments() {
- return listOfInvalidSegments;
+ private SegmentStatus(List<String> validSegments, List<String> validUpdatedSegments,
+ List<String> invalidSegments) {
+ this.validSegments = validSegments;
+ this.validUpdatedSegments = validUpdatedSegments;
+ this.invalidSegments = invalidSegments;
}
public List<String> getValidSegments() {
- return listOfValidSegments;
+ return validSegments;
}
public List<String> getUpadtedSegments() {
- return listOfValidUpdatedSegments;
+ return validUpdatedSegments;
+ }
+
+ public List<String> getInvalidSegments() {
+ return invalidSegments;
}
}
}
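
For readers skimming the diff above: the instance methods of SegmentStatusManager become static methods keyed by an AbsoluteTableIdentifier, and getValidAndInvalidSegments() is renamed to getSegmentStatus(). A minimal, illustrative sketch of what a caller looks like after this change (the table path below is hypothetical, not taken from this commit):

    import java.io.IOException;
    import java.util.List;

    import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
    import org.apache.carbondata.lcm.status.SegmentStatusManager;

    public class SegmentStatusExample {
      public static void main(String[] args) throws IOException {
        // Hypothetical table path; any valid Carbon table location would do.
        AbsoluteTableIdentifier identifier =
            AbsoluteTableIdentifier.fromTablePath("hdfs://localhost:9000/carbon/store/default/t1");

        // No SegmentStatusManager instance is needed after this refactoring.
        SegmentStatusManager.SegmentStatus status =
            SegmentStatusManager.getSegmentStatus(identifier);

        List<String> valid = status.getValidSegments();
        List<String> invalid = status.getInvalidSegments();
        System.out.println("valid: " + valid + ", invalid: " + invalid);
      }
    }
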
[3/4] incubator-carbondata git commit: change ScanRdd to use RecordReader
Posted by ra...@apache.org.
change ScanRdd to use RecordReader
fix style
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5f6a56ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5f6a56ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5f6a56ca
Branch: refs/heads/master
Commit: 5f6a56cac26be7e5307a01296ce9351b60be0e66
Parents: 5c697e9
Author: jackylk <ja...@huawei.com>
Authored: Fri Nov 25 18:07:20 2016 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Nov 25 15:49:53 2016 +0530
----------------------------------------------------------------------
.../carbon/datastore/block/Distributable.java | 8 +-
.../carbon/datastore/block/TableBlockInfo.java | 3 +-
.../carbon/datastore/block/TableTaskInfo.java | 2 +-
.../scan/filter/FilterExpressionProcessor.java | 29 +-
.../examples/DataFrameAPIExample.scala | 5 +-
.../carbondata/hadoop/CarbonInputFormat.java | 334 +++++-----------
.../carbondata/hadoop/CarbonInputSplit.java | 112 +++++-
.../hadoop/CarbonMultiBlockSplit.java | 105 +++++
.../carbondata/hadoop/CarbonRecordReader.java | 34 +-
.../hadoop/readsupport/CarbonReadSupport.java | 2 +-
.../AbstractDictionaryDecodedReadSupport.java | 2 +-
.../impl/ArrayWritableReadSupport.java | 2 +-
.../readsupport/impl/RawDataReadSupport.java | 6 +-
.../hadoop/util/CarbonInputFormatUtil.java | 14 +
.../hadoop/ft/CarbonInputMapperTest.java | 15 +-
.../carbondata/spark/load/CarbonLoaderUtil.java | 99 +----
.../spark/merger/CarbonDataMergerUtil.java | 11 +-
.../readsupport/SparkRowReadSupportImpl.java | 9 +-
.../carbondata/spark/util/LoadMetadataUtil.java | 4 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 20 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 151 +++-----
.../carbondata/spark/rdd/CarbonScanRDD.scala | 387 ++++++++-----------
.../apache/carbondata/spark/rdd/Compactor.scala | 13 +-
.../carbondata/spark/util/QueryPlanUtil.scala | 56 ---
.../sql/CarbonDatasourceHadoopRelation.scala | 20 +-
.../spark/sql/CarbonDatasourceRelation.scala | 3 +-
.../org/apache/spark/sql/CarbonOperators.scala | 191 ---------
.../scala/org/apache/spark/sql/CarbonScan.scala | 186 +++++++++
.../execution/command/carbonTableSchema.scala | 20 +-
.../spark/sql/hive/DistributionUtil.scala | 21 +-
.../AllDataTypesTestCaseAggregate.scala | 20 +-
.../CompactionSystemLockFeatureTest.scala | 20 +-
.../DataCompactionBoundaryConditionsTest.scala | 3 -
.../DataCompactionCardinalityBoundryTest.scala | 10 +-
.../datacompaction/DataCompactionLockTest.scala | 11 +-
.../DataCompactionMinorThresholdTest.scala | 6 +-
.../DataCompactionNoDictionaryTest.scala | 10 +-
.../datacompaction/DataCompactionTest.scala | 14 +-
.../MajorCompactionIgnoreInMinorTest.scala | 32 +-
.../MajorCompactionStopsAfterCompaction.scala | 14 +-
.../dataretention/DataRetentionTestCase.scala | 5 +-
.../lcm/status/SegmentStatusManager.java | 160 ++++----
42 files changed, 964 insertions(+), 1205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java
index 817aafc..99d4459 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java
@@ -16,10 +16,12 @@
*/
package org.apache.carbondata.core.carbon.datastore.block;
+import java.io.IOException;
+
/**
- * Abstract class which is maintains the locations of node.
+ * interface to get the locations of node. Used for making task distribution based on locality
*/
-public abstract class Distributable implements Comparable<Distributable> {
+public interface Distributable extends Comparable<Distributable> {
- public abstract String[] getLocations();
+ String[] getLocations() throws IOException;
}
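
The change above turns Distributable from an abstract class into an interface whose getLocations() may throw IOException, so any class can now participate in locality-based task distribution. A purely illustrative implementor, not part of this commit:

    import java.io.IOException;

    import org.apache.carbondata.core.carbon.datastore.block.Distributable;

    // Hypothetical example: a Distributable whose locality is a fixed list of hosts.
    public class HostListDistributable implements Distributable {

      private final String[] hosts;

      public HostListDistributable(String[] hosts) {
        this.hosts = hosts;
      }

      @Override
      public String[] getLocations() throws IOException {
        // Real implementors (TableBlockInfo, CarbonInputSplit) resolve HDFS block locations here.
        return hosts;
      }

      @Override
      public int compareTo(Distributable other) {
        // Order by the first preferred host; real implementors compare segment/task identifiers.
        try {
          return getLocations()[0].compareTo(other.getLocations()[0]);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
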
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
index f8da9af..4bf0047 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
@@ -28,8 +28,7 @@ import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
* class will be used to pass the block detail detail will be passed form driver
* to all the executor to load the b+ tree
*/
-public class TableBlockInfo extends Distributable
- implements Serializable, Comparable<Distributable> {
+public class TableBlockInfo implements Distributable, Serializable {
/**
* serialization id
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java
index 1f8caf0..7ce3a14 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java
@@ -27,7 +27,7 @@ import java.util.TreeMap;
/**
* This class is responsible for maintaining the mapping of tasks of a node.
*/
-public class TableTaskInfo extends Distributable {
+public class TableTaskInfo implements Distributable {
private final List<TableBlockInfo> tableBlockInfoList;
private final String taskId;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java
index 1541867..44a3a31 100644
--- a/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java
@@ -29,7 +29,6 @@ import org.apache.carbondata.core.carbon.datastore.DataRefNode;
import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
import org.apache.carbondata.core.carbon.datastore.IndexKey;
import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder;
import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
@@ -127,12 +126,10 @@ public class FilterExpressionProcessor implements FilterProcessor {
FilterExecuter filterExecuter =
FilterUtil.getFilterExecuterTree(filterResolver, tableSegment.getSegmentProperties(),null);
while (startBlock != endBlock) {
- addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, startBlock,
- tableSegment.getSegmentProperties());
+ addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, startBlock);
startBlock = startBlock.getNextDataRefNode();
}
- addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, endBlock,
- tableSegment.getSegmentProperties());
+ addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, endBlock);
LOGGER.info("Total Time in retrieving the data reference node" + "after scanning the btree " + (
System.currentTimeMillis() - startTimeInMillis)
+ " Total number of data reference node for executing filter(s) " + listOfDataBlocksToScan
@@ -147,11 +144,9 @@ public class FilterExpressionProcessor implements FilterProcessor {
* @param filterResolver
* @param listOfDataBlocksToScan
* @param dataRefNode
- * @param segmentProperties
*/
private void addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter,
- List<DataRefNode> listOfDataBlocksToScan, DataRefNode dataRefNode,
- SegmentProperties segmentProperties) {
+ List<DataRefNode> listOfDataBlocksToScan, DataRefNode dataRefNode) {
BitSet bitSet = filterExecuter
.isScanRequired(dataRefNode.getColumnsMaxValue(), dataRefNode.getColumnsMinValue());
@@ -174,7 +169,7 @@ public class FilterExpressionProcessor implements FilterProcessor {
private FilterResolverIntf getFilterResolvertree(Expression expressionTree,
AbsoluteTableIdentifier tableIdentifier) throws FilterUnsupportedException {
FilterResolverIntf filterEvaluatorTree =
- createFilterResolverTree(expressionTree, tableIdentifier, null);
+ createFilterResolverTree(expressionTree, tableIdentifier);
traverseAndResolveTree(filterEvaluatorTree, tableIdentifier);
return filterEvaluatorTree;
}
@@ -212,24 +207,22 @@ public class FilterExpressionProcessor implements FilterProcessor {
* @return
*/
private FilterResolverIntf createFilterResolverTree(Expression expressionTree,
- AbsoluteTableIdentifier tableIdentifier, Expression intermediateExpression) {
+ AbsoluteTableIdentifier tableIdentifier) {
ExpressionType filterExpressionType = expressionTree.getFilterExpressionType();
BinaryExpression currentExpression = null;
switch (filterExpressionType) {
case OR:
currentExpression = (BinaryExpression) expressionTree;
return new LogicalFilterResolverImpl(
- createFilterResolverTree(currentExpression.getLeft(), tableIdentifier,
- currentExpression),
- createFilterResolverTree(currentExpression.getRight(), tableIdentifier,
- currentExpression),currentExpression);
+ createFilterResolverTree(currentExpression.getLeft(), tableIdentifier),
+ createFilterResolverTree(currentExpression.getRight(), tableIdentifier),
+ currentExpression);
case AND:
currentExpression = (BinaryExpression) expressionTree;
return new LogicalFilterResolverImpl(
- createFilterResolverTree(currentExpression.getLeft(), tableIdentifier,
- currentExpression),
- createFilterResolverTree(currentExpression.getRight(), tableIdentifier,
- currentExpression), currentExpression);
+ createFilterResolverTree(currentExpression.getLeft(), tableIdentifier),
+ createFilterResolverTree(currentExpression.getRight(), tableIdentifier),
+ currentExpression);
case EQUALS:
case IN:
return getFilterResolverBasedOnExpressionType(ExpressionType.EQUALS,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
index 49fb0da..97fa152 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
@@ -24,7 +24,7 @@ object DataFrameAPIExample {
def main(args: Array[String]) {
val cc = ExampleUtils.createCarbonContext("DataFrameAPIExample")
- ExampleUtils.writeSampleCarbonFile(cc, "carbon1")
+ ExampleUtils.writeSampleCarbonFile(cc, "carbon1", 1000)
// use datasource api to read
val in = cc.read
@@ -42,7 +42,8 @@ object DataFrameAPIExample {
println(s"count after 2 load: $count")
// use SQL to read
- cc.sql("SELECT count(*) FROM carbon1 WHERE c3 > 500").show
+ cc.sql("SELECT c1, count(c3) FROM carbon1 where c3 > 500 group by c1 limit 10").show
+
cc.sql("DROP TABLE IF EXISTS carbon1")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 4a51f52..a44a78c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -25,12 +25,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
import org.apache.carbondata.core.carbon.datastore.DataRefNode;
@@ -50,7 +44,6 @@ import org.apache.carbondata.core.carbon.path.CarbonTablePath;
import org.apache.carbondata.core.carbon.querystatistics.*;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
@@ -61,7 +54,6 @@ import org.apache.carbondata.hadoop.util.SchemaReader;
import org.apache.carbondata.lcm.status.SegmentStatusManager;
import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.scan.expression.Expression;
-import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.scan.filter.FilterUtil;
import org.apache.carbondata.scan.filter.resolver.FilterResolverIntf;
@@ -74,6 +66,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -130,6 +123,11 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
}
+ public static void setTablePath(Configuration configuration, String tablePath)
+ throws IOException {
+ configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+ }
+
/**
* It sets unresolved filter expression.
*
@@ -137,26 +135,10 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
* @param filterExpression
*/
public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
- try {
- String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
- configuration.set(FILTER_PREDICATE, filterString);
- } catch (Exception e) {
- throw new RuntimeException("Error while setting filter expression to Job", e);
+ if (filterExpression == null) {
+ return;
}
- }
-
- /**
- * It sets the resolved filter expression
- *
- * @param configuration
- * @param filterExpression
- */
- public static void setFilterPredicates(Configuration configuration,
- FilterResolverIntf filterExpression) {
try {
- if (filterExpression == null) {
- return;
- }
String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
configuration.set(FILTER_PREDICATE, filterString);
} catch (Exception e) {
@@ -164,7 +146,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
}
}
- public static void setColumnProjection(CarbonProjection projection, Configuration configuration) {
+ public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
if (projection == null || projection.isEmpty()) {
return;
}
@@ -178,6 +160,10 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
configuration.set(COLUMN_PROJECTION, columnString);
}
+ public static String getColumnProjection(Configuration configuration) {
+ return configuration.get(COLUMN_PROJECTION);
+ }
+
public static void setCarbonReadSupport(Class<? extends CarbonReadSupport> readSupportClass,
Configuration configuration) {
if (readSupportClass != null) {
@@ -191,31 +177,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
}
/**
- * Set List of segments to access
+ * Set list of segments to access
*/
public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
- configuration
- .set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
+ configuration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS,
+ CarbonUtil.getSegmentString(validSegments));
}
/**
- * Below method will be used to set the segments details if
- * segments are not added in the configuration
- *
- * @param job
- * @param absoluteTableIdentifier
- * @throws IOException
- */
- private void addSegmentsIfEmpty(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier)
- throws IOException {
- if (getSegmentsFromConfiguration(job).length == 0) {
- // Get the valid segments from the carbon store.
- SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
- new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
- setSegmentsToAccess(job.getConfiguration(), validAndInvalidSegments.getValidSegments());
- }
- }
- /**
* {@inheritDoc}
* Configurations FileInputFormat.INPUT_DIR
* are used to get table path to read.
@@ -224,42 +193,46 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
* @return List<InputSplit> list of CarbonInputSplit
* @throws IOException
*/
- @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
- try {
- CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
- Object filterPredicates = getFilterPredicates(job.getConfiguration());
- AbsoluteTableIdentifier absoluteTableIdentifier =
- getAbsoluteTableIdentifier(job.getConfiguration());
- addSegmentsIfEmpty(job, absoluteTableIdentifier);
- if (filterPredicates == null) {
- return getSplitsNonFilter(job);
- } else {
- if (filterPredicates instanceof Expression) {
- //process and resolve the expression.
- CarbonInputFormatUtil.processFilterExpression((Expression) filterPredicates, carbonTable);
- return getSplits(job, CarbonInputFormatUtil
- .resolveFilter((Expression) filterPredicates, absoluteTableIdentifier));
- } else {
- //It means user sets already resolved expression.
- return getSplits(job, (FilterResolverIntf) filterPredicates);
- }
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+ List<String> invalidSegments = new ArrayList<>();
+
+ // get all valid segments and set them into the configuration
+ if (getSegmentsToAccess(job).length == 0) {
+ SegmentStatusManager.SegmentStatus segments =
+ SegmentStatusManager.getSegmentStatus(identifier);
+ setSegmentsToAccess(job.getConfiguration(), segments.getValidSegments());
+ if (segments.getValidSegments().size() == 0) {
+ return new ArrayList<>(0);
+ }
+
+ // remove entry in the segment index if there are invalid segments
+ invalidSegments.addAll(segments.getInvalidSegments());
+ if (invalidSegments.size() > 0) {
+ SegmentTaskIndexStore.getInstance().removeTableBlocks(invalidSegments, identifier);
}
- } catch (Exception ex) {
- throw new IOException(ex);
}
- }
- /**
- * the method will return the blocks to be scanned with blocklets info
- *
- * @param job
- * @return
- * @throws IOException
- * @throws IndexBuilderException
- */
- private List<InputSplit> getSplitsNonFilter(JobContext job)
- throws IOException, IndexBuilderException {
- return getSplits(job, null);
+ // process and resolve the expression
+ Expression filter = getFilterPredicates(job.getConfiguration());
+ CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+ FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+ List<InputSplit> splits;
+ try {
+ // do block filtering and get split
+ splits = getSplits(job, filterInterface);
+ } catch (IndexBuilderException e) {
+ throw new IOException(e);
+ }
+ // pass the invalid segment to task side in order to remove index entry in task side
+ if (invalidSegments.size() > 0) {
+ for (InputSplit split : splits) {
+ ((CarbonInputSplit) split).setInvalidSegments(invalidSegments);
+ }
+ }
+ return splits;
}
private List<InputSplit> getSplitsInternal(JobContext job) throws IOException {
@@ -296,7 +269,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
getAbsoluteTableIdentifier(job.getConfiguration());
//for each segment fetch blocks matching filter in Driver BTree
- for (String segmentNo : getSegmentsFromConfiguration(job)) {
+ for (String segmentNo : getSegmentsToAccess(job)) {
List<DataRefNode> dataRefNodes =
getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier,
filterResolver, segmentNo);
@@ -311,98 +284,23 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
return result;
}
- /**
- * get total number of rows. Same as count(*)
- *
- * @throws IOException
- * @throws IndexBuilderException
- */
- public long getRowCount(JobContext job) throws IOException, IndexBuilderException {
-
- long rowCount = 0;
- AbsoluteTableIdentifier absoluteTableIdentifier =
- getAbsoluteTableIdentifier(job.getConfiguration());
- // no of core to load the blocks in driver
- addSegmentsIfEmpty(job, absoluteTableIdentifier);
- int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
- try {
- numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT));
- } catch (NumberFormatException e) {
- numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
- }
- // creating a thread pool
- ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores);
- List<Future<Map<String, AbstractIndex>>> loadedBlocks =
- new ArrayList<Future<Map<String, AbstractIndex>>>();
- //for each segment fetch blocks matching filter in Driver BTree
- for (String segmentNo : getSegmentsFromConfiguration(job)) {
- // submitting the task
- loadedBlocks
- .add(threadPool.submit(new BlocksLoaderThread(job, absoluteTableIdentifier, segmentNo)));
- }
- threadPool.shutdown();
- try {
- threadPool.awaitTermination(1, TimeUnit.HOURS);
- } catch (InterruptedException e) {
- throw new IndexBuilderException(e);
- }
- try {
- // adding all the rows of the blocks to get the total row
- // count
- for (Future<Map<String, AbstractIndex>> block : loadedBlocks) {
- for (AbstractIndex abstractIndex : block.get().values()) {
- rowCount += abstractIndex.getTotalNumberOfRows();
- }
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new IndexBuilderException(e);
- }
- return rowCount;
- }
-
- /**
- * {@inheritDoc}
- * Configurations FileInputFormat.INPUT_DIR, CarbonInputFormat.INPUT_SEGMENT_NUMBERS
- * are used to get table path to read.
- *
- * @return
- * @throws IOException
- */
- public FilterResolverIntf getResolvedFilter(Configuration configuration,
- Expression filterExpression)
- throws IOException, IndexBuilderException, QueryExecutionException {
- if (filterExpression == null) {
- return null;
- }
- FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
- AbsoluteTableIdentifier absoluteTableIdentifier = getAbsoluteTableIdentifier(configuration);
- //get resolved filter
- try {
- return filterExpressionProcessor.getFilterResolver(filterExpression, absoluteTableIdentifier);
- } catch (FilterUnsupportedException e) {
- throw new QueryExecutionException(e.getMessage());
- }
- }
-
- private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
- throws IOException {
+ private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) {
String dirs = configuration.get(INPUT_DIR, "");
String[] inputPaths = StringUtils.split(dirs);
if (inputPaths.length == 0) {
- throw new IOException("No input paths specified in job");
+ throw new InvalidPathException("No input paths specified in job");
}
return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
}
- private Object getFilterPredicates(Configuration configuration) {
+ private Expression getFilterPredicates(Configuration configuration) {
try {
String filterExprString = configuration.get(FILTER_PREDICATE);
if (filterExprString == null) {
return null;
}
- Object filterExprs = ObjectSerializationUtil.convertStringToObject(filterExprString);
- return filterExprs;
+ Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+ return (Expression) filter;
} catch (IOException e) {
throw new RuntimeException("Error while reading filter expression", e);
}
@@ -452,16 +350,13 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
* Below method will be used to get the table block info
*
* @param job job context
- * @param absoluteTableIdentifier absolute table identifier
* @param segmentId number of segment id
* @return list of table block
* @throws IOException
*/
- private List<TableBlockInfo> getTableBlockInfo(JobContext job,
- AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) throws IOException {
- // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
+ private List<TableBlockInfo> getTableBlockInfo(JobContext job, String segmentId)
+ throws IOException {
List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
- // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
// get file location of all files of given segment
JobContext newJob =
@@ -489,10 +384,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
// if segment tree is not loaded, load the segment tree
if (segmentIndexMap == null) {
- // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
List<TableBlockInfo> tableBlockInfoList =
- getTableBlockInfo(job, absoluteTableIdentifier, segmentId);
- // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
+ getTableBlockInfo(job, segmentId);
Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
@@ -535,30 +428,33 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
return blocks;
}
- @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+ @Override
+ public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
CarbonTable carbonTable = getCarbonTable(configuration);
- QueryModel queryModel;
- try {
- CarbonQueryPlan queryPlan =
- CarbonInputFormatUtil.createQueryPlan(carbonTable, configuration.get(COLUMN_PROJECTION));
- queryModel =
- QueryModel.createModel(getAbsoluteTableIdentifier(configuration), queryPlan, carbonTable);
- Object filterPredicates = getFilterPredicates(configuration);
- if (filterPredicates != null) {
- if (filterPredicates instanceof Expression) {
- CarbonInputFormatUtil.processFilterExpression((Expression) filterPredicates, carbonTable);
- queryModel.setFilterExpressionResolverTree(CarbonInputFormatUtil
- .resolveFilter((Expression) filterPredicates,
- getAbsoluteTableIdentifier(configuration)));
- } else {
- queryModel.setFilterExpressionResolverTree((FilterResolverIntf) filterPredicates);
- }
+ AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(configuration);
+
+ // query plan includes projection column
+ String projection = getColumnProjection(configuration);
+ CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
+ QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
+
+ // set the filter to the query model in order to filter blocklet before scan
+ Expression filter = getFilterPredicates(configuration);
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+ FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+ queryModel.setFilterExpressionResolverTree(filterIntf);
+
+ // update the file level index store if there are invalid segment
+ if (inputSplit instanceof CarbonMultiBlockSplit) {
+ CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+ List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+ if (invalidSegments.size() > 0) {
+ queryModel.setInvalidSegmentIds(invalidSegments);
}
- } catch (Exception e) {
- throw new IOException(e);
}
+
CarbonReadSupport readSupport = getReadSupportClass(configuration);
return new CarbonRecordReader<T>(queryModel, readSupport);
}
@@ -586,17 +482,20 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
return readSupport;
}
- @Override protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
+ @Override
+ protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return super.computeSplitSize(blockSize, minSize, maxSize);
}
- @Override protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+ @Override
+ protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
return super.getBlockIndex(blkLocations, offset);
}
- @Override protected List<FileStatus> listStatus(JobContext job) throws IOException {
+ @Override
+ protected List<FileStatus> listStatus(JobContext job) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
- String[] segmentsToConsider = getSegmentsFromConfiguration(job);
+ String[] segmentsToConsider = getSegmentsToAccess(job);
if (segmentsToConsider.length == 0) {
throw new IOException("No segments found");
}
@@ -605,7 +504,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
return result;
}
- @Override protected boolean isSplitable(JobContext context, Path filename) {
+ @Override
+ protected boolean isSplitable(JobContext context, Path filename) {
try {
// Don't split the file if it is local file system
FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
@@ -626,7 +526,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
throw new IOException("No partitions/data found");
}
- PathFilter inputFilter = getDataFileFilter(job);
+ PathFilter inputFilter = getDataFileFilter();
CarbonTablePath tablePath = getTablePath(job.getConfiguration());
// get tokens for all the required FileSystem for table path
@@ -658,10 +558,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
}
/**
- * @param job
* @return the PathFilter for Fact Files.
*/
- public PathFilter getDataFileFilter(JobContext job) {
+ private PathFilter getDataFileFilter() {
return new CarbonPathFilter(getUpdateExtension());
}
@@ -676,27 +575,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
}
/**
- * @return updateExtension
+ * return valid segment to access
*/
- private String[] getSegmentsFromConfiguration(JobContext job)
- throws IOException {
+ private String[] getSegmentsToAccess(JobContext job) throws IOException {
String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
- // if no segments
if (segmentString.trim().isEmpty()) {
return new String[0];
}
-
- String[] segments = segmentString.split(",");
- String[] segmentIds = new String[segments.length];
- int i = 0;
- try {
- for (; i < segments.length; i++) {
- segmentIds[i] = segments[i];
- }
- } catch (NumberFormatException e) {
- throw new IOException("segment no:" + segments[i] + " should be integer");
- }
- return segmentIds;
+ return segmentString.split(",");
}
/**
@@ -709,28 +595,4 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
return new String[] { "0" };
}
- /**
- * Thread class to load the blocks
- */
- private class BlocksLoaderThread implements Callable<Map<String, AbstractIndex>> {
- // job
- private JobContext job;
-
- // table identifier
- private AbsoluteTableIdentifier absoluteTableIdentifier;
-
- // segment id
- private String segmentId;
-
- private BlocksLoaderThread(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier,
- String segmentId) {
- this.job = job;
- this.absoluteTableIdentifier = absoluteTableIdentifier;
- this.segmentId = segmentId;
- }
-
- @Override public Map<String, AbstractIndex> call() throws Exception {
- return getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId);
- }
- }
}
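
To see how the public setters kept or added in this diff fit together, here is a hedged sketch of configuring a Hadoop job against CarbonInputFormat. It uses only setTablePath and setSegmentsToAccess as shown above; the table path and segment ids are hypothetical:

    import java.util.Arrays;

    import org.apache.carbondata.hadoop.CarbonInputFormat;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CarbonJobSetupExample {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "carbon scan example");

        // The new helper puts the table path into FileInputFormat.INPUT_DIR.
        CarbonInputFormat.setTablePath(job.getConfiguration(),
            "hdfs://localhost:9000/carbon/store/default/t1");

        // Optional: restrict the scan to specific segments. If left unset,
        // getSplits() now fills in all valid segments from the table status file.
        CarbonInputFormat.setSegmentsToAccess(job.getConfiguration(), Arrays.asList("0", "1"));

        job.setInputFormatClass(CarbonInputFormat.class);
        // set mapper, output format, etc., then submit as usual
      }
    }
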
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index ffcd663..132ee43 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -22,8 +22,13 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
+import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
import org.apache.carbondata.hadoop.internal.index.Block;
import org.apache.hadoop.fs.Path;
@@ -33,23 +38,36 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* Carbon input split to allow distributed read of CarbonInputFormat.
*/
-public class CarbonInputSplit extends FileSplit implements Serializable, Writable, Block {
+public class CarbonInputSplit extends FileSplit implements Distributable, Serializable, Writable,
+ Block {
private static final long serialVersionUID = 3520344046772190207L;
private String segmentId;
- /**
+ public String taskId;
+
+ /*
+ * Invalid segments that need to be removed in task side index
+ */
+ private List<String> invalidSegments;
+
+ /*
* Number of BlockLets in a block
*/
- private int numberOfBlocklets = 0;
+ private int numberOfBlocklets;
- public CarbonInputSplit() {
- super(null, 0, 0, new String[0]);
+ public CarbonInputSplit() {
+ segmentId = null;
+ taskId = "0";
+ numberOfBlocklets = 0;
+ invalidSegments = new ArrayList<>();
}
- public CarbonInputSplit(String segmentId, Path path, long start, long length,
+ private CarbonInputSplit(String segmentId, Path path, long start, long length,
String[] locations) {
super(path, start, length, locations);
this.segmentId = segmentId;
+ this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
+ this.invalidSegments = new ArrayList<>();
}
public CarbonInputSplit(String segmentId, Path path, long start, long length,
@@ -67,16 +85,33 @@ public class CarbonInputSplit extends FileSplit implements Serializable, Writabl
return segmentId;
}
- @Override public void readFields(DataInput in) throws IOException {
-
+ @Override
+ public void readFields(DataInput in) throws IOException {
super.readFields(in);
this.segmentId = in.readUTF();
-
+ int numInvalidSegment = in.readInt();
+ invalidSegments = new ArrayList<>(numInvalidSegment);
+ for (int i = 0; i < numInvalidSegment; i++) {
+ invalidSegments.add(in.readUTF());
+ }
}
- @Override public void write(DataOutput out) throws IOException {
+ @Override
+ public void write(DataOutput out) throws IOException {
super.write(out);
out.writeUTF(segmentId);
+ out.writeInt(invalidSegments.size());
+ for (String invalidSegment: invalidSegments) {
+ out.writeUTF(invalidSegment);
+ }
+ }
+
+ public List<String> getInvalidSegments(){
+ return invalidSegments;
+ }
+
+ public void setInvalidSegments(List<String> invalidSegments) {
+ this.invalidSegments = invalidSegments;
}
/**
@@ -88,6 +123,63 @@ public class CarbonInputSplit extends FileSplit implements Serializable, Writabl
}
@Override
+ public int compareTo(Distributable o) {
+ CarbonInputSplit other = (CarbonInputSplit)o;
+ int compareResult = 0;
+ // get the segment id
+ // convert seg ID to double.
+
+ double seg1 = Double.parseDouble(segmentId);
+ double seg2 = Double.parseDouble(other.getSegmentId());
+ if (seg1 - seg2 < 0) {
+ return -1;
+ }
+ if (seg1 - seg2 > 0) {
+ return 1;
+ }
+
+ // Comparing the time task id of the file to other
+ // if both the task id of the file is same then we need to compare the
+ // offset of
+ // the file
+ String filePath1 = this.getPath().getName();
+ String filePath2 = other.getPath().getName();
+ if (CarbonTablePath.isCarbonDataFile(filePath1)) {
+ int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1));
+ int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2));
+ if (firstTaskId != otherTaskId) {
+ return firstTaskId - otherTaskId;
+ }
+ // compare the part no of both block info
+ int firstPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath1));
+ int SecondPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath2));
+ compareResult = firstPartNo - SecondPartNo;
+ } else {
+ compareResult = filePath1.compareTo(filePath2);
+ }
+ if (compareResult != 0) {
+ return compareResult;
+ }
+ return 0;
+ }
+
+ public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) {
+ List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
+ for (CarbonInputSplit split : splitList) {
+ BlockletInfos blockletInfos = new BlockletInfos(split.getNumberOfBlocklets(), 0,
+ split.getNumberOfBlocklets());
+ try {
+ tableBlockInfoList.add(
+ new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
+ split.getLocations(), split.getLength(), blockletInfos));
+ } catch (IOException e) {
+ throw new RuntimeException("fail to get location of split: " + split, e);
+ }
+ }
+ return tableBlockInfoList;
+ }
+
+ @Override
public String getBlockPath() {
return getPath().getName();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
new file mode 100644
index 0000000..a13d6ba
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * This class wraps multiple blocks belonging to the same node into one split.
+ * So the scanning task will scan multiple blocks. This is an optimization for concurrent query.
+ */
+public class CarbonMultiBlockSplit extends InputSplit implements Writable {
+
+ /*
+ * Splits (HDFS Blocks) for task to scan.
+ */
+ private List<CarbonInputSplit> splitList;
+
+ /*
+ * The location of all wrapped splits belong to the same node
+ */
+ private String location;
+
+ public CarbonMultiBlockSplit() {
+ splitList = null;
+ location = null;
+ }
+
+ public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList,
+ String location) throws IOException {
+ this.splitList = splitList;
+ this.location = location;
+ }
+
+ /**
+ * Return all splits for scan
+ * @return split list for scan
+ */
+ public List<CarbonInputSplit> getAllSplits() {
+ return splitList;
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ long total = 0;
+ for (InputSplit split: splitList) {
+ total += split.getLength();
+ }
+ return total;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[]{location};
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // write number of splits and then write all splits
+ out.writeInt(splitList.size());
+ for (CarbonInputSplit split: splitList) {
+ split.write(out);
+ }
+ out.writeUTF(location);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // read all splits
+ int numSplit = in.readInt();
+ splitList = new ArrayList<>(numSplit);
+ for (int i = 0; i < numSplit; i++) {
+ CarbonInputSplit split = new CarbonInputSplit();
+ split.readFields(in);
+ splitList.add(split);
+ }
+ location = in.readUTF();
+ }
+
+}
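CarbonMultiBlockSplit implements Hadoop's Writable, so it travels from driver to executor
through write/readFields. The generic round-trip sketch below uses only java.io and the
Writable interface (the helper class is illustrative, not Carbon API) and shows the symmetry
the two methods above must preserve:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    final class WritableRoundTrip {
      // Serialize 'source' and rehydrate it into 'target'; after this call the two
      // instances should be equivalent if write() and readFields() are symmetric.
      static <W extends Writable> W roundTrip(W source, W target) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        source.write(new DataOutputStream(buffer));
        target.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        return target;
      }
    }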
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 443922c..91e6e46 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -25,7 +25,6 @@ import java.util.Map;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
@@ -33,7 +32,6 @@ import org.apache.carbondata.scan.executor.QueryExecutor;
import org.apache.carbondata.scan.executor.QueryExecutorFactory;
import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.scan.model.QueryModel;
-import org.apache.carbondata.scan.result.BatchResult;
import org.apache.carbondata.scan.result.iterator.ChunkRowIterator;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -59,22 +57,28 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
}
- @Override public void initialize(InputSplit split, TaskAttemptContext context)
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
- CarbonInputSplit carbonInputSplit = (CarbonInputSplit) split;
- List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
- BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
- carbonInputSplit.getNumberOfBlocklets());
- tableBlockInfoList.add(
- new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
- carbonInputSplit.getSegmentId(), carbonInputSplit.getLocations(),
- carbonInputSplit.getLength(), blockletInfos));
+ // The input split can contain a single HDFS block or multiple blocks, so first collect all the
+ // blocks and then set them in the query model.
+ List<CarbonInputSplit> splitList;
+ if (inputSplit instanceof CarbonInputSplit) {
+ splitList = new ArrayList<>(1);
+ splitList.add((CarbonInputSplit) inputSplit);
+ } else if (inputSplit instanceof CarbonMultiBlockSplit) {
+ // contains multiple blocks; this is an optimization for concurrent queries.
+ CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit;
+ splitList = multiBlockSplit.getAllSplits();
+ } else {
+ throw new RuntimeException("unsupported input split type: " + inputSplit);
+ }
+ List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
queryModel.setTableBlockInfos(tableBlockInfoList);
- readSupport
- .intialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
+ readSupport.initialize(queryModel.getProjectionColumns(),
+ queryModel.getAbsoluteTableIdentifier());
try {
- carbonIterator =
- new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
+ carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
} catch (QueryExecutionException e) {
throw new InterruptedException(e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
index d3419c2..1b7f577 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
@@ -31,7 +31,7 @@ public interface CarbonReadSupport<T> {
*
* @param carbonColumns
*/
- public void intialize(CarbonColumn[] carbonColumns,
+ public void initialize(CarbonColumn[] carbonColumns,
AbsoluteTableIdentifier absoluteTableIdentifier);
public T readRow(Object[] data);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
index 6832789..fa8ba6e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
@@ -51,7 +51,7 @@ public abstract class AbstractDictionaryDecodedReadSupport<T> implements CarbonR
* @param carbonColumns
* @param absoluteTableIdentifier
*/
- @Override public void intialize(CarbonColumn[] carbonColumns,
+ @Override public void initialize(CarbonColumn[] carbonColumns,
AbsoluteTableIdentifier absoluteTableIdentifier) {
this.carbonColumns = carbonColumns;
dictionaries = new Dictionary[carbonColumns.length];
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
index dae83a1..4fbcb05 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.ArrayWritable;
public class ArrayWritableReadSupport implements CarbonReadSupport<ArrayWritable> {
- @Override public void intialize(CarbonColumn[] carbonColumns,
+ @Override public void initialize(CarbonColumn[] carbonColumns,
AbsoluteTableIdentifier absoluteTableIdentifier) {
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
index 578ab43..59b45ad 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
@@ -24,7 +24,8 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
public class RawDataReadSupport implements CarbonReadSupport<Object[]> {
- @Override public void intialize(CarbonColumn[] carbonColumns,
+ @Override
+ public void initialize(CarbonColumn[] carbonColumns,
AbsoluteTableIdentifier absoluteTableIdentifier) {
}
@@ -34,7 +35,8 @@ public class RawDataReadSupport implements CarbonReadSupport<Object[]> {
* @param data
* @return
*/
- @Override public Object[] readRow(Object[] data) {
+ @Override
+ public Object[] readRow(Object[] data) {
return data;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 0d94da4..75e004d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -19,12 +19,14 @@
package org.apache.carbondata.hadoop.util;
+import java.io.IOException;
import java.util.List;
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.hadoop.CarbonInputFormat;
import org.apache.carbondata.scan.expression.Expression;
import org.apache.carbondata.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.scan.filter.resolver.FilterResolverIntf;
@@ -33,6 +35,11 @@ import org.apache.carbondata.scan.model.QueryDimension;
import org.apache.carbondata.scan.model.QueryMeasure;
import org.apache.carbondata.scan.model.QueryModel;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+
/**
* Utility class
*/
@@ -81,6 +88,13 @@ public class CarbonInputFormatUtil {
return plan;
}
+ public static <V> CarbonInputFormat<V> createCarbonInputFormat(AbsoluteTableIdentifier identifier,
+ Job job) throws IOException {
+ CarbonInputFormat<V> carbonInputFormat = new CarbonInputFormat<>();
+ FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+ return carbonInputFormat;
+ }
+
private static void addQueryMeasure(CarbonQueryPlan plan, int order, CarbonMeasure measure) {
QueryMeasure queryMeasure = new QueryMeasure(measure.getColName());
queryMeasure.setQueryOrder(order);
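A sketch of the intended driver-side usage of this new helper; the value type Object[], the
bare Configuration, and the wrapper class name are assumptions, while getSplits is the standard
Hadoop call also used by CarbonMergerRDD and CarbonScanRDD later in this patch:

    import java.io.IOException;
    import java.util.List;

    import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
    import org.apache.carbondata.hadoop.CarbonInputFormat;
    import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;

    final class CreateInputFormatSketch {
      // Build the input format for a table and ask it for the splits to scan.
      static List<InputSplit> planSplits(AbsoluteTableIdentifier identifier)
          throws IOException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        CarbonInputFormat<Object[]> format =
            CarbonInputFormatUtil.createCarbonInputFormat(identifier, job);
        return format.getSplits(job);
      }
    }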
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
index 1c21a50..8693646 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
public class CarbonInputMapperTest extends TestCase {
@@ -61,8 +60,8 @@ public class CarbonInputMapperTest extends TestCase {
try {
String outPath = "target/output";
runJob(outPath, null, null);
- Assert.assertTrue("Count lines are not matching", countTheLines(outPath) == 1000);
- Assert.assertTrue("Column count are not matching", countTheColumns(outPath) == 7);
+ Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath));
+ Assert.assertEquals("Column count are not matching", 7, countTheColumns(outPath));
} catch (Exception e) {
Assert.assertTrue("failed", false);
e.printStackTrace();
@@ -79,8 +78,8 @@ public class CarbonInputMapperTest extends TestCase {
carbonProjection.addColumn("salary");
runJob(outPath, carbonProjection, null);
- Assert.assertTrue("Count lines are not matching", countTheLines(outPath) == 1000);
- Assert.assertTrue("Column count are not matching", countTheColumns(outPath) == 3);
+ Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath));
+ Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath));
} catch (Exception e) {
Assert.assertTrue("failed", false);
}
@@ -97,8 +96,8 @@ public class CarbonInputMapperTest extends TestCase {
new EqualToExpression(new ColumnExpression("country", DataType.STRING),
new LiteralExpression("france", DataType.STRING));
runJob(outPath, carbonProjection, expression);
- Assert.assertTrue("Count lines are not matching", countTheLines(outPath) == 101);
- Assert.assertTrue("Column count are not matching", countTheColumns(outPath) == 3);
+ Assert.assertEquals("Count lines are not matching", 101, countTheLines(outPath));
+ Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath));
} catch (Exception e) {
Assert.assertTrue("failed", false);
}
@@ -171,7 +170,7 @@ public class CarbonInputMapperTest extends TestCase {
job.setOutputFormatClass(TextOutputFormat.class);
AbsoluteTableIdentifier abs = StoreCreator.getAbsoluteTableIdentifier();
if (projection != null) {
- CarbonInputFormat.setColumnProjection(projection, job.getConfiguration());
+ CarbonInputFormat.setColumnProjection(job.getConfiguration(), projection);
}
if (filter != null) {
CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 3d670a2..f2a1f9f 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -50,9 +50,7 @@ import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
import org.apache.carbondata.core.carbon.ColumnIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
import org.apache.carbondata.core.carbon.datastore.block.Distributable;
-import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
@@ -236,9 +234,7 @@ public final class CarbonLoaderUtil {
final boolean isCompactionFlow) throws IOException {
CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
String metaDataLocation = carbonTable.getMetaDataFilepath();
- SegmentStatusManager segmentStatusManager =
- new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
- final LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metaDataLocation);
+ final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
CarbonTablePath carbonTablePath = CarbonStorePath
.getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
@@ -404,10 +400,7 @@ public final class CarbonLoaderUtil {
absoluteTableIdentifier.getCarbonTableIdentifier());
String tableStatusPath = carbonTablePath.getTableStatusFilePath();
-
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-
- ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+ ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
try {
if (carbonLock.lockWithRetries()) {
@@ -416,7 +409,7 @@ public final class CarbonLoaderUtil {
+ " for table status updation");
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
- segmentStatusManager.readLoadMetadata(metaDataFilepath);
+ SegmentStatusManager.readLoadMetadata(metaDataFilepath);
String loadEnddate = readCurrentTime();
loadMetadataDetails.setTimestamp(loadEnddate);
@@ -434,7 +427,7 @@ public final class CarbonLoaderUtil {
}
listOfLoadFolderDetails.add(loadMetadataDetails);
- segmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
+ SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
.toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
status = true;
@@ -579,9 +572,13 @@ public final class CarbonLoaderUtil {
List<NodeBlockRelation> flattenedList =
new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (Distributable blockInfo : blockInfos) {
- for (String eachNode : blockInfo.getLocations()) {
- NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
- flattenedList.add(nbr);
+ try {
+ for (String eachNode : blockInfo.getLocations()) {
+ NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+ flattenedList.add(nbr);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
}
}
// sort the flattened data.
@@ -646,7 +643,7 @@ public final class CarbonLoaderUtil {
/**
* Assigning the blocks of a node to tasks.
*
- * @param nodeBlocksMap
+ * @param nodeBlocksMap nodeName to list of blocks mapping
* @param noOfTasksPerNode
* @return
*/
@@ -912,10 +909,14 @@ public final class CarbonLoaderUtil {
// put the blocks in the set
uniqueBlocks.add(blockInfo);
- for (String eachNode : blockInfo.getLocations()) {
- NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
- flattenedList.add(nbr);
- nodeList.add(eachNode);
+ try {
+ for (String eachNode : blockInfo.getLocations()) {
+ NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+ flattenedList.add(nbr);
+ nodeList.add(eachNode);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
}
}
}
@@ -954,66 +955,6 @@ public final class CarbonLoaderUtil {
}
/**
- * method to distribute the blocklets of a block in multiple blocks
- * @param blockInfoList
- * @param defaultParallelism
- * @return
- */
- public static List<Distributable> distributeBlockLets(List<TableBlockInfo> blockInfoList,
- int defaultParallelism) {
- String blockletDistributionString = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_BLOCKLET_DISTRIBUTION,
- CarbonCommonConstants.ENABLE_BLOCKLET_DISTRIBUTION_DEFAULTVALUE);
- boolean isBlockletDistributionEnabled = Boolean.parseBoolean(blockletDistributionString);
- LOGGER.info("No.Of Blocks before Blocklet distribution: " + blockInfoList.size());
- List<Distributable> tableBlockInfos = new ArrayList<Distributable>();
- if (blockInfoList.size() < defaultParallelism && isBlockletDistributionEnabled) {
- for (TableBlockInfo tableBlockInfo : blockInfoList) {
- int noOfBlockLets = tableBlockInfo.getBlockletInfos().getNoOfBlockLets();
- LOGGER.info(
- "No.Of blocklet : " + noOfBlockLets + ".Minimum blocklets required for distribution : "
- + minBlockLetsReqForDistribution);
- if (noOfBlockLets < minBlockLetsReqForDistribution) {
- tableBlockInfos.add(tableBlockInfo);
- continue;
- }
- TableBlockInfo tableBlockInfo1 = null;
- int rem = noOfBlockLets % minBlockLetsReqForDistribution;
- int count = noOfBlockLets / minBlockLetsReqForDistribution;
- if (rem > 0) {
- count = count + 1;
- }
- for (int i = 0; i < count; i++) {
- BlockletInfos blockletInfos = new BlockletInfos();
- blockletInfos.setStartBlockletNumber(i * minBlockLetsReqForDistribution);
- blockletInfos.setNumberOfBlockletToScan(minBlockLetsReqForDistribution);
- blockletInfos.setNoOfBlockLets(blockletInfos.getNoOfBlockLets());
- tableBlockInfo1 =
- new TableBlockInfo(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset(),
- tableBlockInfo.getSegmentId(), tableBlockInfo.getLocations(),
- tableBlockInfo.getBlockLength(), blockletInfos);
- tableBlockInfos.add(tableBlockInfo1);
- }
- //if rem is greater than 0 then for the last block
- if (rem > 0) {
- tableBlockInfo1.getBlockletInfos().setNumberOfBlockletToScan(rem);
- }
- }
- }
- if (tableBlockInfos.size() == 0) {
- {
- for (TableBlockInfo tableBlockInfo : blockInfoList) {
- tableBlockInfos.add(tableBlockInfo);
- }
- LOGGER.info("No.Of Blocks after Blocklet distribution: " + tableBlockInfos.size());
- return tableBlockInfos;
- }
- }
- LOGGER.info("No.Of Blocks after Blocklet distribution: " + tableBlockInfos.size());
- return tableBlockInfos;
- }
-
- /**
* This will update the old table status details before clean files to the latest table status.
* @param oldList
* @param newList
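The pattern that replaces the old instance-based SegmentStatusManager throughout this patch
is: take the table status lock, re-read the load metadata, mutate it, and write it back. A
condensed sketch of that sequence follows; imports are omitted, the path arguments and method
name are placeholders, and the try/finally unlock mirrors what the calling code does:

    // imports omitted; the classes below are the same ones referenced elsewhere in this patch
    static boolean updateTableStatus(AbsoluteTableIdentifier identifier,
        String metaDataFilepath, String tableStatusPath) throws IOException {
      ICarbonLock lock = SegmentStatusManager.getTableStatusLock(identifier);
      try {
        if (!lock.lockWithRetries()) {
          return false;                       // could not acquire the table status lock
        }
        LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
        // ... mutate 'details' (add a new load, mark segments for delete, etc.) ...
        SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, details);
        return true;
      } finally {
        lock.unlock();
      }
    }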
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
index 6e8ab0c..9be4d47 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -133,12 +133,7 @@ public final class CarbonDataMergerUtil {
boolean tableStatusUpdationStatus = false;
AbsoluteTableIdentifier absoluteTableIdentifier =
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
- SegmentStatusManager segmentStatusManager =
- new SegmentStatusManager(absoluteTableIdentifier);
-
- ICarbonLock carbonLock =
- segmentStatusManager.getTableStatusLock();
+ ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
try {
if (carbonLock.lockWithRetries()) {
@@ -151,7 +146,7 @@ public final class CarbonDataMergerUtil {
String statusFilePath = carbonTablePath.getTableStatusFilePath();
- LoadMetadataDetails[] loadDetails = segmentStatusManager.readLoadMetadata(metaDataFilepath);
+ LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
String mergedLoadNumber = MergedLoadName.substring(
MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
@@ -195,7 +190,7 @@ public final class CarbonDataMergerUtil {
updatedDetailsList.add(loadMetadataDetails);
try {
- segmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
+ SegmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
tableStatusUpdationStatus = true;
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index af8d5e6..bb8fc5c 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -33,10 +33,10 @@ import org.apache.spark.unsafe.types.UTF8String;
public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> {
- @Override public void intialize(CarbonColumn[] carbonColumns,
+ @Override public void initialize(CarbonColumn[] carbonColumns,
AbsoluteTableIdentifier absoluteTableIdentifier) {
- super.intialize(carbonColumns, absoluteTableIdentifier);
- //can intialize and generate schema here.
+ super.initialize(carbonColumns, absoluteTableIdentifier);
+ //can initialize and generate schema here.
}
@Override public Row readRow(Object[] data) {
@@ -52,6 +52,9 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
case TIMESTAMP:
data[i] = new Timestamp((long) data[i] / 1000);
break;
+ case LONG:
+ data[i] = data[i];
+ break;
default:
}
} else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
index 6b56545..11cf9f8 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
@@ -44,9 +44,7 @@ public final class LoadMetadataUtil {
.getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
String metaDataLocation = table.getMetaDataFilepath();
- SegmentStatusManager segmentStatusManager =
- new SegmentStatusManager(table.getAbsoluteTableIdentifier());
- LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metaDataLocation);
+ LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
if (details != null && details.length != 0) {
for (LoadMetadataDetails oneRow : details) {
if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus())
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 4edfa6b..8445440 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -93,9 +93,8 @@ object CarbonDataRDDFactory {
// Delete the records based on data
val table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
.getCarbonTable(databaseName + "_" + tableName)
- val segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
val loadMetadataDetailsArray =
- segmentStatusManager.readLoadMetadata(table.getMetaDataFilepath()).toList
+ SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList
val resultMap = new CarbonDeleteLoadByDateRDD(
sc.sparkContext,
new DeletedLoadResultImpl(),
@@ -1055,10 +1054,7 @@ object CarbonDataRDDFactory {
def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit = {
val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
- val segmentStatusManager =
- new SegmentStatusManager(
- model.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier)
- val details = segmentStatusManager.readLoadMetadata(metadataPath)
+ val details = SegmentStatusManager.readLoadMetadata(metadataPath)
model.setLoadMetadataDetails(details.toList.asJava)
}
@@ -1070,9 +1066,7 @@ object CarbonDataRDDFactory {
if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
val loadMetadataFilePath = CarbonLoaderUtil
.extractLoadMetadataFileLocation(carbonLoadModel)
- val segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
- val details = segmentStatusManager
- .readLoadMetadata(loadMetadataFilePath)
+ val details = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
val carbonTableStatusLock = CarbonLockFactory
.getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
LockUsage.TABLE_STATUS_LOCK)
@@ -1089,17 +1083,16 @@ object CarbonDataRDDFactory {
LOGGER.info("Table status lock has been successfully acquired.")
// read latest table status again.
- val latestMetadata = segmentStatusManager.readLoadMetadata(loadMetadataFilePath)
+ val latestMetadata = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
// update the metadata details from old to new status.
val latestStatus = CarbonLoaderUtil
- .updateLoadMetadataFromOldToNew(details, latestMetadata)
+ .updateLoadMetadataFromOldToNew(details, latestMetadata)
CarbonLoaderUtil.writeLoadMetadata(
carbonLoadModel.getCarbonDataLoadSchema,
carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, latestStatus
- )
+ carbonLoadModel.getTableName, latestStatus)
} else {
val errorMsg = "Clean files request is failed for " +
s"${ carbonLoadModel.getDatabaseName }." +
@@ -1109,7 +1102,6 @@ object CarbonDataRDDFactory {
LOGGER.audit(errorMsg)
LOGGER.error(errorMsg)
throw new Exception(errorMsg + " Please try after some time.")
-
}
} finally {
CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 6c2e993..4e820c6 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -21,8 +21,11 @@ import java.util
import java.util.{Collections, List}
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.util.Random
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._
import org.apache.spark.rdd.RDD
@@ -32,22 +35,19 @@ import org.apache.spark.sql.hive.DistributionUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties,
-TableBlockInfo, TableTaskInfo, TaskBlockInfo}
+import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TableBlockInfo, TaskBlockInfo}
import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
-import org.apache.carbondata.core.carbon.path.CarbonTablePath
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor,
-CarbonCompactionUtil, RowResultMerger}
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
import org.apache.carbondata.scan.result.iterator.RawResultIterator
import org.apache.carbondata.spark.MergeResult
import org.apache.carbondata.spark.load.CarbonLoaderUtil
import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.QueryPlanUtil
class CarbonMergerRDD[K, V](
@@ -103,17 +103,17 @@ class CarbonMergerRDD[K, V](
var mergeNumber = ""
var exec: CarbonCompactionExecutor = null
try {
- var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
// get destination segment properties as sent from driver which is of last segment.
- val segmentProperties = new SegmentProperties(carbonMergerMapping.maxSegmentColumnSchemaList
- .asJava,
+ val segmentProperties = new SegmentProperties(
+ carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
carbonMergerMapping.maxSegmentColCardinality)
// sorting the table block info List.
- val tableBlockInfoList = carbonSparkPartition.tableBlockInfos
+ val splitList = carbonSparkPartition.split.value.getAllSplits
+ val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
Collections.sort(tableBlockInfoList)
@@ -214,86 +214,46 @@ class CarbonMergerRDD[K, V](
override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[CarbonSparkPartition]
- theSplit.locations.filter(_ != "localhost")
+ theSplit.split.value.getLocations.filter(_ != "localhost")
}
override def getPartitions: Array[Partition] = {
-
val startTime = System.currentTimeMillis()
val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
storePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
)
- val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
- QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
+ val jobConf: JobConf = new JobConf(new Configuration)
+ val job: Job = new Job(jobConf)
+ val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
var defaultParallelism = sparkContext.defaultParallelism
val result = new util.ArrayList[Partition](defaultParallelism)
// mapping of the node and block list.
- var nodeMapping: util.Map[String, util.List[Distributable]] = new
+ var nodeBlockMapping: util.Map[String, util.List[Distributable]] = new
util.HashMap[String, util.List[Distributable]]
- var noOfBlocks = 0
-
- val taskInfoList = new util.ArrayList[Distributable]
-
- var blocksOfLastSegment: List[TableBlockInfo] = null
+ val noOfBlocks = 0
+ var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
// for each valid segment.
for (eachSeg <- carbonMergerMapping.validSegments) {
// map for keeping the relation of a task and its blocks.
- val taskIdMapping: util.Map[String, util.List[TableBlockInfo]] = new
- util.HashMap[String, util.List[TableBlockInfo]]
-
job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
// get splits
- val splits = carbonInputFormat.getSplits(job)
- val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-
- // take the blocks of one segment.
- val blocksOfOneSegment = carbonInputSplits.map(inputSplit =>
- new TableBlockInfo(inputSplit.getPath.toString,
- inputSplit.getStart, inputSplit.getSegmentId,
- inputSplit.getLocations, inputSplit.getLength
- )
- )
-
- // keep on assigning till last one is reached.
- if (null != blocksOfOneSegment && blocksOfOneSegment.nonEmpty) {
- blocksOfLastSegment = blocksOfOneSegment.asJava
- }
-
- // populate the task and its block mapping.
- blocksOfOneSegment.foreach(tableBlockInfo => {
- val taskNo = CarbonTablePath.DataFileUtil.getTaskNo(tableBlockInfo.getFilePath)
- val blockList = taskIdMapping.get(taskNo)
- if (null == blockList) {
- val blockListTemp = new util.ArrayList[TableBlockInfo]()
- blockListTemp.add(tableBlockInfo)
- taskIdMapping.put(taskNo, blockListTemp)
- } else {
- blockList.add(tableBlockInfo)
- }
- }
- )
-
- noOfBlocks += blocksOfOneSegment.size
- taskIdMapping.asScala.foreach(
- entry =>
- taskInfoList.add(new TableTaskInfo(entry._1, entry._2).asInstanceOf[Distributable])
- )
+ val splits = format.getSplits(job)
+ carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
}
// prepare the details required to extract the segment properties using last segment.
- if (null != blocksOfLastSegment && blocksOfLastSegment.size > 0) {
- val lastBlockInfo = blocksOfLastSegment.get(blocksOfLastSegment.size - 1)
-
+ if (null != carbonInputSplits && carbonInputSplits.nonEmpty) {
+ val carbonInputSplit = carbonInputSplits.last
var dataFileFooter: DataFileFooter = null
try {
- dataFileFooter = CarbonUtil.readMetadatFile(lastBlockInfo.getFilePath,
- lastBlockInfo.getBlockOffset, lastBlockInfo.getBlockLength)
+ dataFileFooter = CarbonUtil.readMetadatFile(carbonInputSplit.getPath.toString(),
+ carbonInputSplit.getStart, carbonInputSplit.getLength)
} catch {
case e: CarbonUtilException =>
logError("Exception in preparing the data file footer for compaction " + e.getMessage)
@@ -306,16 +266,17 @@ class CarbonMergerRDD[K, V](
.toList
}
// send complete list of blocks to the mapping util.
- nodeMapping = CarbonLoaderUtil.nodeBlockMapping(taskInfoList, -1)
+ nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(
+ carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava, -1)
val confExecutors = confExecutorsTemp.toInt
- val requiredExecutors = if (nodeMapping.size > confExecutors) {
+ val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
confExecutors
- } else { nodeMapping.size() }
+ } else { nodeBlockMapping.size() }
CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
- logInfo("No.of Executors required=" + requiredExecutors
- + " , spark.executor.instances=" + confExecutors
- + ", no.of.nodes where data present=" + nodeMapping.size())
+ logInfo("No.of Executors required=" + requiredExecutors +
+ " , spark.executor.instances=" + confExecutors +
+ ", no.of.nodes where data present=" + nodeBlockMapping.size())
var nodes = DistributionUtil.getNodeList(sparkContext)
var maxTimes = 30
while (nodes.length < requiredExecutors && maxTimes > 0) {
@@ -327,24 +288,23 @@ class CarbonMergerRDD[K, V](
defaultParallelism = sparkContext.defaultParallelism
var i = 0
- val nodeTaskBlocksMap: util.Map[String, util.List[NodeInfo]] = new util.HashMap[String, util
- .List[NodeInfo]]()
+ val nodeTaskBlocksMap = new util.HashMap[String, util.List[NodeInfo]]()
// Create Spark Partition for each task and assign blocks
- nodeMapping.asScala.foreach { entry =>
-
- val taskBlockList: List[NodeInfo] = new util.ArrayList[NodeInfo](0)
- nodeTaskBlocksMap.put(entry._1, taskBlockList)
-
- val list = new util.ArrayList[TableBlockInfo]
- entry._2.asScala.foreach(taskInfo => {
- val blocksPerNode = taskInfo.asInstanceOf[TableTaskInfo]
- list.addAll(blocksPerNode.getTableBlockInfoList)
- taskBlockList
- .add(new NodeInfo(blocksPerNode.getTaskId, blocksPerNode.getTableBlockInfoList.size))
- })
- if (list.size() != 0) {
- result.add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, list))
+ nodeBlockMapping.asScala.foreach { case (nodeName, blockList) =>
+ val taskBlockList = new util.ArrayList[NodeInfo](0)
+ nodeTaskBlocksMap.put(nodeName, taskBlockList)
+ var blockletCount = 0
+ blockList.asScala.foreach { taskInfo =>
+ val blocksPerNode = taskInfo.asInstanceOf[CarbonInputSplit]
+ blockletCount = blockletCount + blocksPerNode.getNumberOfBlocklets
+ taskBlockList.add(
+ NodeInfo(blocksPerNode.taskId, blocksPerNode.getNumberOfBlocklets))
+ }
+ if (blockletCount != 0) {
+ val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
+ carbonInputSplits.asJava, nodeName)
+ result.add(new CarbonSparkPartition(id, i, multiBlockSplit))
i += 1
}
}
@@ -360,17 +320,14 @@ class CarbonMergerRDD[K, V](
val noOfNodes = nodes.length
val noOfTasks = result.size
- logInfo(s"Identified no.of.Blocks: $noOfBlocks,"
- + s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
- )
- logInfo("Time taken to identify Blocks to scan: " + (System
- .currentTimeMillis() - startTime)
- )
- for (j <- 0 until result.size) {
- val cp = result.get(j).asInstanceOf[CarbonSparkPartition]
- logInfo(s"Node: " + cp.locations.toSeq.mkString(",")
- + ", No.Of Blocks: " + cp.tableBlockInfos.size
- )
+ logInfo(s"Identified no.of.Blocks: $noOfBlocks," +
+ s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks")
+ logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
+ for (j <- 0 until result.size ) {
+ val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
+ val splitList = multiBlockSplit.getAllSplits
+ logInfo(s"Node: ${multiBlockSplit.getLocations.mkString(",")}, No.Of Blocks: " +
+ s"${CarbonInputSplit.createBlocks(splitList).size}")
}
result.toArray(new Array[Partition](result.size))
}
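In essence, getPartitions above collects all CarbonInputSplits and then builds one
CarbonMultiBlockSplit per node. The simplified, locality-only sketch below shows that grouping;
the real CarbonLoaderUtil.nodeBlockMapping also balances block counts across nodes, which this
illustrative helper does not attempt:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
    import org.apache.carbondata.hadoop.CarbonInputSplit;
    import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;

    final class GroupSplitsByNode {
      // Assign each split to its first reported location, then wrap each node's
      // splits into a single CarbonMultiBlockSplit.
      static List<CarbonMultiBlockSplit> group(AbsoluteTableIdentifier identifier,
          List<CarbonInputSplit> splits) throws IOException {
        Map<String, List<CarbonInputSplit>> byNode = new HashMap<>();
        for (CarbonInputSplit split : splits) {
          String[] locations = split.getLocations();
          String node = locations.length > 0 ? locations[0] : "localhost";
          if (!byNode.containsKey(node)) {
            byNode.put(node, new ArrayList<CarbonInputSplit>());
          }
          byNode.get(node).add(split);
        }
        List<CarbonMultiBlockSplit> result = new ArrayList<>();
        for (Map.Entry<String, List<CarbonInputSplit>> entry : byNode.entrySet()) {
          result.add(new CarbonMultiBlockSplit(identifier, entry.getValue(), entry.getKey()));
        }
        return result;
      }
    }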
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 7798e5c..e8d7399 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -17,49 +17,43 @@
package org.apache.carbondata.spark.rdd
+import java.text.SimpleDateFormat
import java.util
+import java.util.Date
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobID}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, TaskContext, TaskKilledException}
+import org.apache.spark.mapred.CarbonHadoopMapReduceUtil
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.carbon.datastore.block.{BlockletInfos, TableBlockInfo}
-import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.carbon.datastore.block.Distributable
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
-import org.apache.carbondata.lcm.status.SegmentStatusManager
-import org.apache.carbondata.scan.executor.QueryExecutor
-import org.apache.carbondata.scan.executor.QueryExecutorFactory
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport
import org.apache.carbondata.scan.expression.Expression
-import org.apache.carbondata.scan.model.QueryModel
-import org.apache.carbondata.scan.result.BatchResult
-import org.apache.carbondata.scan.result.iterator.ChunkRowIterator
-import org.apache.carbondata.spark.RawValue
import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.util.QueryPlanUtil
-
-class CarbonSparkPartition(rddId: Int, val idx: Int,
- val locations: Array[String],
- val tableBlockInfos: util.List[TableBlockInfo])
+class CarbonSparkPartition(
+ val rddId: Int,
+ val idx: Int,
+ @transient val multiBlockSplit: CarbonMultiBlockSplit)
extends Partition {
+ val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
+
override val index: Int = idx
- // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations)
- override def hashCode(): Int = {
- 41 * (41 + rddId) + idx
- }
+ override def hashCode(): Int = 41 * (41 + rddId) + idx
}
/**
@@ -68,169 +62,135 @@ class CarbonSparkPartition(rddId: Int, val idx: Int,
* level filtering in driver side.
*/
class CarbonScanRDD[V: ClassTag](
- sc: SparkContext,
- queryModel: QueryModel,
+ @transient sc: SparkContext,
+ columnProjection: Seq[Attribute],
filterExpression: Expression,
- keyClass: RawValue[V],
- @transient conf: Configuration,
- tableCreationTime: Long,
- schemaLastUpdatedTime: Long,
- baseStoreLocation: String)
- extends RDD[V](sc, Nil) {
+ identifier: AbsoluteTableIdentifier,
+ @transient carbonTable: CarbonTable)
+ extends RDD[V](sc, Nil)
+ with CarbonHadoopMapReduceUtil
+ with Logging {
+
+ private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
+ private val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ formatter.format(new Date())
+ }
+ @transient private val jobId = new JobID(jobTrackerId, id)
+ @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
override def getPartitions: Array[Partition] = {
- var defaultParallelism = sparkContext.defaultParallelism
- val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
- val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
- QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
+ val job = Job.getInstance(new Configuration())
+ val format = prepareInputFormatForDriver(job.getConfiguration)
// initialise query_id for job
- job.getConfiguration.set("query.id", queryModel.getQueryId)
-
- val result = new util.ArrayList[Partition](defaultParallelism)
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val validAndInvalidSegments = new SegmentStatusManager(queryModel.getAbsoluteTableIdentifier)
- .getValidAndInvalidSegments
- // set filter resolver tree
- try {
- // before applying filter check whether segments are available in the table.
- if (!validAndInvalidSegments.getValidSegments.isEmpty) {
- val filterResolver = carbonInputFormat
- .getResolvedFilter(job.getConfiguration, filterExpression)
- CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
- queryModel.setFilterExpressionResolverTree(filterResolver)
- CarbonInputFormat
- .setSegmentsToAccess(job.getConfiguration,
- validAndInvalidSegments.getValidSegments
- )
- SegmentTaskIndexStore.getInstance()
- .removeTableBlocks(validAndInvalidSegments.getInvalidSegments,
- queryModel.getAbsoluteTableIdentifier
- )
- }
- } catch {
- case e: Exception =>
- LOGGER.error(e)
- sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
- }
+ job.getConfiguration.set("query.id", queryId)
+
// get splits
- val splits = carbonInputFormat.getSplits(job)
+ val splits = format.getSplits(job)
+ val result = distributeSplits(splits)
+ result
+ }
+
+ private def distributeSplits(splits: util.List[InputSplit]): Array[Partition] = {
+ // this function distributes the splits based on the following logic:
+ // 1. based on data locality, to keep the splits balanced across all available nodes
+ // 2. if the number of split for one
+
+ var statistic = new QueryStatistic()
+ val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+ val parallelism = sparkContext.defaultParallelism
+ val result = new util.ArrayList[Partition](parallelism)
+ var noOfBlocks = 0
+ var noOfNodes = 0
+ var noOfTasks = 0
+
if (!splits.isEmpty) {
- val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
- queryModel.setInvalidSegmentIds(validAndInvalidSegments.getInvalidSegments)
- val blockListTemp = carbonInputSplits.map(inputSplit =>
- new TableBlockInfo(inputSplit.getPath.toString,
- inputSplit.getStart, inputSplit.getSegmentId,
- inputSplit.getLocations, inputSplit.getLength,
- new BlockletInfos(inputSplit.getNumberOfBlocklets, 0, inputSplit.getNumberOfBlocklets)
- )
- )
- var activeNodes = Array[String]()
- if (blockListTemp.nonEmpty) {
- activeNodes = DistributionUtil
- .ensureExecutorsAndGetNodeList(blockListTemp.toArray, sparkContext)
- }
- defaultParallelism = sparkContext.defaultParallelism
- val blockList = CarbonLoaderUtil.
- distributeBlockLets(blockListTemp.asJava, defaultParallelism).asScala
-
- if (blockList.nonEmpty) {
- var statistic = new QueryStatistic()
- // group blocks to nodes, tasks
- val nodeBlockMapping =
- CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
- activeNodes.toList.asJava
- )
- statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
- statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId())
- statistic = new QueryStatistic()
- var i = 0
- // Create Spark Partition for each task and assign blocks
- nodeBlockMapping.asScala.foreach { entry =>
- entry._2.asScala.foreach { blocksPerTask => {
- val tableBlockInfo = blocksPerTask.asScala.map(_.asInstanceOf[TableBlockInfo])
- if (blocksPerTask.size() != 0) {
- result
- .add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, tableBlockInfo.asJava))
- i += 1
- }
+ // create a list of block based on split
+ val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
+
+ // get the list of executors and map blocks to executors based on locality
+ val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
+
+ // divide the blocks among the tasks of the nodes as per the data locality
+ val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
+ parallelism, activeNodes.toList.asJava)
+
+ statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
+ statisticRecorder.recordStatisticsForDriver(statistic, queryId)
+ statistic = new QueryStatistic()
+
+ var i = 0
+ // Create Spark Partition for each task and assign blocks
+ nodeBlockMapping.asScala.foreach { case (node, blockList) =>
+ blockList.asScala.foreach { blocksPerTask =>
+ val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
+ if (blocksPerTask.size() != 0) {
+ val multiBlockSplit = new CarbonMultiBlockSplit(identifier, splits.asJava, node)
+ val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
+ result.add(partition)
+ i += 1
}
- }
- }
- val noOfBlocks = blockList.size
- val noOfNodes = nodeBlockMapping.size
- val noOfTasks = result.size()
- logInfo(s"Identified no.of.Blocks: $noOfBlocks,"
- + s"parallelism: $defaultParallelism , " +
- s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
- )
- statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
- System.currentTimeMillis)
- statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId())
- statisticRecorder.logStatisticsAsTableDriver()
- result.asScala.foreach { r =>
- val cp = r.asInstanceOf[CarbonSparkPartition]
- logInfo(s"Node: ${ cp.locations.toSeq.mkString(",") }" +
- s", No.Of Blocks: ${ cp.tableBlockInfos.size() }"
- )
}
- } else {
- logInfo("No blocks identified to scan")
}
- } else {
- logInfo("No valid segments found to scan")
+
+ noOfBlocks = splits.size
+ noOfNodes = nodeBlockMapping.size
+ noOfTasks = result.size()
+
+ statistic = new QueryStatistic()
+ statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
+ System.currentTimeMillis)
+ statisticRecorder.recordStatisticsForDriver(statistic, queryId)
+ statisticRecorder.logStatisticsAsTableDriver()
}
+ logInfo(
+ s"""
+ | Identified no.of.blocks: $noOfBlocks,
+ | no.of.tasks: $noOfTasks,
+ | no.of.nodes: $noOfNodes,
+ | parallelism: $parallelism
+ """.stripMargin)
result.toArray(new Array[Partition](result.size()))
}
- override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val iter = new Iterator[V] {
- var rowIterator: CarbonIterator[Array[Any]] = _
- var queryStartTime: Long = 0
- val queryExecutor = QueryExecutorFactory.getQueryExecutor()
- try {
- context.addTaskCompletionListener(context => {
- clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
- logStatistics()
- queryExecutor.finish
- })
- val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
- if (!carbonSparkPartition.tableBlockInfos.isEmpty) {
- queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
- // fill table block info
- queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
- queryStartTime = System.currentTimeMillis
- val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
- logInfo("*************************" + carbonPropertiesFilePath)
- if (null == carbonPropertiesFilePath) {
- System.setProperty("carbon.properties.filepath",
- System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
- }
- // execute query
- rowIterator = new ChunkRowIterator(
- queryExecutor.execute(queryModel).
- asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
+ override def compute(split: Partition, context: TaskContext): Iterator[V] = {
+ val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+ if (null == carbonPropertiesFilePath) {
+ System.setProperty("carbon.properties.filepath",
+ System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
+ )
+ }
- }
- } catch {
- case e: Exception =>
- LOGGER.error(e)
- if (null != e.getMessage) {
- sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
- } else {
- sys.error("Exception occurred in query execution.Please check logs.")
- }
- }
+ val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+ val attemptContext = newTaskAttemptContext(new Configuration(), attemptId)
+ val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
+ val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+ val reader = format.createRecordReader(inputSplit, attemptContext)
+ reader.initialize(inputSplit, attemptContext)
+
+ val queryStartTime = System.currentTimeMillis
- var havePair = false
- var finished = false
- var recordCount = 0
+ new Iterator[V] {
+ private var havePair = false
+ private var finished = false
+ private var count = 0
+
+ context.addTaskCompletionListener { context =>
+ logStatistics(queryStartTime, count)
+ reader.close()
+ }
override def hasNext: Boolean = {
+ if (context.isInterrupted) {
+ throw new TaskKilledException
+ }
if (!finished && !havePair) {
- finished = (null == rowIterator) || (!rowIterator.hasNext)
+ finished = !reader.nextKeyValue
+ if (finished) {
+ reader.close()
+ }
havePair = !finished
}
!finished
@@ -241,68 +201,55 @@ class CarbonScanRDD[V: ClassTag](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
- recordCount += 1
- keyClass.getValue(rowIterator.next())
+ val value: V = reader.getCurrentValue
+ count += 1
+ value
}
+ }
+ }
- def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = {
- if (null != columnToDictionaryMap) {
- org.apache.carbondata.spark.util.CarbonQueryUtil
- .clearColumnDictionaryCache(columnToDictionaryMap)
- }
- }
+ private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[V] = {
+ CarbonInputFormat.setCarbonTable(conf, carbonTable)
+ createInputFormat(conf)
+ }
- def logStatistics(): Unit = {
- if (null != queryModel.getStatisticsRecorder) {
- var queryStatistic = new QueryStatistic()
- queryStatistic
- .addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
- System.currentTimeMillis - queryStartTime
- )
- queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
- // result size
- queryStatistic = new QueryStatistic()
- queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
- queryModel.getStatisticsRecorder.recordStatistics(queryStatistic)
- // print executor query statistics for each task_id
- queryModel.getStatisticsRecorder.logStatisticsAsTableExecutor()
- }
- }
+ private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = {
+ CarbonInputFormat.setCarbonReadSupport(classOf[RawDataReadSupport], conf)
+ createInputFormat(conf)
+ }
+
+ private def createInputFormat(conf: Configuration): CarbonInputFormat[V] = {
+ val format = new CarbonInputFormat[V]
+ CarbonInputFormat.setTablePath(conf, identifier.getTablePath)
+ CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+ val projection = new CarbonProjection
+ columnProjection.foreach { attr =>
+ projection.addColumn(attr.name)
}
+ CarbonInputFormat.setColumnProjection(conf, projection)
+ format
+ }
- iter
+ def logStatistics(queryStartTime: Long, recordCount: Int): Unit = {
+ var queryStatistic = new QueryStatistic()
+ queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+ System.currentTimeMillis - queryStartTime)
+ val statisticRecorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId)
+ statisticRecorder.recordStatistics(queryStatistic)
+ // result size
+ queryStatistic = new QueryStatistic()
+ queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
+ statisticRecorder.recordStatistics(queryStatistic)
+ // print executor query statistics for each task_id
+ statisticRecorder.logStatisticsAsTableExecutor()
}
/**
* Get the preferred locations where to launch this task.
*/
- override def getPreferredLocations(partition: Partition): Seq[String] = {
- val theSplit = partition.asInstanceOf[CarbonSparkPartition]
- val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
- val tableBlocks = theSplit.tableBlockInfos
- // node name and count mapping
- val blockMap = new util.LinkedHashMap[String, Integer]()
-
- tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
- location => {
- if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
- val currentCount = blockMap.get(location)
- if (currentCount == null) {
- blockMap.put(location, 1)
- } else {
- blockMap.put(location, currentCount + 1)
- }
- }
- }
- )
- )
-
- val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
- nodeCount1.getValue > nodeCount2.getValue
- }
- )
-
- val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
- firstOptionLocation ++ sortedNodesList
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val theSplit = split.asInstanceOf[CarbonSparkPartition]
+ val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost")
+ firstOptionLocation
}
}
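
The rewritten compute() above wraps a Hadoop RecordReader in a Scala Iterator. As a minimal, self-contained sketch of that bridge (any org.apache.hadoop.mapreduce.RecordReader works here; this is illustrative only, not CarbonData's actual reader class):

import org.apache.hadoop.mapreduce.RecordReader

// Sketch of the pull-based bridge used in compute(): nextKeyValue() advances the
// reader once, and havePair remembers whether the advanced value is still unconsumed.
class RecordReaderIterator[K, V](reader: RecordReader[K, V]) extends Iterator[V] {
  private var havePair = false
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()
      if (finished) reader.close()   // close eagerly once the stream is exhausted
      havePair = !finished
    }
    !finished
  }

  override def next(): V = {
    if (!hasNext) throw new java.util.NoSuchElementException("End of stream")
    havePair = false                 // consume the buffered advance
    reader.getCurrentValue
  }
}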
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 9c9be8d..5fdbc5d 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -73,17 +73,8 @@ object Compactor {
maxSegmentColumnSchemaList = null
)
carbonLoadModel.setStorePath(carbonMergerMapping.storePath)
- val segmentStatusManager = new SegmentStatusManager(new AbsoluteTableIdentifier
- (CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId
- )
- )
- )
- carbonLoadModel.setLoadMetadataDetails(segmentStatusManager
- .readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava
- )
+ carbonLoadModel.setLoadMetadataDetails(
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
var execInstance = "1"
// in case of non-dynamic executor allocation, the number of executors is fixed.
if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
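
With the Compactor change above, callers read the table status metadata through the static API instead of constructing a SegmentStatusManager. A short usage sketch, assuming carbonTable and carbonLoadModel are in scope as in the surrounding Compactor code:

import scala.collection.JavaConverters._
import org.apache.carbondata.lcm.status.SegmentStatusManager

// Read load metadata once from the table's metadata path and hand it to the load model.
val loadDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
carbonLoadModel.setLoadMetadataDetails(loadDetails.toList.asJava)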
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
deleted file mode 100644
index c55c807..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.apache.carbondata.hadoop.CarbonInputFormat
-
-
-/**
- * All the utility functions for carbon plan creation
- */
-object QueryPlanUtil {
-
- /**
- * createCarbonInputFormat from query model
- */
- def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) :
- (CarbonInputFormat[Array[Object]], Job) = {
- val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
- val jobConf: JobConf = new JobConf(new Configuration)
- val job: Job = new Job(jobConf)
- FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
- (carbonInputFormat, job)
- }
-
- def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
- conf: Configuration) : CarbonInputFormat[V] = {
- val carbonInputFormat = new CarbonInputFormat[V]()
- val job: Job = new Job(conf)
- FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
- carbonInputFormat
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index c9d2a0f..ca0ad58 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -38,14 +38,13 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.carbon.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.util.SchemaReader
+import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
import org.apache.carbondata.scan.expression.logical.AndExpression
import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
-import org.apache.carbondata.spark.util.QueryPlanUtil
private[sql] case class CarbonDatasourceHadoopRelation(
@@ -104,7 +103,7 @@ private[sql] case class CarbonDatasourceHadoopRelation(
val projection = new CarbonProjection
requiredColumns.foreach(projection.addColumn)
- CarbonInputFormat.setColumnProjection(projection, conf)
+ CarbonInputFormat.setColumnProjection(conf, projection)
CarbonInputFormat.setCarbonReadSupport(classOf[SparkRowReadSupportImpl], conf)
new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
@@ -145,12 +144,11 @@ class CarbonHadoopFSRDD[V: ClassTag](
context: TaskContext): Iterator[V] = {
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)
- val inputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
- hadoopAttemptContext.getConfiguration
- )
+ val job: Job = new Job(hadoopAttemptContext.getConfiguration)
+ val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, job)
hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
val reader =
- inputFormat.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
+ format.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
hadoopAttemptContext
)
reader.initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
@@ -186,11 +184,9 @@ class CarbonHadoopFSRDD[V: ClassTag](
override protected def getPartitions: Array[Partition] = {
val jobContext = newJobContext(conf.value, jobId)
- val carbonInputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
- jobContext.getConfiguration
- )
+ val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, new Job(conf.value))
jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath)
- val splits = carbonInputFormat.getSplits(jobContext).toArray
+ val splits = format.getSplits(jobContext).toArray
val carbonInputSplits = splits
.map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit]))
carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 069e106..a06d5cb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -272,9 +272,8 @@ case class CarbonRelation(
private var sizeInBytesLocalValue = 0L
def sizeInBytes: Long = {
- val tableStatusNewLastUpdatedTime = new SegmentStatusManager(
+ val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
tableMeta.carbonTable.getAbsoluteTableIdentifier)
- .getTableStatusLastModifiedTime
if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
val tablePath = CarbonStorePath.getCarbonTablePath(
tableMeta.storePath,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
deleted file mode 100644
index c105cae..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.ArrayList
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.LeafNode
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.scan.model._
-import org.apache.carbondata.spark.{CarbonFilters, RawValue, RawValueImpl}
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
-
-case class CarbonScan(
- var attributesRaw: Seq[Attribute],
- relationRaw: CarbonRelation,
- dimensionPredicatesRaw: Seq[Expression],
- useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
- val carbonTable = relationRaw.metaData.carbonTable
- val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
- val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
- @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
-
- val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
- val unprocessedExprs = new ArrayBuffer[Expression]()
-
- val buildCarbonPlan: CarbonQueryPlan = {
- val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.databaseName, relationRaw.tableName)
-
- plan.setSortedDimemsions(new ArrayList[QueryDimension])
-
- plan.setOutLocationPath(
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
- plan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
- processFilterExpressions(plan)
- plan
- }
-
- def processFilterExpressions(plan: CarbonQueryPlan) {
- if (dimensionPredicatesRaw.nonEmpty) {
- val expressionVal = CarbonFilters.processExpression(
- dimensionPredicatesRaw,
- attributesNeedToDecode,
- unprocessedExprs,
- carbonTable)
- expressionVal match {
- case Some(ce) =>
- // adding dimension used in expression in querystats
- plan.setFilterExpression(ce)
- case _ =>
- }
- }
- processExtraAttributes(plan)
- }
-
- private def processExtraAttributes(plan: CarbonQueryPlan) {
- if (attributesNeedToDecode.size() > 0) {
- val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
-
- attributesNeedToDecode.asScala.foreach { attr =>
- if (!attributesRaw.exists(_.name.equalsIgnoreCase(attr.name))) {
- attributeOut += attr
- }
- }
- attributesRaw = attributeOut
- }
-
- val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
- val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
- val dimAttr = new Array[Attribute](dimensions.size())
- val msrAttr = new Array[Attribute](measures.size())
- attributesRaw.foreach { attr =>
- val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
- if(carbonDimension != null) {
- dimAttr(dimensions.indexOf(carbonDimension)) = attr
- } else {
- val carbonMeasure =
- carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
- if(carbonMeasure != null) {
- msrAttr(measures.indexOf(carbonMeasure)) = attr
- }
- }
- }
-
- attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
-
- var queryOrder: Integer = 0
- attributesRaw.foreach { attr =>
- val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
- if (carbonDimension != null) {
- val dim = new QueryDimension(attr.name)
- dim.setQueryOrder(queryOrder)
- queryOrder = queryOrder + 1
- selectedDims += dim
- } else {
- val carbonMeasure =
- carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
- if (carbonMeasure != null) {
- val m1 = new QueryMeasure(attr.name)
- m1.setQueryOrder(queryOrder)
- queryOrder = queryOrder + 1
- selectedMsrs += m1
- }
- }
- }
-
- // Fill the selected dimensions & measures obtained from
- // attributes to query plan for detailed query
- selectedDims.foreach(plan.addDimension)
- selectedMsrs.foreach(plan.addMeasure)
- }
-
-
- def inputRdd: CarbonScanRDD[Array[Any]] = {
-
- val conf = new Configuration()
- val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val model = QueryModel.createModel(
- absoluteTableIdentifier, buildCarbonPlan, carbonTable)
- val kv: RawValue[Array[Any]] = new RawValueImpl
- // setting queryid
- buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
-
- val tableCreationTime = carbonCatalog
- .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
- val schemaLastUpdatedTime = carbonCatalog
- .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
- val big = new CarbonScanRDD(
- ocRaw.sparkContext,
- model,
- buildCarbonPlan.getFilterExpression,
- kv,
- conf,
- tableCreationTime,
- schemaLastUpdatedTime,
- carbonCatalog.storePath)
- big
- }
-
-
- override def outputsUnsafeRows: Boolean =
- (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
-
- override def doExecute(): RDD[InternalRow] = {
- val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
- inputRdd.mapPartitions { iter =>
- val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
- new Iterator[InternalRow] {
- override def hasNext: Boolean = iter.hasNext
-
- override def next(): InternalRow =
- if (outUnsafeRows) {
- unsafeProjection(new GenericMutableRow(iter.next()))
- } else {
- new GenericMutableRow(iter.next())
- }
- }
- }
- }
-
- def output: Seq[Attribute] = {
- attributesRaw
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
new file mode 100644
index 0000000..6580c4f
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.ArrayList
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.LeafNode
+import org.apache.spark.sql.hive.CarbonMetastoreCatalog
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.scan.model._
+import org.apache.carbondata.spark.CarbonFilters
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+case class CarbonScan(
+ var attributesRaw: Seq[Attribute],
+ relationRaw: CarbonRelation,
+ dimensionPredicatesRaw: Seq[Expression],
+ useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
+ val carbonTable = relationRaw.metaData.carbonTable
+ val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
+ val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
+ @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
+
+ val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
+ val unprocessedExprs = new ArrayBuffer[Expression]()
+
+ val buildCarbonPlan: CarbonQueryPlan = {
+ val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.databaseName, relationRaw.tableName)
+
+ plan.setSortedDimemsions(new ArrayList[QueryDimension])
+
+ plan.setOutLocationPath(
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
+ plan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
+ processFilterExpressions(plan)
+ plan
+ }
+
+ def processFilterExpressions(plan: CarbonQueryPlan) {
+ if (dimensionPredicatesRaw.nonEmpty) {
+ val expressionVal = CarbonFilters.processExpression(
+ dimensionPredicatesRaw,
+ attributesNeedToDecode,
+ unprocessedExprs,
+ carbonTable)
+ expressionVal match {
+ case Some(ce) =>
+ // adding dimension used in expression in querystats
+ plan.setFilterExpression(ce)
+ case _ =>
+ }
+ }
+ processExtraAttributes(plan)
+ }
+
+ private def processExtraAttributes(plan: CarbonQueryPlan) {
+ if (attributesNeedToDecode.size() > 0) {
+ val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
+
+ attributesNeedToDecode.asScala.foreach { attr =>
+ if (!attributesRaw.exists(_.name.equalsIgnoreCase(attr.name))) {
+ attributeOut += attr
+ }
+ }
+ attributesRaw = attributeOut
+ }
+
+ val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+ val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+ val dimAttr = new Array[Attribute](dimensions.size())
+ val msrAttr = new Array[Attribute](measures.size())
+ attributesRaw.foreach { attr =>
+ val carbonDimension =
+ carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+ if(carbonDimension != null) {
+ dimAttr(dimensions.indexOf(carbonDimension)) = attr
+ } else {
+ val carbonMeasure =
+ carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+ if(carbonMeasure != null) {
+ msrAttr(measures.indexOf(carbonMeasure)) = attr
+ }
+ }
+ }
+
+ attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
+
+ var queryOrder: Integer = 0
+ attributesRaw.foreach { attr =>
+ val carbonDimension =
+ carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+ if (carbonDimension != null) {
+ val dim = new QueryDimension(attr.name)
+ dim.setQueryOrder(queryOrder)
+ queryOrder = queryOrder + 1
+ selectedDims += dim
+ } else {
+ val carbonMeasure =
+ carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+ if (carbonMeasure != null) {
+ val m1 = new QueryMeasure(attr.name)
+ m1.setQueryOrder(queryOrder)
+ queryOrder = queryOrder + 1
+ selectedMsrs += m1
+ }
+ }
+ }
+
+ // Fill the selected dimensions & measures obtained from
+ // attributes to query plan for detailed query
+ selectedDims.foreach(plan.addDimension)
+ selectedMsrs.foreach(plan.addMeasure)
+ }
+
+ def inputRdd: CarbonScanRDD[Array[Any]] = {
+
+ val conf = new Configuration()
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+
+ // setting queryid
+ buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
+
+ val tableCreationTime = carbonCatalog
+ .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
+ val schemaLastUpdatedTime = carbonCatalog
+ .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
+ new CarbonScanRDD(
+ ocRaw.sparkContext,
+ attributesRaw,
+ buildCarbonPlan.getFilterExpression,
+ absoluteTableIdentifier,
+ carbonTable
+ )
+ }
+
+ override def outputsUnsafeRows: Boolean =
+ (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
+
+ override def doExecute(): RDD[InternalRow] = {
+ val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion
+ inputRdd.mapPartitions { iter =>
+ val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = iter.hasNext
+
+ override def next(): InternalRow = {
+ val value = iter.next
+ if (outUnsafeRows) {
+ unsafeProjection(new GenericMutableRow(value))
+ } else {
+ new GenericMutableRow(value)
+ }
+ }
+ }
+ }
+ }
+
+ def output: Seq[Attribute] = {
+ attributesRaw
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index a6b4ec5..74b0dd2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -924,10 +924,9 @@ private[sql] case class DeleteLoadsById(
}
val path = carbonTable.getMetaDataFilepath
- val segmentStatusManager =
- new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
try {
- val invalidLoadIds = segmentStatusManager.updateDeletionStatus(loadids.asJava, path).asScala
+ val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
+ carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala
if (invalidLoadIds.isEmpty) {
@@ -986,8 +985,6 @@ private[sql] case class DeleteLoadsByLoadDate(
val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
.getCarbonTable(dbName + '_' + tableName)
- val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-
if (null == carbonTable) {
var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
.lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
@@ -995,8 +992,9 @@ private[sql] case class DeleteLoadsByLoadDate(
val path = carbonTable.getMetaDataFilepath()
try {
- val invalidLoadTimestamps = segmentStatusManager
- .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala
+ val invalidLoadTimestamps = SegmentStatusManager.updateDeletionStatus(
+ carbonTable.getAbsoluteTableIdentifier, loadDate, path,
+ timeObj.asInstanceOf[java.lang.Long]).asScala
if (invalidLoadTimestamps.isEmpty) {
LOGGER.audit(s"Delete segment by date is successfull for $dbName.$tableName.")
}
@@ -1328,12 +1326,8 @@ private[sql] case class ShowLoads(
if (carbonTable == null) {
sys.error(s"$databaseName.$tableName is not found")
}
- val path = carbonTable.getMetaDataFilepath()
-
- val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
-
- val loadMetadataDetailsArray = segmentStatusManager.readLoadMetadata(path)
-
+ val path = carbonTable.getMetaDataFilepath
+ val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
if (loadMetadataDetailsArray.nonEmpty) {
val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 0c13293..25c36c5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.hive
-
import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
import scala.collection.JavaConverters._
@@ -44,12 +43,9 @@ object DistributionUtil {
* localhost for retrieving executor list
*/
def getNodeList(sparkContext: SparkContext): Array[String] = {
-
- val arr =
- sparkContext.getExecutorMemoryStatus.map {
- kv =>
- kv._1.split(":")(0)
- }.toSeq
+ val arr = sparkContext.getExecutorMemoryStatus.map { kv =>
+ kv._1.split(":")(0)
+ }.toSeq
val localhostIPs = getLocalhostIPs
val selectedLocalIPList = localhostIPs.filter(arr.contains(_))
@@ -109,10 +105,9 @@ object DistributionUtil {
* @param sparkContext
* @return
*/
- def ensureExecutorsAndGetNodeList(blockList: Array[Distributable],
- sparkContext: SparkContext):
- Array[String] = {
- val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
+ def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
+ sparkContext: SparkContext): Seq[String] = {
+ val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
var confExecutorsTemp: String = null
if (sparkContext.getConf.contains("spark.executor.instances")) {
confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
@@ -131,7 +126,9 @@ object DistributionUtil {
}
val requiredExecutors = if (nodeMapping.size > confExecutors) {
confExecutors
- } else { nodeMapping.size() }
+ } else {
+ nodeMapping.size()
+ }
val startTime = System.currentTimeMillis()
CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
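
getNodeList above derives node names from the keys of getExecutorMemoryStatus, which are "host:port" strings. A small sketch of that extraction (sc stands for any live SparkContext and is an assumption here):

// Keys of getExecutorMemoryStatus look like "host:port"; keep only the host part.
val executorHosts: Seq[String] =
  sc.getExecutorMemoryStatus.map { case (hostPort, _) => hostPort.split(":")(0) }.toSeq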
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index cc00c47..f02d4e7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -35,10 +35,9 @@ import org.apache.carbondata.core.util.CarbonProperties
class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
-
+ clean
val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
.getCanonicalPath
-
sql("create table if not exists Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active
_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')");
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
@@ -52,11 +51,14 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
}
+ def clean{
+ sql("drop table if exists Carbon_automation_test")
+ sql("drop table if exists Carbon_automation_hive")
+ sql("drop table if exists Carbon_automation_test_hive")
+ }
+
override def afterAll {
- sql("drop table Carbon_automation_test")
- sql("drop table Carbon_automation_hive")
- sql("drop table Carbon_automation_test_hive")
-
+ clean
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
}
@@ -425,10 +427,10 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
})
//TC_103
- test("select variance(deviceInformationId) as a from Carbon_automation_test")({
+ test("select variance(deviceInformationId) as a from carbon_automation_test")({
checkAnswer(
- sql("select variance(deviceInformationId) as a from Carbon_automation_test"),
- sql("select variance(deviceInformationId) as a from Carbon_automation_hive"))
+ sql("select variance(deviceInformationId) as a from carbon_automation_test"),
+ sql("select variance(deviceInformationId) as a from carbon_automation_hive"))
})
//TC_105
test("select var_samp(deviceInformationId) as a from Carbon_automation_test")({
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
index 7343a81..924d91a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
@@ -113,26 +113,22 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
sql("clean files for table table2")
// check for table 1.
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier1 = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr")
- )
- )
+ new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr"))
// merged segment should not be there
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier1)
+ .getValidSegments.asScala.toList
assert(segments.contains("0.1"))
assert(!segments.contains("0"))
assert(!segments.contains("1"))
// check for table 2.
- val segmentStatusManager2: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier2 = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1")
- )
- )
+ new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1"))
// merged segment should not be there
- val segments2 = segmentStatusManager2.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments2 = SegmentStatusManager.getSegmentStatus(identifier2)
+ .getValidSegments.asScala.toList
assert(segments2.contains("0.1"))
assert(!segments2.contains("0"))
assert(!segments2.contains("1"))
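
The test changes above all follow the same pattern: build an AbsoluteTableIdentifier and ask the static API for the valid segments. A condensed sketch, with import paths taken from the packages referenced elsewhere in this patch and the store location property assumed to be set as in the tests:

import scala.collection.JavaConverters._
import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.lcm.status.SegmentStatusManager

// List the valid segments of a table without instantiating SegmentStatusManager.
val identifier = new AbsoluteTableIdentifier(
  CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
  new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr"))
val validSegments = SegmentStatusManager.getSegmentStatus(identifier)
  .getValidSegments.asScala.toList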
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
index 80a2320..f5039a7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
@@ -43,9 +43,6 @@ class DataCompactionBoundaryConditionsTest extends QueryTest with BeforeAndAfter
val carbonTableIdentifier: CarbonTableIdentifier =
new CarbonTableIdentifier("default", "boundarytest".toLowerCase(), "1")
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier))
-
override def beforeAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,2")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
index d780efe..f77ec9b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -86,13 +86,13 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
var noOfRetries = 0
while (status && noOfRetries < 10) {
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1")
+ new CarbonTableIdentifier(
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1")
)
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
if (!segments.contains("0.1")) {
// wait for 2 seconds for compaction to complete.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index eb889d6..7ec6431 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -105,16 +105,11 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
* Compaction should fail as lock is being held purposefully
*/
test("check if compaction is failed or not.") {
-
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
- absoluteTableIdentifier
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
-
+ val segments = SegmentStatusManager.getSegmentStatus(absoluteTableIdentifier)
+ .getValidSegments.asScala.toList
if (!segments.contains("0.1")) {
assert(true)
- }
- else {
+ } else {
assert(false)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
index fbb39d8..15ed78b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
@@ -45,8 +45,7 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll
val carbonTableIdentifier: CarbonTableIdentifier =
new CarbonTableIdentifier("default", "minorthreshold".toLowerCase(), "1")
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier))
+ val identifier = new AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier)
override def beforeAll {
CarbonProperties.getInstance()
@@ -96,7 +95,8 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll
sql("clean files for table minorthreshold")
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
assert(segments.contains("0.2"))
assert(!segments.contains("0.1"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
index c7be22f..570bb72 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
@@ -38,14 +38,10 @@ class DataCompactionNoDictionaryTest extends QueryTest with BeforeAndAfterAll {
// return segment details
def getSegments(databaseName : String, tableName : String, tableId : String): List[String] = {
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId)
- )
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
- segments
+ new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId))
+ SegmentStatusManager.getSegmentStatus(identifier).getValidSegments.asScala.toList
}
val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
index 3eef8b7..137cebc 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
@@ -84,13 +84,12 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
var noOfRetries = 0
while (status && noOfRetries < 10) {
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "1")
)
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
if (!segments.contains("0.1")) {
// wait for 2 seconds for compaction to complete.
@@ -131,15 +130,14 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
// delete merged segments
sql("clean files for table normalcompaction")
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "uniqueid")
)
- )
// merged segment should not be there
- val segments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
assert(!segments.contains("0"))
assert(!segments.contains("1"))
assert(!segments.contains("2"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 99e3d56..a1664a6 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -97,14 +97,13 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
var noOfRetries = 0
while (!status && noOfRetries < 10) {
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", noOfRetries + "")
)
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
segments.foreach(seg =>
System.out.println( "valid segment is =" + seg)
)
@@ -129,15 +128,14 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
// delete merged segments
sql("clean files for table ignoremajor")
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
)
- )
// merged segment should not be there
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
assert(segments.contains("0.1"))
assert(segments.contains("2.1"))
assert(!segments.contains("2"))
@@ -156,13 +154,6 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
catch {
case _:Throwable => assert(true)
}
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
- CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
- )
- )
val carbontablePath = CarbonStorePath
.getCarbonTablePath(CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION),
@@ -170,7 +161,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
)
.getMetadataDirectoryPath
- var segs = segmentStatusManager.readLoadMetadata(carbontablePath)
+ val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
// status should remain as compacted.
assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))
@@ -185,13 +176,6 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
"DELETE SEGMENTS FROM TABLE ignoremajor where STARTTIME before" +
" '2222-01-01 19:35:01'"
)
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
- CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
- new CarbonTableIdentifier(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
- )
- )
val carbontablePath = CarbonStorePath
.getCarbonTablePath(CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION),
@@ -199,7 +183,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
)
.getMetadataDirectoryPath
- var segs = segmentStatusManager.readLoadMetadata(carbontablePath)
+ val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
// status should remain as compacted for segment 2.
assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 3745e11..25087a7 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -87,14 +87,13 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
var noOfRetries = 0
while (!status && noOfRetries < 10) {
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", noOfRetries + "")
)
- )
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
segments.foreach(seg =>
System.out.println( "valid segment is =" + seg)
)
@@ -119,14 +118,13 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
// delete merged segments
sql("clean files for table stopmajor")
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
- AbsoluteTableIdentifier(
+ val identifier = new AbsoluteTableIdentifier(
CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", "rrr")
)
- )
// merged segment should not be there
- val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+ val segments = SegmentStatusManager.getSegmentStatus(identifier)
+ .getValidSegments.asScala.toList
assert(segments.contains("0.1"))
assert(!segments.contains("0.2"))
assert(!segments.contains("0"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5f6a56ca/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index bed6428..6d3cdec 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -55,7 +55,6 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
AbsoluteTableIdentifier(storeLocation,
new CarbonTableIdentifier(
CarbonCommonConstants.DATABASE_DEFAULT_NAME, "DataRetentionTable".toLowerCase(), "300"))
- val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(absoluteTableIdentifierForRetention)
val carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifierForRetention.getStorePath,
absoluteTableIdentifierForRetention.getCarbonTableIdentifier).getMetadataDirectoryPath
@@ -133,8 +132,8 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
}
test("RetentionTest_DeleteSegmentsByLoadTime") {
- val segments: Array[LoadMetadataDetails] = segmentStatusManager
- .readLoadMetadata(carbonTablePath)
+ val segments: Array[LoadMetadataDetails] =
+ SegmentStatusManager.readLoadMetadata(carbonTablePath)
// check segment length, it should be 2 (loads)
if (segments.length != 2) {
assert(false)
[4/4] incubator-carbondata git commit: [CARBONDATA-308] Use CarbonInputFormat in CarbonScanRDD compute This closes #262
Posted by ra...@apache.org.
[CARBONDATA-308] Use CarbonInputFormat in CarbonScanRDD compute This closes #262
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e05c0d5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e05c0d5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e05c0d5d
Branch: refs/heads/master
Commit: e05c0d5da95e503e532aca5833c99f2f51021dc4
Parents: 5c697e9 5f6a56c
Author: ravipesala <ra...@gmail.com>
Authored: Fri Nov 25 15:50:37 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Nov 25 15:50:37 2016 +0530
----------------------------------------------------------------------
.../carbon/datastore/block/Distributable.java | 8 +-
.../carbon/datastore/block/TableBlockInfo.java | 3 +-
.../carbon/datastore/block/TableTaskInfo.java | 2 +-
.../scan/filter/FilterExpressionProcessor.java | 29 +-
.../examples/DataFrameAPIExample.scala | 5 +-
.../carbondata/hadoop/CarbonInputFormat.java | 334 +++++-----------
.../carbondata/hadoop/CarbonInputSplit.java | 112 +++++-
.../hadoop/CarbonMultiBlockSplit.java | 105 +++++
.../carbondata/hadoop/CarbonRecordReader.java | 34 +-
.../hadoop/readsupport/CarbonReadSupport.java | 2 +-
.../AbstractDictionaryDecodedReadSupport.java | 2 +-
.../impl/ArrayWritableReadSupport.java | 2 +-
.../readsupport/impl/RawDataReadSupport.java | 6 +-
.../hadoop/util/CarbonInputFormatUtil.java | 14 +
.../hadoop/ft/CarbonInputMapperTest.java | 15 +-
.../carbondata/spark/load/CarbonLoaderUtil.java | 99 +----
.../spark/merger/CarbonDataMergerUtil.java | 11 +-
.../readsupport/SparkRowReadSupportImpl.java | 9 +-
.../carbondata/spark/util/LoadMetadataUtil.java | 4 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 20 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 151 +++-----
.../carbondata/spark/rdd/CarbonScanRDD.scala | 387 ++++++++-----------
.../apache/carbondata/spark/rdd/Compactor.scala | 13 +-
.../carbondata/spark/util/QueryPlanUtil.scala | 56 ---
.../sql/CarbonDatasourceHadoopRelation.scala | 20 +-
.../spark/sql/CarbonDatasourceRelation.scala | 3 +-
.../org/apache/spark/sql/CarbonOperators.scala | 191 ---------
.../scala/org/apache/spark/sql/CarbonScan.scala | 186 +++++++++
.../execution/command/carbonTableSchema.scala | 20 +-
.../spark/sql/hive/DistributionUtil.scala | 21 +-
.../AllDataTypesTestCaseAggregate.scala | 20 +-
.../CompactionSystemLockFeatureTest.scala | 20 +-
.../DataCompactionBoundaryConditionsTest.scala | 3 -
.../DataCompactionCardinalityBoundryTest.scala | 10 +-
.../datacompaction/DataCompactionLockTest.scala | 11 +-
.../DataCompactionMinorThresholdTest.scala | 6 +-
.../DataCompactionNoDictionaryTest.scala | 10 +-
.../datacompaction/DataCompactionTest.scala | 14 +-
.../MajorCompactionIgnoreInMinorTest.scala | 32 +-
.../MajorCompactionStopsAfterCompaction.scala | 14 +-
.../dataretention/DataRetentionTestCase.scala | 5 +-
.../lcm/status/SegmentStatusManager.java | 160 ++++----
42 files changed, 964 insertions(+), 1205 deletions(-)
----------------------------------------------------------------------