You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/02 08:02:11 UTC
[50/50] [abbrv] carbondata git commit: [REBASE] Solve conflict after
merging master
[REBASE] Solve conflict after merging master
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7540cc9c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7540cc9c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7540cc9c
Branch: refs/heads/carbonstore-rebase5
Commit: 7540cc9cab404c06eaf5c92da77ef7e36f9182b5
Parents: 8a9dd8b
Author: Jacky Li <ja...@qq.com>
Authored: Tue Feb 27 11:26:30 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 2 16:00:28 2018 +0800
----------------------------------------------------------------------
.../carbondata/core/datamap/dev/DataMap.java | 9 +-
.../core/datamap/dev/DataMapFactory.java | 2 +-
.../exception/ConcurrentOperationException.java | 16 +-
.../core/indexstore/BlockletDetailsFetcher.java | 3 +-
.../blockletindex/BlockletDataMap.java | 3 +-
.../blockletindex/SegmentIndexFileStore.java | 2 -
.../core/metadata/PartitionMapFileStore.java | 0
.../scan/executor/util/RestructureUtil.java | 6 +-
.../statusmanager/SegmentStatusManager.java | 10 +-
.../SegmentUpdateStatusManager.java | 7 +-
.../CarbonStreamSparkStreamingExample.scala | 14 +-
...CarbonStructuredStreamingWithRowParser.scala | 8 +-
.../hadoop/api/CarbonTableInputFormat.java | 5 +-
.../preaggregate/TestPreAggCreateCommand.scala | 2 +-
.../TestInsertAndOtherCommandConcurrent.scala | 2 +-
.../StandardPartitionGlobalSortTestCase.scala | 2 +-
.../exception/ProcessMetaDataException.java | 2 +
.../org/apache/carbondata/api/CarbonStore.scala | 6 +-
.../carbondata/spark/load/CsvRDDHelper.scala | 157 +++++++++++++++++++
.../load/DataLoadProcessBuilderOnSpark.scala | 3 +-
.../carbondata/spark/util/CarbonScalaUtil.scala | 2 +-
.../carbondata/spark/util/CommonUtil.scala | 2 -
.../command/carbonTableSchemaCommon.scala | 6 +-
.../CarbonAlterTableCompactionCommand.scala | 3 +-
.../management/CarbonCleanFilesCommand.scala | 2 +-
.../CarbonDeleteLoadByIdCommand.scala | 2 +-
.../CarbonDeleteLoadByLoadDateCommand.scala | 2 +-
.../management/CarbonLoadDataCommand.scala | 28 ++--
.../CarbonProjectForDeleteCommand.scala | 2 +-
.../CarbonProjectForUpdateCommand.scala | 2 +-
.../schema/CarbonAlterTableRenameCommand.scala | 2 +-
.../command/table/CarbonDropTableCommand.scala | 2 +-
.../datasources/CarbonFileFormat.scala | 3 -
.../TestStreamingTableWithRowParser.scala | 9 +-
.../vectorreader/AddColumnTestCases.scala | 1 +
.../datamap/DataMapWriterListener.java | 3 +-
.../loading/model/CarbonLoadModelBuilder.java | 34 +++-
.../processing/loading/model/LoadOption.java | 15 +-
.../processing/merger/CarbonDataMergerUtil.java | 19 ++-
.../merger/CompactionResultSortProcessor.java | 4 +-
.../merger/RowResultMergerProcessor.java | 4 +-
.../partition/spliter/RowResultProcessor.java | 4 +-
.../util/CarbonDataProcessorUtil.java | 3 +-
store/sdk/pom.xml | 2 +-
.../carbondata/sdk/file/CSVCarbonWriter.java | 8 +-
45 files changed, 305 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 4a68286..fdeacff 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -38,18 +38,13 @@ public interface DataMap<T extends Blocklet> {
/**
* Prune the datamap with filter expression and partition information. It returns the list of
* blocklets where these filters can exist.
- *
- * @param filterExp
- * @return
*/
- List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions);
+ List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+ List<PartitionSpec> partitions);
// TODO Move this method to Abstract class
/**
* Validate whether the current segment needs to be fetching the required data
- *
- * @param filterExp
- * @return
*/
boolean isScanRequired(FilterResolverIntf filterExp);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 50ac279..d8a467f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,8 +21,8 @@ import java.util.List;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.events.Event;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
index 7e717ba..918268c 100644
--- a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
+++ b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
@@ -17,21 +17,10 @@
package org.apache.carbondata.core.exception;
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-/**
- * This exception will be thrown when executing concurrent operations which
- * is not supported in carbon.
- *
- * For example, when INSERT OVERWRITE is executing, other operations are not
- * allowed, so this exception will be thrown
- */
-@InterfaceAudience.User
-@InterfaceStability.Stable
-public class ConcurrentOperationException extends Exception {
+public class ConcurrentOperationException extends MalformedCarbonCommandException {
public ConcurrentOperationException(String dbName, String tableName, String command1,
String command2) {
@@ -48,3 +37,4 @@ public class ConcurrentOperationException extends Exception {
}
}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index dd592c0..58c11db 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -53,5 +53,6 @@ public interface BlockletDetailsFetcher {
* @param segment
* @return
*/
- List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions) throws IOException;
+ List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
+ throws IOException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index ce6193b..b379ae3 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -660,7 +660,8 @@ public class BlockletDataMap extends AbstractCoarseGrainDataMap implements Cache
}
@Override
- public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions) {
+ public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+ List<PartitionSpec> partitions) {
if (unsafeMemoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 537e124..00d03a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -21,10 +21,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 2712cbc..e67d822 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -80,7 +80,7 @@ public class RestructureUtil {
if (queryDimension.getDimension().hasEncoding(Encoding.IMPLICIT)) {
presentDimension.add(queryDimension);
isDimensionExists[dimIndex] = true;
- dimensionInfo.dataType[queryDimension.getQueryOrder()] =
+ dimensionInfo.dataType[queryDimension.getOrdinal()] =
queryDimension.getDimension().getDataType();
} else {
for (CarbonDimension tableDimension : tableBlockDimensions) {
@@ -95,7 +95,7 @@ public class RestructureUtil {
currentBlockDimension.setOrdinal(queryDimension.getOrdinal());
presentDimension.add(currentBlockDimension);
isDimensionExists[dimIndex] = true;
- dimensionInfo.dataType[currentBlockDimension.getQueryOrder()] =
+ dimensionInfo.dataType[currentBlockDimension.getOrdinal()] =
currentBlockDimension.getDimension().getDataType();
break;
}
@@ -113,7 +113,7 @@ public class RestructureUtil {
currentBlockDimension.setOrdinal(queryDimension.getOrdinal());
presentDimension.add(currentBlockDimension);
isDimensionExists[dimIndex] = true;
- dimensionInfo.dataType[currentBlockDimension.getQueryOrder()] =
+ dimensionInfo.dataType[currentBlockDimension.getOrdinal()] =
currentBlockDimension.getDimension().getDataType();
break;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 89666ab..1bb4b03 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
@@ -838,6 +839,13 @@ public class SegmentStatusManager {
public static void deleteLoadsAndUpdateMetadata(
CarbonTable carbonTable,
boolean isForceDeletion) throws IOException {
+ deleteLoadsAndUpdateMetadata(carbonTable, isForceDeletion, null);
+ }
+
+ public static void deleteLoadsAndUpdateMetadata(
+ CarbonTable carbonTable,
+ boolean isForceDeletion,
+ List<PartitionSpec> partitionSpecs) throws IOException {
if (isLoadDeletionRequired(carbonTable.getMetadataPath())) {
LoadMetadataDetails[] details =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
@@ -882,7 +890,7 @@ public class SegmentStatusManager {
CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
if (updationCompletionStatus) {
DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
- identifier, carbonTable.getMetadataPath(), isForceDeletion);
+ identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 6ec6fa2..4a2149e 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -77,8 +77,8 @@ public class SegmentUpdateStatusManager {
this.identifier = identifier;
// current it is used only for read function scenarios, as file update always requires to work
// on latest file status.
- segmentDetails =
- SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(identifier.getTablePath()));
+ segmentDetails = SegmentStatusManager.readLoadMetadata(
+ CarbonTablePath.getMetadataPath(identifier.getTablePath()));
if (segmentDetails.length > 0) {
isPartitionTable = segmentDetails[0].getSegmentFile() != null;
}
@@ -259,7 +259,8 @@ public class SegmentUpdateStatusManager {
+ CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
.replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
} else {
- String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment);
+ String carbonDataDirectoryPath =
+ CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment);
blockPath =
carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
index f59a610..d3784e7 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
@@ -20,18 +20,14 @@ package org.apache.carbondata.examples
import java.io.{File, PrintWriter}
import java.net.ServerSocket
-import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.CarbonSparkStreamingFactory
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.CarbonSparkStreamingListener
import org.apache.carbondata.streaming.parser.CarbonStreamParser
@@ -77,7 +73,7 @@ object CarbonStreamSparkStreamingExample {
| 'dictionary_include'='city')
| """.stripMargin)
val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
- val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ val tablePath = carbonTable.getTablePath
// batch load
val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
spark.sql(
@@ -91,7 +87,7 @@ object CarbonStreamSparkStreamingExample {
val serverSocket = new ServerSocket(7071)
val thread1 = writeSocket(serverSocket)
val thread2 = showTableCount(spark, streamTableName)
- val ssc = startStreaming(spark, streamTableName, tablePath, checkpointPath)
+ val ssc = startStreaming(spark, streamTableName, checkpointPath)
// add a Spark Streaming Listener to remove all lock for stream tables when stop app
ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
// wait for stop signal to stop Spark Streaming App
@@ -155,8 +151,8 @@ object CarbonStreamSparkStreamingExample {
thread
}
- def startStreaming(spark: SparkSession, tableName: String,
- tablePath: CarbonTablePath, checkpointPath: String): StreamingContext = {
+ def startStreaming(
+ spark: SparkSession, tableName: String, checkpointPath: String): StreamingContext = {
var ssc: StreamingContext = null
try {
// recommend: the batch interval must set larger, such as 30s, 1min.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
index cce833b..61c45d3 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
@@ -23,7 +23,7 @@ import java.net.ServerSocket
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.parser.CarbonStreamParser
case class FileElement(school: Array[String], age: Int)
@@ -77,7 +77,7 @@ object CarbonStructuredStreamingWithRowParser {
}
val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
- val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ val tablePath = carbonTable.getTablePath
// batch load
val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
spark.sql(
@@ -140,7 +140,7 @@ object CarbonStructuredStreamingWithRowParser {
thread
}
- def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread = {
+ def startStreaming(spark: SparkSession, tablePath: String): Thread = {
val thread = new Thread() {
override def run(): Unit = {
var qry: StreamingQuery = null
@@ -167,7 +167,7 @@ object CarbonStructuredStreamingWithRowParser {
qry = readSocketDF.writeStream
.format("carbondata")
.trigger(ProcessingTime("5 seconds"))
- .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+ .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
.option("dbName", "default")
.option("tableName", "stream_table_with_row_parser")
.option(CarbonStreamParser.CARBON_STREAM_PARSER,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index b485b69..5184e07 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -33,8 +33,8 @@ import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -494,7 +494,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
for (Segment segment : streamSegments) {
- String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
+ String segmentDir = CarbonTablePath.getSegmentPath(
+ identifier.getTablePath(), segment.getSegmentNo());
FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
if (FileFactory.isFileExist(segmentDir, fileType)) {
String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 8f63af6..e09b922 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index b39c44c..3f0ca42 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -34,11 +34,11 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainData
import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
// This testsuite test insert and insert overwrite with other commands concurrently
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index 7d0959c..629417f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterAll {
var executorService: ExecutorService = _
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
index 3e06bde..471b645 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.spark.exception;
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+
// This exception will be thrown when processMetaData failed in
// Carbon's RunnableCommand
public class ProcessMetaDataException extends MalformedCarbonCommandException {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index b69ec37..bfb1616 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFile
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.spark.util.DataLoadingUtil
object CarbonStore {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -139,9 +138,8 @@ object CarbonStore {
carbonCleanFilesLock =
CarbonLockUtil
.getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(
- isForceDeletion = true, carbonTable, currentTablePartitions.map(_.asJava).orNull)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+ carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
currentTablePartitions match {
case Some(partitions) =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
new file mode 100644
index 0000000..36d8c51
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
+import org.apache.carbondata.spark.util.CommonUtil
+
+object CsvRDDHelper {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * creates an RDD that does reading of multiple CSV files
+ */
+ def csvFileScanRDD(
+ spark: SparkSession,
+ model: CarbonLoadModel,
+ hadoopConf: Configuration
+ ): RDD[InternalRow] = {
+ // 1. partition
+ val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+ val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+ val defaultParallelism = spark.sparkContext.defaultParallelism
+ CommonUtil.configureCSVInputFormat(hadoopConf, model)
+ hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val jobContext = new JobContextImpl(jobConf, null)
+ val inputFormat = new CSVInputFormat()
+ val rawSplits = inputFormat.getSplits(jobContext).toArray
+ val splitFiles = rawSplits.map { split =>
+ val fileSplit = split.asInstanceOf[FileSplit]
+ PartitionedFile(
+ InternalRow.empty,
+ fileSplit.getPath.toString,
+ fileSplit.getStart,
+ fileSplit.getLength,
+ fileSplit.getLocations)
+ }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+ val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
+ val bytesPerCore = totalBytes / defaultParallelism
+
+ val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+ s"open cost is considered as scanning $openCostInBytes bytes.")
+
+ val partitions = new ArrayBuffer[FilePartition]
+ val currentFiles = new ArrayBuffer[PartitionedFile]
+ var currentSize = 0L
+
+ def closePartition(): Unit = {
+ if (currentFiles.nonEmpty) {
+ val newPartition =
+ FilePartition(
+ partitions.size,
+ currentFiles.toArray.toSeq)
+ partitions += newPartition
+ }
+ currentFiles.clear()
+ currentSize = 0
+ }
+
+ splitFiles.foreach { file =>
+ if (currentSize + file.length > maxSplitBytes) {
+ closePartition()
+ }
+ // Add the given file to the current partition.
+ currentSize += file.length + openCostInBytes
+ currentFiles += file
+ }
+ closePartition()
+
+ // 2. read function
+ val serializableConfiguration = new SerializableConfiguration(jobConf)
+ val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
+ override def apply(file: PartitionedFile): Iterator[InternalRow] = {
+ new Iterator[InternalRow] {
+ val hadoopConf = serializableConfiguration.value
+ val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+ formatter.format(new Date())
+ }
+ val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+ val inputSplit =
+ new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
+ var finished = false
+ val inputFormat = new CSVInputFormat()
+ val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
+ reader.initialize(inputSplit, hadoopAttemptContext)
+
+ override def hasNext: Boolean = {
+ if (!finished) {
+ if (reader != null) {
+ if (reader.nextKeyValue()) {
+ true
+ } else {
+ finished = true
+ reader.close()
+ false
+ }
+ } else {
+ finished = true
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ override def next(): InternalRow = {
+ new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
+ }
+ }
+ }
+ }
+ new FileScanRDD(spark, readFunction, partitions)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index e1bd84b..1062cd7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -34,7 +34,6 @@ import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.util.DataLoadingUtil
/**
* Use sortBy operator in spark to load the data
@@ -52,7 +51,7 @@ object DataLoadProcessBuilderOnSpark {
} else {
// input data from files
val columnCount = model.getCsvHeaderColumns.length
- DataLoadingUtil.csvFileScanRDD(sparkSession, model, hadoopConf)
+ CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
.map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 33263d6..298c84e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel}
import org.apache.spark.sql.types._
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogService
import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -45,7 +46,6 @@ import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
object CarbonScalaUtil {
def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 9104a32..d3093fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -816,8 +816,6 @@ object CommonUtil {
val carbonTable = CarbonMetadata.getInstance
.getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(
- isForceDeletion = true, carbonTable, null)
} catch {
case _: Exception =>
LOGGER.warn(s"Error while cleaning table " +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 71ce2c6..3c21af3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -31,19 +31,15 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema._
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema}
import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
-import org.apache.carbondata.core.service.CarbonCommonFactory
import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.DataTypeUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CompactionType
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index e47c500..2f4aa30 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
@@ -47,7 +47,6 @@ import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEv
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.CommonUtil
import org.apache.carbondata.streaming.StreamHandoffRDD
import org.apache.carbondata.streaming.segment.StreamSegment
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index d2adc57..2092028 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -26,10 +26,10 @@ import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
import org.apache.carbondata.spark.util.CommonUtil
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 0861c63..81427a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
case class CarbonDeleteLoadByIdCommand(
loadIds: Seq[String],
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index dcbc6ce..1d76bda 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
case class CarbonDeleteLoadByLoadDateCommand(
databaseNameOp: Option[String],
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 4806a6f..34a464a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -59,10 +59,9 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.statusmanager.SegmentStatus
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.events.exception.PreEventException
@@ -71,18 +70,15 @@ import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.loading.sort.SortScopeOptions
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
-import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark
+import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
case class CarbonLoadDataCommand(
databaseNameOp: Option[String],
@@ -193,12 +189,18 @@ case class CarbonLoadDataCommand(
carbonLoadModel.setAggLoadRequest(
internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
+
+ val javaPartition = mutable.Map[String, String]()
+ partition.foreach { case (k, v) =>
+ if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get
+ }
+
new CarbonLoadModelBuilder(table).build(
options.asJava,
optionsFinal,
carbonLoadModel,
hadoopConf,
- partition,
+ javaPartition.asJava,
dataFrame.isDefined)
// Delete stale segment folders that are not in table status but are physically present in
// the Fact folder
@@ -231,11 +233,7 @@ case class CarbonLoadDataCommand(
// First system has to partition the data first and then call the load data
LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
// Clean up the old invalid segment data before creating a new entry for new load.
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(
- isForceDeletion = false,
- table,
- currPartitions)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
// add the start entry for the new load in the table status file
if (updateModel.isEmpty && !table.isHivePartitionTable) {
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
@@ -672,7 +670,7 @@ case class CarbonLoadDataCommand(
}
}
val columnCount = carbonLoadModel.getCsvHeaderColumns.length
- val rdd = DataLoadingUtil.csvFileScanRDD(
+ val rdd = CsvRDDHelper.csvFileScanRDD(
sparkSession,
model = carbonLoadModel,
hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index f074285..230378b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 5165342..2a92478 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -25,10 +25,10 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.storage.StorageLevel
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 5f8eb12..474f9c6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -23,12 +23,12 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
import org.apache.spark.util.AlterTableUtil
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 8001a93..0298eea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -28,12 +28,12 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events._
-import org.apache.carbondata.spark.exception.{ConcurrentOperationException, ProcessMetaDataException}
case class CarbonDropTableCommand(
ifExistsSet: Boolean,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 61a31a5..2eed988 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -50,11 +50,8 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
class CarbonFileFormat
extends FileFormat
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 3e3b2c5..064ff28 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.streaming.parser.CarbonStreamParser
case class FileElement(school: Array[String], age: Integer)
@@ -735,7 +735,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
def createSocketStreamingThread(
spark: SparkSession,
port: Int,
- tablePath: CarbonTablePath,
+ tablePath: String,
tableIdentifier: TableIdentifier,
badRecordAction: String = "force",
intervalSecond: Int = 2,
@@ -776,7 +776,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
qry = readSocketDF.writeStream
.format("carbondata")
.trigger(ProcessingTime(s"$intervalSecond seconds"))
- .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+ .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
.option("bad_records_action", badRecordAction)
.option("dbName", tableIdentifier.database.get)
.option("tableName", tableIdentifier.table)
@@ -817,7 +817,6 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
val identifier = new TableIdentifier(tableName, Option("streaming1"))
val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
.asInstanceOf[CarbonRelation].metaData.carbonTable
- val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
var server: ServerSocket = null
try {
server = getServerSocket()
@@ -830,7 +829,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
val thread2 = createSocketStreamingThread(
spark = spark,
port = server.getLocalPort,
- tablePath = tablePath,
+ tablePath = carbonTable.getTablePath,
tableIdentifier = identifier,
badRecordAction = badRecordAction,
intervalSecond = intervalOfIngest,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index 995f041..d94570a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 1104229..66f8bc5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -74,7 +74,8 @@ public class DataMapWriterListener {
}
List<String> columns = factory.getMeta().getIndexedColumns();
List<AbstractDataMapWriter> writers = registry.get(columns);
- AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath);
+ AbstractDataMapWriter writer = factory.createWriter(
+ new Segment(segmentId, null), dataWritePath);
if (writers != null) {
writers.add(writer);
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 17e8dbe..29dfa40 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model;
import java.io.IOException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -100,6 +102,26 @@ public class CarbonLoadModelBuilder {
Map<String, String> optionsFinal,
CarbonLoadModel carbonLoadModel,
Configuration hadoopConf) throws InvalidLoadOptionException, IOException {
+ build(options, optionsFinal, carbonLoadModel, hadoopConf, new HashMap<String, String>(), false);
+ }
+
+ /**
+ * build CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @param optionsFinal Load options that populated with default values for optional options
+ * @param carbonLoadModel The output load model
+ * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in
+ * user provided load options
+ * @param partitions partition name map to path
+ * @param isDataFrame true if build for load for dataframe
+ */
+ public void build(
+ Map<String, String> options,
+ Map<String, String> optionsFinal,
+ CarbonLoadModel carbonLoadModel,
+ Configuration hadoopConf,
+ Map<String, String> partitions,
+ boolean isDataFrame) throws InvalidLoadOptionException, IOException {
carbonLoadModel.setTableName(table.getTableName());
carbonLoadModel.setDatabaseName(table.getDatabaseName());
carbonLoadModel.setTablePath(table.getTablePath());
@@ -214,8 +236,18 @@ public class CarbonLoadModelBuilder {
carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter));
carbonLoadModel.setCsvHeader(fileHeader);
carbonLoadModel.setColDictFilePath(column_dict);
+
+ List<String> ignoreColumns = new ArrayList<>();
+ if (!isDataFrame) {
+ for (Map.Entry<String, String> partition : partitions.entrySet()) {
+ if (partition.getValue() != null) {
+ ignoreColumns.add(partition.getKey());
+ }
+ }
+ }
+
carbonLoadModel.setCsvHeaderColumns(
- LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf));
+ LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns));
int validatedMaxColumns = validateMaxColumns(
carbonLoadModel.getCsvHeaderColumns(),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 5af4859..bac1a94 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model;
import java.io.IOException;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.Maps;
@@ -201,6 +203,16 @@ public class LoadOption {
public static String[] getCsvHeaderColumns(
CarbonLoadModel carbonLoadModel,
Configuration hadoopConf) throws IOException {
+ return getCsvHeaderColumns(carbonLoadModel, hadoopConf, new LinkedList<String>());
+ }
+
+ /**
+ * Return CSV header field names, with partition column
+ */
+ public static String[] getCsvHeaderColumns(
+ CarbonLoadModel carbonLoadModel,
+ Configuration hadoopConf,
+ List<String> staticPartitionCols) throws IOException {
String delimiter;
if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) {
delimiter = CarbonCommonConstants.COMMA;
@@ -231,7 +243,7 @@ public class LoadOption {
}
if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
- carbonLoadModel.getCarbonDataLoadSchema())) {
+ carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
if (csvFile == null) {
LOG.error("CSV header in DDL is not proper."
+ " Column names in schema and CSV header are not the same.");
@@ -249,4 +261,5 @@ public class LoadOption {
}
return csvColumns;
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index d2faef5..142b2cb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -21,7 +21,16 @@ import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -32,7 +41,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
@@ -592,10 +600,6 @@ public final class CarbonDataMergerUtil {
List<LoadMetadataDetails> segmentsToBeMerged =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
- CarbonTableIdentifier tableIdentifier = carbonTable.getCarbonTableIdentifier();
-
-
// total length
long totalLength = 0;
@@ -1013,7 +1017,8 @@ public final class CarbonDataMergerUtil {
CarbonFile[] updateDeltaFiles = null;
Set<String> uniqueBlocks = new HashSet<String>();
- String segmentPath = CarbonTablePath.getSegmentPath(absoluteTableIdentifier.getTablePath(), seg.getSegmentNo());
+ String segmentPath = CarbonTablePath.getSegmentPath(
+ absoluteTableIdentifier.getTablePath(), seg.getSegmentNo());
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
CarbonFile[] allSegmentFiles = segDir.listFiles();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index ea11e22..ebcf944 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -405,8 +405,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR
+ carbonLoadModel.getFactTimeStamp() + ".tmp";
} else {
- carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName, carbonLoadModel.getSegmentId());
+ carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
+ carbonLoadModel.getDatabaseName(), tableName, carbonLoadModel.getSegmentId());
}
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 278d5bb..2616def 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -76,8 +76,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel
.getFactTimeStamp() + ".tmp";
} else {
- carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
+ carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
+ loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
}
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index df2e2a2..221697f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -47,8 +47,8 @@ public class RowResultProcessor {
CarbonDataProcessorUtil.createLocations(tempStoreLocation);
this.segmentProperties = segProp;
String tableName = carbonTable.getTableName();
- String carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
+ String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
+ loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
CarbonFactDataHandlerModel carbonFactDataHandlerModel =
CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
segProp, tableName, tempStoreLocation, carbonStoreLocation);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index dc8ffd7..19ad47d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -370,7 +370,8 @@ public final class CarbonDataProcessorUtil {
*
* @return data directory path
*/
- public static String createCarbonStoreLocation(String databaseName, String tableName, String segmentId) {
+ public static String createCarbonStoreLocation(String databaseName, String tableName,
+ String segmentId) {
CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 54fba55..b3dd464 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-parent</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>1.4.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index dc5696a..df6afc6 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -23,7 +23,7 @@ import java.util.UUID;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.hadoop.conf.Configuration;
@@ -42,9 +42,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@InterfaceAudience.Internal
class CSVCarbonWriter extends CarbonWriter {
- private RecordWriter<NullWritable, StringArrayWritable> recordWriter;
+ private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
private TaskAttemptContext context;
- private StringArrayWritable writable;
+ private ObjectArrayWritable writable;
CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException {
Configuration hadoopConf = new Configuration();
@@ -57,7 +57,7 @@ class CSVCarbonWriter extends CarbonWriter {
TaskAttemptContextImpl context = new TaskAttemptContextImpl(hadoopConf, attemptID);
this.recordWriter = format.getRecordWriter(context);
this.context = context;
- this.writable = new StringArrayWritable();
+ this.writable = new ObjectArrayWritable();
}
/**