You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/04 12:33:46 UTC
[47/50] [abbrv] carbondata git commit: [REBASE] Solve conflict after
merging master
[REBASE] Solve conflict after merging master
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8104735f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8104735f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8104735f
Branch: refs/heads/carbonstore-rebase5
Commit: 8104735fd66952a531153eb0d3b4db5c9ecc133d
Parents: ce88eb6
Author: Jacky Li <ja...@qq.com>
Authored: Tue Feb 27 11:26:30 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sun Mar 4 20:32:14 2018 +0800
----------------------------------------------------------------------
.../carbondata/core/datamap/dev/DataMap.java | 6 -
.../core/datamap/dev/DataMapFactory.java | 2 +-
.../exception/ConcurrentOperationException.java | 16 +-
.../core/metadata/PartitionMapFileStore.java | 0
.../statusmanager/SegmentStatusManager.java | 10 +-
.../SegmentUpdateStatusManager.java | 1 -
datamap/examples/pom.xml | 145 +++++++----------
.../datamap/examples/MinMaxDataWriter.java | 1 -
examples/flink/pom.xml | 4 +-
.../carbondata/examples/FlinkExample.scala | 10 +-
.../CarbonStreamSparkStreamingExample.scala | 1 -
.../hadoop/api/CarbonTableInputFormat.java | 5 +-
.../TestInsertAndOtherCommandConcurrent.scala | 2 +-
.../StandardPartitionGlobalSortTestCase.scala | 2 +-
.../exception/ProcessMetaDataException.java | 2 +
.../org/apache/carbondata/api/CarbonStore.scala | 6 +-
.../carbondata/spark/load/CsvRDDHelper.scala | 157 +++++++++++++++++++
.../load/DataLoadProcessBuilderOnSpark.scala | 3 +-
.../carbondata/spark/util/CarbonScalaUtil.scala | 2 +-
.../carbondata/spark/util/CommonUtil.scala | 2 -
.../command/carbonTableSchemaCommon.scala | 6 +-
.../CarbonAlterTableCompactionCommand.scala | 3 +-
.../management/CarbonCleanFilesCommand.scala | 2 +-
.../CarbonDeleteLoadByIdCommand.scala | 2 +-
.../CarbonDeleteLoadByLoadDateCommand.scala | 2 +-
.../management/CarbonLoadDataCommand.scala | 28 ++--
.../CarbonProjectForDeleteCommand.scala | 2 +-
.../CarbonProjectForUpdateCommand.scala | 2 +-
.../schema/CarbonAlterTableRenameCommand.scala | 2 +-
.../command/table/CarbonDropTableCommand.scala | 2 +-
.../datasources/CarbonFileFormat.scala | 3 -
.../vectorreader/AddColumnTestCases.scala | 1 +
.../datamap/DataMapWriterListener.java | 3 +-
.../loading/model/CarbonLoadModelBuilder.java | 34 +++-
.../processing/loading/model/LoadOption.java | 15 +-
.../processing/merger/CarbonDataMergerUtil.java | 3 +-
.../util/CarbonDataProcessorUtil.java | 3 +-
.../processing/util/CarbonLoaderUtil.java | 8 +
store/sdk/pom.xml | 2 +-
.../carbondata/sdk/file/CSVCarbonWriter.java | 8 +-
40 files changed, 336 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 02db8af..dd5507c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -38,9 +38,6 @@ public interface DataMap<T extends Blocklet> {
/**
* Prune the datamap with filter expression and partition information. It returns the list of
* blocklets where these filters can exist.
- *
- * @param filterExp
- * @return
*/
List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
List<PartitionSpec> partitions);
@@ -48,9 +45,6 @@ public interface DataMap<T extends Blocklet> {
// TODO Move this method to Abstract class
/**
* Validate whether the current segment needs to be fetching the required data
- *
- * @param filterExp
- * @return
*/
boolean isScanRequired(FilterResolverIntf filterExp);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 50ac279..d8a467f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,8 +21,8 @@ import java.util.List;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.events.Event;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
index 7e717ba..918268c 100644
--- a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
+++ b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
@@ -17,21 +17,10 @@
package org.apache.carbondata.core.exception;
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-/**
- * This exception will be thrown when executing concurrent operations which
- * is not supported in carbon.
- *
- * For example, when INSERT OVERWRITE is executing, other operations are not
- * allowed, so this exception will be thrown
- */
-@InterfaceAudience.User
-@InterfaceStability.Stable
-public class ConcurrentOperationException extends Exception {
+public class ConcurrentOperationException extends MalformedCarbonCommandException {
public ConcurrentOperationException(String dbName, String tableName, String command1,
String command2) {
@@ -48,3 +37,4 @@ public class ConcurrentOperationException extends Exception {
}
}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index b3b1240..d76158e 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
@@ -837,6 +838,13 @@ public class SegmentStatusManager {
public static void deleteLoadsAndUpdateMetadata(
CarbonTable carbonTable,
boolean isForceDeletion) throws IOException {
+ deleteLoadsAndUpdateMetadata(carbonTable, isForceDeletion, null);
+ }
+
+ public static void deleteLoadsAndUpdateMetadata(
+ CarbonTable carbonTable,
+ boolean isForceDeletion,
+ List<PartitionSpec> partitionSpecs) throws IOException {
if (isLoadDeletionRequired(carbonTable.getMetadataPath())) {
LoadMetadataDetails[] details =
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
@@ -881,7 +889,7 @@ public class SegmentStatusManager {
CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
if (updationCompletionStatus) {
DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
- identifier, carbonTable.getMetadataPath(), isForceDeletion);
+ identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 39eb262..a21873d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -44,7 +44,6 @@ import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.TupleIdEnum;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/datamap/examples/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/examples/pom.xml b/datamap/examples/pom.xml
index 0049950..8539a86 100644
--- a/datamap/examples/pom.xml
+++ b/datamap/examples/pom.xml
@@ -15,97 +15,70 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-parent</artifactId>
- <version>1.4.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
+ <parent>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-parent</artifactId>
+ <version>1.4.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
- <artifactId>carbondata-datamap-examples</artifactId>
- <name>Apache CarbonData :: Datamap Examples</name>
+ <artifactId>carbondata-datamap-examples</artifactId>
+ <name>Apache CarbonData :: DataMap Examples</name>
- <properties>
- <dev.path>${basedir}/../../dev</dev.path>
- </properties>
+ <properties>
+ <dev.path>${basedir}/../../dev</dev.path>
+ </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-spark2</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-spark2</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
- <build>
- <sourceDirectory>src/minmaxdatamap/main/java</sourceDirectory>
- <resources>
- <resource>
- <directory>.</directory>
- <includes>
- <include>CARBON_EXAMPLESLogResource.properties</include>
- </includes>
- </resource>
- </resources>
- <plugins>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <version>2.15.2</version>
- <executions>
- <execution>
- <id>compile</id>
- <goals>
- <goal>compile</goal>
- </goals>
- <phase>compile</phase>
- </execution>
- <execution>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <build>
+ <sourceDirectory>src/minmaxdatamap/main/java</sourceDirectory>
+ <resources>
+ <resource>
+ <directory>.</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index 5046182..17c8332 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
import com.google.gson.Gson;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/examples/flink/pom.xml
----------------------------------------------------------------------
diff --git a/examples/flink/pom.xml b/examples/flink/pom.xml
index 75913cf..b783435 100644
--- a/examples/flink/pom.xml
+++ b/examples/flink/pom.xml
@@ -52,12 +52,12 @@
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-spark</artifactId>
+ <artifactId>carbondata-spark2</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-examples-spark</artifactId>
+ <artifactId>carbondata-examples-spark2</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
----------------------------------------------------------------------
diff --git a/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala b/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
index 9ce95ae..239a038 100644
--- a/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
+++ b/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
@@ -21,8 +21,8 @@ import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
-import org.apache.carbondata.examples.util.ExampleUtils
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonProjection}
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
// Write carbondata file by spark and read it by flink
// scalastyle:off println
@@ -30,7 +30,7 @@ object FlinkExample {
def main(args: Array[String]): Unit = {
// write carbondata file by spark
- val cc = ExampleUtils.createCarbonContext("FlinkExample")
+ val cc = ExampleUtils.createCarbonSession("FlinkExample")
val path = ExampleUtils.writeSampleCarbonFile(cc, "carbon1")
// read two columns by flink
@@ -38,11 +38,11 @@ object FlinkExample {
projection.addColumn("c1") // column c1
projection.addColumn("c3") // column c3
val conf = new Configuration()
- CarbonInputFormat.setColumnProjection(conf, projection)
+ CarbonTableInputFormat.setColumnProjection(conf, projection)
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.readHadoopFile(
- new CarbonInputFormat[Array[Object]],
+ new CarbonTableInputFormat[Array[Object]],
classOf[Void],
classOf[Array[Object]],
path,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
index 63b1c5a..856084b 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
@@ -20,7 +20,6 @@ package org.apache.carbondata.examples
import java.io.{File, PrintWriter}
import java.net.ServerSocket
-import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.CarbonSparkStreamingFactory
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 5cebc12..f629d40 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -33,8 +33,8 @@ import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -498,7 +498,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
for (Segment segment : streamSegments) {
- String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
+ String segmentDir = CarbonTablePath.getSegmentPath(
+ identifier.getTablePath(), segment.getSegmentNo());
FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
if (FileFactory.isFileExist(segmentDir, fileType)) {
String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index b39c44c..3f0ca42 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -34,11 +34,11 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainData
import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
// This testsuite test insert and insert overwrite with other commands concurrently
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index b511ee8..6e6be68 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterAll {
var executorService: ExecutorService = _
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
index 3e06bde..471b645 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.spark.exception;
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+
// This exception will be thrown when processMetaData failed in
// Carbon's RunnableCommand
public class ProcessMetaDataException extends MalformedCarbonCommandException {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index b69ec37..bfb1616 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFile
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.spark.util.DataLoadingUtil
object CarbonStore {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -139,9 +138,8 @@ object CarbonStore {
carbonCleanFilesLock =
CarbonLockUtil
.getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(
- isForceDeletion = true, carbonTable, currentTablePartitions.map(_.asJava).orNull)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+ carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
currentTablePartitions match {
case Some(partitions) =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
new file mode 100644
index 0000000..36d8c51
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
+import org.apache.carbondata.spark.util.CommonUtil
+
+object CsvRDDHelper {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * creates an RDD that reads multiple CSV files
+ */
+ def csvFileScanRDD(
+ spark: SparkSession,
+ model: CarbonLoadModel,
+ hadoopConf: Configuration
+ ): RDD[InternalRow] = {
+ // 1. partition
+ val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+ val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+ val defaultParallelism = spark.sparkContext.defaultParallelism
+ CommonUtil.configureCSVInputFormat(hadoopConf, model)
+ hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val jobContext = new JobContextImpl(jobConf, null)
+ val inputFormat = new CSVInputFormat()
+ val rawSplits = inputFormat.getSplits(jobContext).toArray
+ val splitFiles = rawSplits.map { split =>
+ val fileSplit = split.asInstanceOf[FileSplit]
+ PartitionedFile(
+ InternalRow.empty,
+ fileSplit.getPath.toString,
+ fileSplit.getStart,
+ fileSplit.getLength,
+ fileSplit.getLocations)
+ }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+ val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
+ val bytesPerCore = totalBytes / defaultParallelism
+
+ val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+ s"open cost is considered as scanning $openCostInBytes bytes.")
+
+ val partitions = new ArrayBuffer[FilePartition]
+ val currentFiles = new ArrayBuffer[PartitionedFile]
+ var currentSize = 0L
+
+ def closePartition(): Unit = {
+ if (currentFiles.nonEmpty) {
+ val newPartition =
+ FilePartition(
+ partitions.size,
+ currentFiles.toArray.toSeq)
+ partitions += newPartition
+ }
+ currentFiles.clear()
+ currentSize = 0
+ }
+
+ splitFiles.foreach { file =>
+ if (currentSize + file.length > maxSplitBytes) {
+ closePartition()
+ }
+ // Add the given file to the current partition.
+ currentSize += file.length + openCostInBytes
+ currentFiles += file
+ }
+ closePartition()
+
+ // 2. read function
+ val serializableConfiguration = new SerializableConfiguration(jobConf)
+ val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
+ override def apply(file: PartitionedFile): Iterator[InternalRow] = {
+ new Iterator[InternalRow] {
+ val hadoopConf = serializableConfiguration.value
+ val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+ formatter.format(new Date())
+ }
+ val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+ val inputSplit =
+ new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
+ var finished = false
+ val inputFormat = new CSVInputFormat()
+ val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
+ reader.initialize(inputSplit, hadoopAttemptContext)
+
+ override def hasNext: Boolean = {
+ if (!finished) {
+ if (reader != null) {
+ if (reader.nextKeyValue()) {
+ true
+ } else {
+ finished = true
+ reader.close()
+ false
+ }
+ } else {
+ finished = true
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ override def next(): InternalRow = {
+ new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
+ }
+ }
+ }
+ }
+ new FileScanRDD(spark, readFunction, partitions)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index e1bd84b..1062cd7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -34,7 +34,6 @@ import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.util.DataLoadingUtil
/**
* Use sortBy operator in spark to load the data
@@ -52,7 +51,7 @@ object DataLoadProcessBuilderOnSpark {
} else {
// input data from files
val columnCount = model.getCsvHeaderColumns.length
- DataLoadingUtil.csvFileScanRDD(sparkSession, model, hadoopConf)
+ CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
.map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 33263d6..298c84e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel}
import org.apache.spark.sql.types._
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogService
import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -45,7 +46,6 @@ import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
object CarbonScalaUtil {
def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 9104a32..d3093fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -816,8 +816,6 @@ object CommonUtil {
val carbonTable = CarbonMetadata.getInstance
.getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(
- isForceDeletion = true, carbonTable, null)
} catch {
case _: Exception =>
LOGGER.warn(s"Error while cleaning table " +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 71ce2c6..3c21af3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -31,19 +31,15 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema._
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema}
import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
-import org.apache.carbondata.core.service.CarbonCommonFactory
import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.DataTypeUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CompactionType
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index e47c500..2f4aa30 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
@@ -47,7 +47,6 @@ import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEv
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.CommonUtil
import org.apache.carbondata.streaming.StreamHandoffRDD
import org.apache.carbondata.streaming.segment.StreamSegment
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index d2adc57..2092028 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -26,10 +26,10 @@ import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
import org.apache.carbondata.spark.util.CommonUtil
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 0861c63..81427a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
case class CarbonDeleteLoadByIdCommand(
loadIds: Seq[String],
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index dcbc6ce..1d76bda 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
case class CarbonDeleteLoadByLoadDateCommand(
databaseNameOp: Option[String],
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 70134a6..eb00ebf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -59,10 +59,9 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.statusmanager.SegmentStatus
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.events.exception.PreEventException
@@ -71,18 +70,15 @@ import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.loading.sort.SortScopeOptions
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
-import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark
+import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
case class CarbonLoadDataCommand(
databaseNameOp: Option[String],
@@ -193,12 +189,18 @@ case class CarbonLoadDataCommand(
carbonLoadModel.setAggLoadRequest(
internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
+
+ val javaPartition = mutable.Map[String, String]()
+ partition.foreach { case (k, v) =>
+ if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get
+ }
+
new CarbonLoadModelBuilder(table).build(
options.asJava,
optionsFinal,
carbonLoadModel,
hadoopConf,
- partition,
+ javaPartition.asJava,
dataFrame.isDefined)
// Delete stale segment folders that are not in table status but are physically present in
// the Fact folder
@@ -231,11 +233,7 @@ case class CarbonLoadDataCommand(
// First system has to partition the data first and then call the load data
LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
// Clean up the old invalid segment data before creating a new entry for new load.
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(
- isForceDeletion = false,
- table,
- currPartitions)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
// add the start entry for the new load in the table status file
if (updateModel.isEmpty && !table.isHivePartitionTable) {
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
@@ -679,7 +677,7 @@ case class CarbonLoadDataCommand(
}
}
val columnCount = carbonLoadModel.getCsvHeaderColumns.length
- val rdd = DataLoadingUtil.csvFileScanRDD(
+ val rdd = CsvRDDHelper.csvFileScanRDD(
sparkSession,
model = carbonLoadModel,
hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index f074285..230378b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 5165342..2a92478 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -25,10 +25,10 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.storage.StorageLevel
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index cf77e0f..2503fc3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -25,12 +25,12 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
import org.apache.spark.util.AlterTableUtil
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 8001a93..0298eea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -28,12 +28,12 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events._
-import org.apache.carbondata.spark.exception.{ConcurrentOperationException, ProcessMetaDataException}
case class CarbonDropTableCommand(
ifExistsSet: Boolean,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 61a31a5..2eed988 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -50,11 +50,8 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
class CarbonFileFormat
extends FileFormat
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index 995f041..d94570a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 1104229..66f8bc5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -74,7 +74,8 @@ public class DataMapWriterListener {
}
List<String> columns = factory.getMeta().getIndexedColumns();
List<AbstractDataMapWriter> writers = registry.get(columns);
- AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath);
+ AbstractDataMapWriter writer = factory.createWriter(
+ new Segment(segmentId, null), dataWritePath);
if (writers != null) {
writers.add(writer);
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 17e8dbe..29dfa40 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model;
import java.io.IOException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -100,6 +102,26 @@ public class CarbonLoadModelBuilder {
Map<String, String> optionsFinal,
CarbonLoadModel carbonLoadModel,
Configuration hadoopConf) throws InvalidLoadOptionException, IOException {
+ build(options, optionsFinal, carbonLoadModel, hadoopConf, new HashMap<String, String>(), false);
+ }
+
+ /**
+ * build CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @param optionsFinal Load options that populated with default values for optional options
+ * @param carbonLoadModel The output load model
+ * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in
+ * user provided load options
+ * @param partitions partition name map to path
+ * @param isDataFrame true if build for load for dataframe
+ */
+ public void build(
+ Map<String, String> options,
+ Map<String, String> optionsFinal,
+ CarbonLoadModel carbonLoadModel,
+ Configuration hadoopConf,
+ Map<String, String> partitions,
+ boolean isDataFrame) throws InvalidLoadOptionException, IOException {
carbonLoadModel.setTableName(table.getTableName());
carbonLoadModel.setDatabaseName(table.getDatabaseName());
carbonLoadModel.setTablePath(table.getTablePath());
@@ -214,8 +236,18 @@ public class CarbonLoadModelBuilder {
carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter));
carbonLoadModel.setCsvHeader(fileHeader);
carbonLoadModel.setColDictFilePath(column_dict);
+
+ List<String> ignoreColumns = new ArrayList<>();
+ if (!isDataFrame) {
+ for (Map.Entry<String, String> partition : partitions.entrySet()) {
+ if (partition.getValue() != null) {
+ ignoreColumns.add(partition.getKey());
+ }
+ }
+ }
+
carbonLoadModel.setCsvHeaderColumns(
- LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf));
+ LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns));
int validatedMaxColumns = validateMaxColumns(
carbonLoadModel.getCsvHeaderColumns(),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 5af4859..bac1a94 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model;
import java.io.IOException;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.Maps;
@@ -201,6 +203,16 @@ public class LoadOption {
public static String[] getCsvHeaderColumns(
CarbonLoadModel carbonLoadModel,
Configuration hadoopConf) throws IOException {
+ return getCsvHeaderColumns(carbonLoadModel, hadoopConf, new LinkedList<String>());
+ }
+
+ /**
+ * Return CSV header field names, with partition column
+ */
+ public static String[] getCsvHeaderColumns(
+ CarbonLoadModel carbonLoadModel,
+ Configuration hadoopConf,
+ List<String> staticPartitionCols) throws IOException {
String delimiter;
if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) {
delimiter = CarbonCommonConstants.COMMA;
@@ -231,7 +243,7 @@ public class LoadOption {
}
if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
- carbonLoadModel.getCarbonDataLoadSchema())) {
+ carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
if (csvFile == null) {
LOG.error("CSV header in DDL is not proper."
+ " Column names in schema and CSV header are not the same.");
@@ -249,4 +261,5 @@ public class LoadOption {
}
return csvColumns;
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 1ab803b..438e7db 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1013,7 +1013,8 @@ public final class CarbonDataMergerUtil {
CarbonFile[] updateDeltaFiles = null;
Set<String> uniqueBlocks = new HashSet<String>();
- String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), seg.getSegmentNo());
+ String segmentPath = CarbonTablePath.getSegmentPath(
+ identifier.getTablePath(), seg.getSegmentNo());
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
CarbonFile[] allSegmentFiles = segDir.listFiles();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index efd715c..ba2b0c2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -392,7 +392,8 @@ public final class CarbonDataProcessorUtil {
*
* @return data directory path
*/
- public static String createCarbonStoreLocation(String databaseName, String tableName, String segmentId) {
+ public static String createCarbonStoreLocation(String databaseName, String tableName,
+ String segmentId) {
CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 38e5698..a948538 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -16,9 +16,13 @@
*/
package org.apache.carbondata.processing.util;
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.charset.Charset;
import java.util.*;
import org.apache.carbondata.common.logging.LogService;
@@ -36,6 +40,9 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -55,6 +62,7 @@ import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
import static org.apache.carbondata.core.enums.EscapeSequences.*;
+import com.google.gson.Gson;
public final class CarbonLoaderUtil {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 54fba55..b3dd464 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-parent</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>1.4.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8104735f/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index dc5696a..df6afc6 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -23,7 +23,7 @@ import java.util.UUID;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.hadoop.conf.Configuration;
@@ -42,9 +42,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@InterfaceAudience.Internal
class CSVCarbonWriter extends CarbonWriter {
- private RecordWriter<NullWritable, StringArrayWritable> recordWriter;
+ private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
private TaskAttemptContext context;
- private StringArrayWritable writable;
+ private ObjectArrayWritable writable;
CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException {
Configuration hadoopConf = new Configuration();
@@ -57,7 +57,7 @@ class CSVCarbonWriter extends CarbonWriter {
TaskAttemptContextImpl context = new TaskAttemptContextImpl(hadoopConf, attemptID);
this.recordWriter = format.getRecordWriter(context);
this.context = context;
- this.writable = new StringArrayWritable();
+ this.writable = new ObjectArrayWritable();
}
/**