You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/10 14:49:26 UTC
[4/5] incubator-carbondata git commit: Data load integration of all
steps for removing kettle
Data load integration of all steps for removing kettle
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/496cde46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/496cde46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/496cde46
Branch: refs/heads/master
Commit: 496cde46ddd43c14a07172a913550f6fd47bd7c5
Parents: ae9d88b
Author: ravipesala <ra...@gmail.com>
Authored: Wed Nov 9 22:38:47 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Nov 9 22:38:47 2016 +0530
----------------------------------------------------------------------
.../core/carbon/CarbonTableIdentifier.java | 8 +
.../store/dataholder/CarbonWriteDataHolder.java | 42 ++
.../TimeStampDirectDictionaryGenerator.java | 7 +-
.../carbondata/core/load/BlockDetails.java | 31 +-
.../carbondata/hadoop/csv/CSVInputFormat.java | 54 ++
.../recorditerator/RecordReaderIterator.java | 68 ++
.../hadoop/test/util/StoreCreator.java | 4 +-
.../sql/common/util/CarbonHiveContext.scala | 3 +-
integration/spark/pom.xml | 1 +
.../spark/merger/RowResultMerger.java | 2 +-
.../carbondata/spark/load/CarbonLoadModel.java | 645 ------------------
.../carbondata/spark/load/CarbonLoaderUtil.java | 1 +
.../spark/load/DeleteLoadFolders.java | 1 +
.../spark/merger/CarbonDataMergerUtil.java | 2 +-
.../carbondata/spark/util/LoadMetadataUtil.java | 2 +-
.../spark/CarbonDataFrameWriter.scala | 3 +-
.../spark/rdd/CarbonDataLoadRDD.scala | 5 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 58 +-
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 3 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 3 +-
.../spark/rdd/NewCarbonDataLoadRDD.scala | 272 ++++++++
.../spark/util/GlobalDictionaryUtil.scala | 2 +-
.../apache/spark/mapred/SparkMapRedUtil.scala | 32 +
.../org/apache/spark/sql/CarbonSqlParser.scala | 3 +-
.../execution/command/carbonTableSchema.scala | 59 +-
.../spark/sql/hive/CarbonStrategies.scala | 4 +-
.../org/apache/spark/util/SplitUtils.scala | 2 +-
.../TestLoadDataWithNotProperInputFile.scala | 2 +-
.../spark/util/AllDictionaryTestCase.scala | 4 +-
.../AutoHighCardinalityIdentifyTestCase.scala | 3 +-
.../util/ExternalColumnDictionaryTestCase.scala | 3 +-
...GlobalDictionaryUtilConcurrentTestCase.scala | 3 +-
.../util/GlobalDictionaryUtilTestCase.scala | 4 +-
.../sql/common/util/CarbonHiveContext.scala | 3 +-
pom.xml | 7 +
.../processing/csvload/DataGraphExecuter.java | 45 +-
.../processing/csvload/GraphExecutionUtil.java | 61 --
.../processing/datatypes/ArrayDataType.java | 27 +-
.../processing/datatypes/GenericDataType.java | 18 +-
.../processing/datatypes/PrimitiveDataType.java | 73 ++-
.../processing/datatypes/StructDataType.java | 37 +-
.../processing/mdkeygen/MDKeyGenStep.java | 2 +-
.../processing/model/CarbonLoadModel.java | 647 +++++++++++++++++++
.../processing/newflow/DataField.java | 17 +-
.../processing/newflow/DataLoadExecutor.java | 72 +++
.../newflow/DataLoadProcessBuilder.java | 167 +++++
.../constants/DataLoadProcessorConstants.java | 8 +
.../newflow/converter/BadRecordLogHolder.java | 46 ++
.../newflow/converter/FieldConverter.java | 4 +-
.../newflow/converter/RowConverter.java | 4 +
.../AbstractDictionaryFieldConverterImpl.java | 4 +-
.../impl/ComplexFieldConverterImpl.java | 36 +-
.../impl/DictionaryFieldConverterImpl.java | 31 +-
.../DirectDictionaryFieldConverterImpl.java | 51 +-
.../converter/impl/FieldEncoderFactory.java | 86 ++-
.../impl/MeasureFieldConverterImpl.java | 83 +++
.../impl/NonDictionaryFieldConverterImpl.java | 20 +-
.../converter/impl/RowConverterImpl.java | 50 +-
.../newflow/dictionary/DirectDictionary.java | 59 ++
.../dictionary/PreCreatedDictionary.java | 8 +-
.../exception/BadRecordFoundException.java | 67 ++
.../newflow/parser/CarbonParserFactory.java | 25 +-
.../newflow/parser/GenericParser.java | 4 +-
.../newflow/parser/impl/ArrayParserImpl.java | 33 +-
.../parser/impl/PrimitiveParserImpl.java | 2 +-
.../newflow/parser/impl/RowParserImpl.java | 68 +-
.../newflow/parser/impl/StructParserImpl.java | 27 +-
.../processing/newflow/row/CarbonRow.java | 18 +-
.../sort/impl/ParallelReadMergeSorterImpl.java | 28 +-
.../sort/impl/SortPreparatorIterator.java | 147 -----
.../steps/DataConverterProcessorStepImpl.java | 106 ++-
.../steps/DataWriterProcessorStepImpl.java | 216 +++++++
.../newflow/steps/DummyClassForTest.java | 84 +++
.../newflow/steps/InputProcessorStepImpl.java | 47 +-
.../newflow/steps/SortProcessorStepImpl.java | 1 +
.../writer/DataWriterProcessorStepImpl.java | 222 -------
.../sortdata/IntermediateFileMerger.java | 90 ++-
.../sortdata/NewRowComparator.java | 73 +++
.../sortdata/NewRowComparatorForNormalDims.java | 61 ++
.../sortandgroupby/sortdata/SortDataRows.java | 112 +++-
.../sortandgroupby/sortdata/SortParameters.java | 44 +-
.../sortdata/SortTempFileChunkHolder.java | 119 +++-
.../store/CarbonFactDataHandlerColumnar.java | 309 ++++++++-
.../store/CarbonFactDataHandlerModel.java | 40 +-
.../store/SingleThreadFinalSortFilesMerger.java | 8 +-
.../csvbased/BadRecordsLogger.java | 249 +++++++
.../csvbased/BadRecordslogger.java | 235 -------
.../csvbased/CarbonCSVBasedSeqGenStep.java | 57 +-
.../util/CarbonDataProcessorUtil.java | 226 ++++++-
.../processing/util/RemoveDictionaryUtil.java | 9 +
.../carbondata/test/util/StoreCreator.java | 3 +-
91 files changed, 4027 insertions(+), 1705 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/core/src/main/java/org/apache/carbondata/core/carbon/CarbonTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/CarbonTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/CarbonTableIdentifier.java
index bb8a816..439f7b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/CarbonTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/CarbonTableIdentifier.java
@@ -19,6 +19,7 @@
package org.apache.carbondata.core.carbon;
+import java.io.File;
import java.io.Serializable;
/**
@@ -78,6 +79,13 @@ public class CarbonTableIdentifier implements Serializable {
return databaseName + '_' + tableName;
}
+ /**
+ * Creates the key for the bad record logger.
+ */
+ public String getBadRecordLoggerKey() {
+ return databaseName + File.separator + tableName + '_' + tableName;
+ }
+
@Override public int hashCode() {
final int prime = 31;
int result = 1;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
index 951857e..08f7786 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
@@ -36,6 +36,11 @@ public class CarbonWriteDataHolder {
private byte[][] byteValues;
/**
+ * byteValues for no dictionary and non kettle flow.
+ */
+ private byte[][][] byteValuesForNonDictionary;
+
+ /**
* byteValues
*/
private byte[][][] columnByteValues;
@@ -69,6 +74,7 @@ public class CarbonWriteDataHolder {
/**
* Method to initialise double array
+ * TODO Remove after kettle flow got removed.
*
* @param size
*/
@@ -82,6 +88,27 @@ public class CarbonWriteDataHolder {
}
/**
+ * Method to initialise byte array
+ *
+ * @param size
+ */
+ public void initialiseByteArrayValuesWithOutKettle(int size) {
+ if (size < 1) {
+ throw new IllegalArgumentException("Invalid array size");
+ }
+
+ byteValues = new byte[size][];
+ }
+
+ public void initialiseByteArrayValuesForNonDictionary(int size) {
+ if (size < 1) {
+ throw new IllegalArgumentException("Invalid array size");
+ }
+
+ byteValuesForNonDictionary = new byte[size][][];
+ }
+
+ /**
* Method to initialise long array
*
* @param size
@@ -127,6 +154,12 @@ public class CarbonWriteDataHolder {
if (null != value) totalSize += value.length;
}
+ public void setWritableNonDictByteArrayValueByIndex(int index, byte[][] value) {
+ byteValuesForNonDictionary[index] = value;
+ size++;
+ if (null != value) totalSize += value.length;
+ }
+
/**
* set byte array value by index
*/
@@ -172,6 +205,15 @@ public class CarbonWriteDataHolder {
return byteValues;
}
+ public byte[][][] getNonDictByteArrayValues() {
+ if (size < byteValuesForNonDictionary.length) {
+ byte[][][] temp = new byte[size][][];
+ System.arraycopy(byteValuesForNonDictionary, 0, temp, 0, size);
+ byteValuesForNonDictionary = temp;
+ }
+ return byteValuesForNonDictionary;
+ }
+
/**
* Get Writable Double Values
*
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
index a9ceab4..54d2965 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
@@ -154,7 +154,12 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
private int getDirectSurrogateForMember(String memberStr) {
Date dateToStr = null;
try {
- dateToStr = simpleDateFormatLocal.get().parse(memberStr);
+ SimpleDateFormat simpleDateFormat = simpleDateFormatLocal.get();
+ if (null == simpleDateFormat) {
+ initialize();
+ simpleDateFormat = simpleDateFormatLocal.get();
+ }
+ dateToStr = simpleDateFormat.parse(memberStr);
} catch (ParseException e) {
LOGGER.debug(
"Cannot convert " + memberStr + " to Time/Long type value. Value considered as null." + e
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java b/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java
index c3fd997..69c7cf5 100644
--- a/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java
@@ -23,10 +23,14 @@ import java.io.Serializable;
import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
/**
* blocks info
+ * TODO Remove this class after removing of kettle.
*/
-public class BlockDetails implements Serializable {
+public class BlockDetails extends FileSplit implements Serializable {
/**
* serialization version
@@ -41,8 +45,9 @@ public class BlockDetails implements Serializable {
// locations where this block exists
private String[] locations;
- public BlockDetails(String filePath, long blockOffset, long blockLength, String[] locations) {
- this.filePath = filePath;
+ public BlockDetails(Path filePath, long blockOffset, long blockLength, String[] locations) {
+ super(filePath, blockOffset, blockLength, locations);
+ this.filePath = filePath.toString();
this.blockOffset = blockOffset;
this.blockLength = blockLength;
this.locations = locations;
@@ -52,18 +57,10 @@ public class BlockDetails implements Serializable {
return blockOffset;
}
- public void setBlockOffset(long blockOffset) {
- this.blockOffset = blockOffset;
- }
-
public long getBlockLength() {
return blockLength;
}
- public void setBlockLength(long blockLength) {
- this.blockLength = blockLength;
- }
-
public String getFilePath() {
return FileFactory.getUpdatedFilePath(filePath);
}
@@ -75,4 +72,16 @@ public class BlockDetails implements Serializable {
public String[] getLocations() {
return locations;
}
+
+ /** The file containing this split's data. */
+ @Override
+ public Path getPath() { return new Path(filePath); }
+
+ /** The position of the first byte in the file to process. */
+ @Override
+ public long getStart() { return blockOffset; }
+
+ /** The number of bytes in the file to process. */
+ @Override
+ public long getLength() { return blockLength; }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
index 33484d7..3ea96ac 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
@@ -84,6 +84,60 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
}
/**
+ * Sets the comment char to configuration. By default it is '#'.
+ * @param commentChar
+ * @param configuration
+ */
+ public static void setCommentCharacter(String commentChar, Configuration configuration) {
+ if (commentChar != null && !commentChar.isEmpty()) {
+ configuration.set(COMMENT, commentChar);
+ }
+ }
+
+ /**
+ * Sets the delimiter to configuration. By default it is ','.
+ * @param delimiter
+ * @param configuration
+ */
+ public static void setCSVDelimiter(String delimiter, Configuration configuration) {
+ if (delimiter != null && !delimiter.isEmpty()) {
+ configuration.set(DELIMITER, delimiter);
+ }
+ }
+
+ /**
+ * Sets the escape character to configuration. By default it is '\'.
+ * @param escapeCharacter
+ * @param configuration
+ */
+ public static void setEscapeCharacter(String escapeCharacter, Configuration configuration) {
+ if (escapeCharacter != null && !escapeCharacter.isEmpty()) {
+ configuration.set(ESCAPE, escapeCharacter);
+ }
+ }
+
+ /**
+ * Whether header needs to read from csv or not. By default it is false.
+ * @param headerExtractEnable
+ * @param configuration
+ */
+ public static void setHeaderExtractionEnabled(boolean headerExtractEnable,
+ Configuration configuration) {
+ configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable));
+ }
+
+ /**
+ * Sets the quote character to configuration. By default it is '"'.
+ * @param quoteCharacter
+ * @param configuration
+ */
+ public static void setQuoteCharacter(String quoteCharacter, Configuration configuration) {
+ if (quoteCharacter != null && !quoteCharacter.isEmpty()) {
+ configuration.set(QUOTE, quoteCharacter);
+ }
+ }
+
+ /**
* Treats value as line in file. Key is null.
*/
public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
new file mode 100644
index 0000000..478af0a
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.csv.recorditerator;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.hadoop.io.StringArrayWritable;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+/**
+ * It is a wrapper iterator around {@link RecordReader}.
+ */
+public class RecordReaderIterator extends CarbonIterator<Object []> {
+
+ private RecordReader<NullWritable, StringArrayWritable> recordReader;
+
+ /**
+ * This flag is a small workaround to adapt the record reader to the iterator contract. Normally
+ * hasNext cannot be called multiple times on a record reader, because each call advances it to
+ * another line. With this flag, hasNext only reports whether the next row is present, and next
+ * moves the pointer to the following row after the current row has been consumed.
+ */
+ private boolean isConsumed;
+
+ public RecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader) {
+ this.recordReader = recordReader;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ if (!isConsumed) {
+ isConsumed = recordReader.nextKeyValue();
+ return isConsumed;
+ }
+ return isConsumed;
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+
+ @Override
+ public Object[] next() {
+ try {
+ String[] data = recordReader.getCurrentValue().get();
+ isConsumed = false;
+ return data;
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 0280820..929e404 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -19,6 +19,8 @@
package org.apache.carbondata.hadoop.test.util;
import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
@@ -350,7 +352,7 @@ public class StoreCreator {
DataProcessTaskStatus dataProcessTaskStatus = new DataProcessTaskStatus(databaseName, tableName);
dataProcessTaskStatus.setCsvFilePath(loadModel.getFactFilePath());
SchemaInfo info = new SchemaInfo();
- BlockDetails blockDetails = new BlockDetails(loadModel.getFactFilePath(),
+ BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
GraphGenerator.blockInfo.put("qwqwq", new BlockDetails[] { blockDetails });
dataProcessTaskStatus.setBlocksID("qwqwq");
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
index 0c3ce9f..5a5d0dd 100644
--- a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
+++ b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
@@ -31,7 +31,8 @@ class LocalSQLContext(val hdfsCarbonBasePath: String)
extends CarbonContext(new SparkContext(new SparkConf()
.setAppName("CarbonSpark")
.setMaster("local[2]")
- .set("spark.sql.shuffle.partitions", "20")),
+ .set("spark.sql.shuffle.partitions", "20")
+ .set("use_kettle_default", "true")),
hdfsCarbonBasePath,
hdfsCarbonBasePath) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index 883a169..eff18f1 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -182,6 +182,7 @@
</environmentVariables>
<systemProperties>
<java.awt.headless>true</java.awt.headless>
+ <use.kettle>${use.kettle}</use.kettle>
</systemProperties>
</configuration>
<executions>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
index 7c3d2a7..1028f78 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
@@ -46,6 +46,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.merger.exeception.SliceMergerException;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
@@ -53,7 +54,6 @@ import org.apache.carbondata.processing.store.CarbonFactHandler;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.carbondata.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
-import org.apache.carbondata.spark.load.CarbonLoadModel;
/**
* This is the Merger class responsible for the merging of the segments.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
deleted file mode 100644
index b33601d..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
+++ /dev/null
@@ -1,645 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- *
- */
-package org.apache.carbondata.spark.load;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-
-public class CarbonLoadModel implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = 6580168429197697465L;
-
- private String databaseName;
-
- private String tableName;
-
- private String factFilePath;
-
- private String dimFolderPath;
-
- private String colDictFilePath;
-
- private String partitionId;
-
- private CarbonDataLoadSchema carbonDataLoadSchema;
-
- private String[] aggTables;
-
- private String aggTableName;
-
- private boolean aggLoadRequest;
-
- private String storePath;
-
- private boolean isRetentionRequest;
-
- private List<String> factFilesToProcess;
- private String csvHeader;
- private String csvDelimiter;
- private String complexDelimiterLevel1;
- private String complexDelimiterLevel2;
-
- private boolean isDirectLoad;
- private List<LoadMetadataDetails> loadMetadataDetails;
-
- private String blocksID;
-
- /**
- * Map from carbon dimension to pre defined dict file path
- */
- private HashMap<CarbonDimension, String> predefDictMap;
-
- /**
- * task id, each spark task has a unique id
- */
- private String taskNo;
- /**
- * new load start time
- */
- private String factTimeStamp;
- /**
- * load Id
- */
- private String segmentId;
-
- private String allDictPath;
-
- /**
- * escape Char
- */
- private String escapeChar;
-
- /**
- * quote Char
- */
- private String quoteChar;
-
- /**
- * comment Char
- */
- private String commentChar;
-
- private String dateFormat;
-
- /**
- * defines the string that should be treated as null while loadind data
- */
- private String serializationNullFormat;
-
- /**
- * defines the string to specify whether the bad record logger should be enabled or not
- */
- private String badRecordsLoggerEnable;
-
- /**
- * defines the option to specify the bad record logger action
- */
- private String badRecordsAction;
-
- /**
- * Max number of columns that needs to be parsed by univocity parser
- */
- private String maxColumns;
-
- /**
- * the key of RDD Iterator in RDD iterator Map
- */
- private String rddIteratorKey;
-
- /**
- * get escape char
- * @return
- */
- public String getEscapeChar() {
- return escapeChar;
- }
-
- /**
- * set escape char
- * @param escapeChar
- */
- public void setEscapeChar(String escapeChar) {
- this.escapeChar = escapeChar;
- }
-
- /**
- * get blocck id
- *
- * @return
- */
- public String getBlocksID() {
- return blocksID;
- }
-
- /**
- * set block id for carbon load model
- *
- * @param blocksID
- */
- public void setBlocksID(String blocksID) {
- this.blocksID = blocksID;
- }
-
- public String getCsvDelimiter() {
- return csvDelimiter;
- }
-
- public void setCsvDelimiter(String csvDelimiter) {
- this.csvDelimiter = csvDelimiter;
- }
-
- public String getComplexDelimiterLevel1() {
- return complexDelimiterLevel1;
- }
-
- public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
- this.complexDelimiterLevel1 = complexDelimiterLevel1;
- }
-
- public String getComplexDelimiterLevel2() {
- return complexDelimiterLevel2;
- }
-
- public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
- this.complexDelimiterLevel2 = complexDelimiterLevel2;
- }
-
- public boolean isDirectLoad() {
- return isDirectLoad;
- }
-
- public void setDirectLoad(boolean isDirectLoad) {
- this.isDirectLoad = isDirectLoad;
- }
-
- public String getAllDictPath() {
- return allDictPath;
- }
-
- public void setAllDictPath(String allDictPath) {
- this.allDictPath = allDictPath;
- }
-
- public List<String> getFactFilesToProcess() {
- return factFilesToProcess;
- }
-
- public void setFactFilesToProcess(List<String> factFilesToProcess) {
- this.factFilesToProcess = factFilesToProcess;
- }
-
- public String getCsvHeader() {
- return csvHeader;
- }
-
- public void setCsvHeader(String csvHeader) {
- this.csvHeader = csvHeader;
- }
-
- public void initPredefDictMap() {
- predefDictMap = new HashMap<>();
- }
-
- public String getPredefDictFilePath(CarbonDimension dimension) {
- return predefDictMap.get(dimension);
- }
-
- public void setPredefDictMap(CarbonDimension dimension, String predefDictFilePath) {
- this.predefDictMap.put(dimension, predefDictFilePath);
- }
-
- /**
- * @return carbon dataload schema
- */
- public CarbonDataLoadSchema getCarbonDataLoadSchema() {
- return carbonDataLoadSchema;
- }
-
- /**
- * @param carbonDataLoadSchema
- */
- public void setCarbonDataLoadSchema(CarbonDataLoadSchema carbonDataLoadSchema) {
- this.carbonDataLoadSchema = carbonDataLoadSchema;
- }
-
- /**
- * @return the databaseName
- */
- public String getDatabaseName() {
- return databaseName;
- }
-
- /**
- * @param databaseName the databaseName to set
- */
- public void setDatabaseName(String databaseName) {
- this.databaseName = databaseName;
- }
-
- /**
- * @return the tableName
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * @param tableName the tableName to set
- */
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- /**
- * @return the factFilePath
- */
- public String getFactFilePath() {
- return factFilePath;
- }
-
- /**
- * @param factFilePath the factFilePath to set
- */
- public void setFactFilePath(String factFilePath) {
- this.factFilePath = factFilePath;
- }
-
- /**
- *
- * @return external column dictionary file path
- */
- public String getColDictFilePath() {
- return colDictFilePath;
- }
-
- /**
- * set external column dictionary file path
- * @param colDictFilePath
- */
- public void setColDictFilePath(String colDictFilePath) {
- this.colDictFilePath = colDictFilePath;
- }
-
- /**
- * @return the dimFolderPath
- */
- public String getDimFolderPath() {
- return dimFolderPath;
- }
-
- //TODO SIMIAN
-
- /**
- * @param dimFolderPath the dimFolderPath to set
- */
- public void setDimFolderPath(String dimFolderPath) {
- this.dimFolderPath = dimFolderPath;
- }
-
- /**
- * get copy with parition
- *
- * @param uniqueId
- * @return
- */
- public CarbonLoadModel getCopyWithPartition(String uniqueId) {
- CarbonLoadModel copy = new CarbonLoadModel();
- copy.tableName = tableName;
- copy.dimFolderPath = dimFolderPath;
- copy.factFilePath = factFilePath + '/' + uniqueId;
- copy.databaseName = databaseName;
- copy.partitionId = uniqueId;
- copy.aggTables = aggTables;
- copy.aggTableName = aggTableName;
- copy.aggLoadRequest = aggLoadRequest;
- copy.loadMetadataDetails = loadMetadataDetails;
- copy.isRetentionRequest = isRetentionRequest;
- copy.complexDelimiterLevel1 = complexDelimiterLevel1;
- copy.complexDelimiterLevel2 = complexDelimiterLevel2;
- copy.carbonDataLoadSchema = carbonDataLoadSchema;
- copy.blocksID = blocksID;
- copy.taskNo = taskNo;
- copy.factTimeStamp = factTimeStamp;
- copy.segmentId = segmentId;
- copy.serializationNullFormat = serializationNullFormat;
- copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
- copy.badRecordsAction = badRecordsAction;
- copy.escapeChar = escapeChar;
- copy.quoteChar = quoteChar;
- copy.commentChar = commentChar;
- copy.maxColumns = maxColumns;
- return copy;
- }
-
- /**
- * get CarbonLoadModel with partition
- *
- * @param uniqueId
- * @param filesForPartition
- * @param header
- * @param delimiter
- * @return
- */
- public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
- String header, String delimiter) {
- CarbonLoadModel copyObj = new CarbonLoadModel();
- copyObj.tableName = tableName;
- copyObj.dimFolderPath = dimFolderPath;
- copyObj.factFilePath = null;
- copyObj.databaseName = databaseName;
- copyObj.partitionId = uniqueId;
- copyObj.aggTables = aggTables;
- copyObj.aggTableName = aggTableName;
- copyObj.aggLoadRequest = aggLoadRequest;
- copyObj.loadMetadataDetails = loadMetadataDetails;
- copyObj.isRetentionRequest = isRetentionRequest;
- copyObj.carbonDataLoadSchema = carbonDataLoadSchema;
- copyObj.csvHeader = header;
- copyObj.factFilesToProcess = filesForPartition;
- copyObj.isDirectLoad = true;
- copyObj.csvDelimiter = delimiter;
- copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
- copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
- copyObj.blocksID = blocksID;
- copyObj.taskNo = taskNo;
- copyObj.factTimeStamp = factTimeStamp;
- copyObj.segmentId = segmentId;
- copyObj.serializationNullFormat = serializationNullFormat;
- copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
- copyObj.badRecordsAction = badRecordsAction;
- copyObj.escapeChar = escapeChar;
- copyObj.quoteChar = quoteChar;
- copyObj.commentChar = commentChar;
- copyObj.dateFormat = dateFormat;
- copyObj.maxColumns = maxColumns;
- return copyObj;
- }
-
- /**
- * @return the partitionId
- */
- public String getPartitionId() {
- return partitionId;
- }
-
- /**
- * @param partitionId the partitionId to set
- */
- public void setPartitionId(String partitionId) {
- this.partitionId = partitionId;
- }
-
- /**
- * @return the aggTables
- */
- public String[] getAggTables() {
- return aggTables;
- }
-
- /**
- * @param aggTables the aggTables to set
- */
- public void setAggTables(String[] aggTables) {
- this.aggTables = aggTables;
- }
-
- /**
- * @return the aggLoadRequest
- */
- public boolean isAggLoadRequest() {
- return aggLoadRequest;
- }
-
- /**
- * @param aggLoadRequest the aggLoadRequest to set
- */
- public void setAggLoadRequest(boolean aggLoadRequest) {
- this.aggLoadRequest = aggLoadRequest;
- }
-
- /**
- * @param storePath The storePath to set.
- */
- public void setStorePath(String storePath) {
- this.storePath = storePath;
- }
-
- /**
- * @return Returns the aggTableName.
- */
- public String getAggTableName() {
- return aggTableName;
- }
-
- /**
- * @return Returns the factStoreLocation.
- */
- public String getStorePath() {
- return storePath;
- }
-
- /**
- * @param aggTableName The aggTableName to set.
- */
- public void setAggTableName(String aggTableName) {
- this.aggTableName = aggTableName;
- }
-
- /**
- * isRetentionRequest
- *
- * @return
- */
- public boolean isRetentionRequest() {
- return isRetentionRequest;
- }
-
- /**
- * @param isRetentionRequest
- */
- public void setRetentionRequest(boolean isRetentionRequest) {
- this.isRetentionRequest = isRetentionRequest;
- }
-
- /**
- * getLoadMetadataDetails.
- *
- * @return
- */
- public List<LoadMetadataDetails> getLoadMetadataDetails() {
- return loadMetadataDetails;
- }
-
- /**
- * setLoadMetadataDetails.
- *
- * @param loadMetadataDetails
- */
- public void setLoadMetadataDetails(List<LoadMetadataDetails> loadMetadataDetails) {
- this.loadMetadataDetails = loadMetadataDetails;
- }
-
- /**
- * @return
- */
- public String getTaskNo() {
- return taskNo;
- }
-
- /**
- * @param taskNo
- */
- public void setTaskNo(String taskNo) {
- this.taskNo = taskNo;
- }
-
- /**
- * @return
- */
- public String getFactTimeStamp() {
- return factTimeStamp;
- }
-
- /**
- * @param factTimeStamp
- */
- public void setFactTimeStamp(String factTimeStamp) {
- this.factTimeStamp = factTimeStamp;
- }
-
- public String[] getDelimiters() {
- return new String[] { complexDelimiterLevel1, complexDelimiterLevel2 };
- }
-
- /**
- * @return load Id
- */
- public String getSegmentId() {
- return segmentId;
- }
-
- /**
- * @param segmentId
- */
- public void setSegmentId(String segmentId) {
- this.segmentId = segmentId;
- }
-
- /**
- * the method returns the value to be treated as null while data load
- * @return
- */
- public String getSerializationNullFormat() {
- return serializationNullFormat;
- }
-
- /**
- * the method sets the value to be treated as null while data load
- * @param serializationNullFormat
- */
- public void setSerializationNullFormat(String serializationNullFormat) {
- this.serializationNullFormat = serializationNullFormat;
- }
-
- /**
- * returns the string to enable bad record logger
- * @return
- */
- public String getBadRecordsLoggerEnable() {
- return badRecordsLoggerEnable;
- }
-
- /**
- * method sets the string to specify whether to enable or dissable the badrecord logger.
- * @param badRecordsLoggerEnable
- */
- public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) {
- this.badRecordsLoggerEnable = badRecordsLoggerEnable;
- }
-
- public String getQuoteChar() {
- return quoteChar;
- }
-
- public void setQuoteChar(String quoteChar) {
- this.quoteChar = quoteChar;
- }
-
- public String getCommentChar() {
- return commentChar;
- }
-
- public void setCommentChar(String commentChar) {
- this.commentChar = commentChar;
- }
-
- public String getDateFormat() { return dateFormat; }
-
- public void setDateFormat(String dateFormat) { this.dateFormat = dateFormat; }
-
- /**
- * @return
- */
- public String getMaxColumns() {
- return maxColumns;
- }
-
- /**
- * @param maxColumns
- */
- public void setMaxColumns(String maxColumns) {
- this.maxColumns = maxColumns;
- }
-
- /**
- * returns option to specify the bad record logger action
- * @return
- */
- public String getBadRecordsAction() {
- return badRecordsAction;
- }
-
- /**
- * set option to specify the bad record logger action
- * @param badRecordsAction
- */
- public void setBadRecordsAction(String badRecordsAction) {
- this.badRecordsAction = badRecordsAction;
- }
-
- public String getRddIteratorKey() {
- return rddIteratorKey;
- }
-
- public void setRddIteratorKey(String rddIteratorKey) {
- this.rddIteratorKey = rddIteratorKey;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index fa796ab..3d670a2 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -79,6 +79,7 @@ import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus;
import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.carbondata.spark.merger.NodeBlockRelation;
import org.apache.carbondata.spark.merger.NodeMultiBlockRelation;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
index b9ac4dd..d1ff644 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -45,6 +45,7 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
import org.apache.carbondata.core.load.LoadMetadataDetails;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
public final class DeleteLoadFolders {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
index 32a91de..6c9ca41 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.integration.spark.merger.CompactionType;
import org.apache.carbondata.lcm.locks.ICarbonLock;
import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.spark.load.CarbonLoadModel;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
import org.apache.carbondata.spark.load.CarbonLoaderUtil;
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
index a897e80..6b56545 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.load.LoadMetadataDetails;
import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.spark.load.CarbonLoadModel;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
public final class LoadMetadataUtil {
private LoadMetadataUtil() {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index 1939095..65ff787 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -123,8 +123,7 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) extends Logging {
Map(("fileheader" -> header)),
false,
null,
- Some(dataFrame),
- options.useKettle).run(cc)
+ Some(dataFrame)).run(cc)
}
private def csvPackage: String = "com.databricks.spark.csv.newapi"
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 4baeb67..856e67c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.processing.constants.DataProcessorConstants
import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.DataLoadResult
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.splits.TableSplit
@@ -165,7 +166,6 @@ class SparkPartitionLoader(model: CarbonLoadModel,
* @param sc The SparkContext to associate the RDD with.
* @param result Output result
* @param carbonLoadModel Carbon load model which contain the load info
- * @param storeLocation Tmp store location
* @param storePath The store location
* @param kettleHomePath The kettle home path
* @param partitioner Partitioner which specify how to partition
@@ -182,7 +182,6 @@ class DataFileLoaderRDD[K, V](
sc: SparkContext,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
- var storeLocation: String,
storePath: String,
kettleHomePath: String,
partitioner: Partitioner,
@@ -482,7 +481,6 @@ class DataFileLoaderRDD[K, V](
* @param sc
* @param result
* @param carbonLoadModel
- * @param storeLocation
* @param storePath
* @param kettleHomePath
* @param columinar
@@ -497,7 +495,6 @@ class DataFrameLoaderRDD[K, V](
sc: SparkContext,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
- var storeLocation: String,
storePath: String,
kettleHomePath: String,
columinar: Boolean,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f97bd43..4392efe 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -27,12 +27,13 @@ import scala.util.Random
import scala.util.control.Breaks._
import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{util => _, _}
import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
-import org.apache.spark.sql.hive.{DistributionUtil}
+import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.util.{FileUtils, SplitUtils}
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -47,6 +48,8 @@ import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, Com
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
import org.apache.carbondata.spark._
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
@@ -660,16 +663,15 @@ object CarbonDataRDDFactory extends Logging {
def loadCarbonData(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- storeLocation: String,
storePath: String,
kettleHomePath: String,
partitioner: Partitioner,
columinar: Boolean,
- isAgg: Boolean,
partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
+ useKettle: Boolean,
dataFrame: Option[DataFrame] = None): Unit = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-
+ val isAgg = false
// for handling of the segment Merging.
def handleSegmentMerging(tableCreationTime: Long): Unit = {
logger
@@ -761,6 +763,11 @@ object CarbonDataRDDFactory extends Logging {
.audit("Data load request has been received for table " + carbonLoadModel
.getDatabaseName + "." + carbonLoadModel.getTableName
)
+ if (!useKettle) {
+ logger.audit("Data is loading with New Data Flow for table " + carbonLoadModel
+ .getDatabaseName + "." + carbonLoadModel.getTableName
+ )
+ }
// Check if any load need to be deleted before loading new data
deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, partitioner, storePath,
isForceDeletion = false)
@@ -938,7 +945,7 @@ object CarbonDataRDDFactory extends Logging {
val blockDetailsList =
entry._2.asScala.map(distributable => {
val tableBlock = distributable.asInstanceOf[TableBlockInfo]
- new BlockDetails(tableBlock.getFilePath,
+ new BlockDetails(new Path(tableBlock.getFilePath),
tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
)
}).toArray
@@ -947,20 +954,29 @@ object CarbonDataRDDFactory extends Logging {
).toArray
}
- status = new DataFileLoaderRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- storeLocation,
- storePath,
- kettleHomePath,
- partitioner,
- columinar,
- currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
- blocksGroupBy,
- isTableSplitPartition
- ).collect()
+ if (useKettle) {
+ status = new DataFileLoaderRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ storePath,
+ kettleHomePath,
+ partitioner,
+ columinar,
+ currentLoadCount,
+ tableCreationTime,
+ schemaLastUpdatedTime,
+ blocksGroupBy,
+ isTableSplitPartition
+ ).collect()
+ } else {
+ status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ partitioner,
+ currentLoadCount,
+ blocksGroupBy,
+ isTableSplitPartition).collect()
+ }
}
def loadDataFrame(): Unit = {
@@ -972,7 +988,6 @@ object CarbonDataRDDFactory extends Logging {
status = new DataFrameLoaderRDD(sqlContext.sparkContext,
new DataLoadResultImpl(),
carbonLoadModel,
- storeLocation,
storePath,
kettleHomePath,
columinar,
@@ -1028,7 +1043,8 @@ object CarbonDataRDDFactory extends Logging {
loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
ex match {
case sparkException: SparkException =>
- if (sparkException.getCause.isInstanceOf[DataLoadingException]) {
+ if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
+ sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
executorMessage = sparkException.getCause.getMessage
errorMessage = errorMessage + ": " + executorMessage
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 8b4b74a..b7da579 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -39,7 +39,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
import org.apache.carbondata.spark.util.GlobalDictionaryUtil
import org.apache.carbondata.spark.util.GlobalDictionaryUtil._
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 2bfd99d..b8f4087 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -39,10 +39,11 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
+import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
import org.apache.carbondata.scan.result.iterator.RawResultIterator
import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
import org.apache.carbondata.spark.splits.TableSplit
import org.apache.carbondata.spark.util.QueryPlanUtil
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
new file mode 100644
index 0000000..6affe66
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadExecutor
+import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
/**
 * It loads the data to carbon using @AbstractDataLoadProcessorStep — the new
 * (kettle-free) load pipeline driven by DataLoadExecutor.
 *
 * Each partition builds CSV record readers over its assigned blocks and hands
 * them to DataLoadExecutor; the RDD emits exactly one (status key, metadata)
 * element per partition via `result`.
 *
 * @param sc                    the SparkContext to associate the RDD with
 * @param result                converts (uniqueLoadStatusId, LoadMetadataDetails) into (K, V)
 * @param carbonLoadModel       carbon load model which contains the load info
 * @param partitioner           partitioner used to compute table splits (table-split mode only)
 * @param loadCount             current load count; becomes the segment id of this load
 * @param blocksGroupBy         blocks grouped by partition unique id (table-split mode)
 *                              or by host name (node-partition mode)
 * @param isTableSplitPartition true => partition by table splits, false => by node blocks
 */
class NewCarbonDataLoadRDD[K, V](
    sc: SparkContext,
    result: DataLoadResult[K, V],
    carbonLoadModel: CarbonLoadModel,
    partitioner: Partitioner,
    loadCount: Integer,
    blocksGroupBy: Array[(String, Array[BlockDetails])],
    isTableSplitPartition: Boolean)
  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging {

  // load jobs run in the dedicated DDL scheduler pool
  sc.setLocalProperty("spark.scheduler.pool", "DDL")

  // timestamp-based job tracker id used when fabricating Hadoop task attempt ids
  private val jobTrackerId: String = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    formatter.format(new Date())
  }

  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
  private val confBroadcast =
    sc.broadcast(new CarbonSerializableConfiguration(sc.hadoopConfiguration))

  override def getPartitions: Array[Partition] = {
    if (isTableSplitPartition) {
      // for table split partition
      var splits: Array[TableSplit] = null

      if (carbonLoadModel.isDirectLoad) {
        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
          partitioner.nodeList, partitioner.partitionCount)
      } else {
        splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
          carbonLoadModel.getTableName, null, partitioner)
      }

      splits.zipWithIndex.map { s =>
        // filter the same partition unique id, because only one will match, so get 0 element
        val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
          p._1 == s._1.getPartition.getUniqueID)(0)._2
        new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
      }
    } else {
      // for node partition: one Spark partition per (host, blocks) group
      blocksGroupBy.zipWithIndex.map { b =>
        new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
      }
    }
  }

  override def checkpoint() {
    // Do nothing. Hadoop RDD should not be checkpointed.
  }

  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    // The whole load runs eagerly in the iterator's constructor body below;
    // hasNext/next only hand back the single status element afterwards.
    val iter = new Iterator[(K, V)] {
      var partitionID = "0"
      val loadMetadataDetails = new LoadMetadataDetails()
      // partition-scoped copy of carbonLoadModel; assigned inside getInputIterators
      var model: CarbonLoadModel = _
      var uniqueLoadStatusId =
        carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
      try {
        loadMetadataDetails.setPartitionCount(partitionID)
        // pessimistic default: FAILURE until the executor is reached
        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)

        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
        // NOTE: this call also assigns `model`, which the loader below depends on —
        // the ordering of these two statements is load-bearing
        val recordReaders = getInputIterators
        val loader = new SparkPartitionLoader(model,
          theSplit.index,
          null,
          null,
          loadCount,
          loadMetadataDetails)
        // Initialize to set carbon properties
        loader.initialize()
        // status is set to SUCCESS up front; a BadRecordFoundException downgrades it
        // to PARTIAL_SUCCESS and any other exception is rethrown to fail the task
        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
        new DataLoadExecutor().execute(model,
          loader.storeLocation,
          recordReaders)
      } catch {
        case e: BadRecordFoundException =>
          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
          logInfo("Bad Record Found")
        case e: Exception =>
          logInfo("DataLoad failure", e)
          LOGGER.error(e)
          throw e
      }

      /**
       * Builds one record-reader-backed iterator per block assigned to this
       * partition, and (as a side effect) assigns `model` to a partition-scoped
       * copy of carbonLoadModel.
       *
       * @return one row iterator per CSV block of this partition
       */
      def getInputIterators: Array[util.Iterator[Array[AnyRef]]] = {
        val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, theSplit.index, 0)
        val configuration: Configuration = confBroadcast.value.value
        configureCSVInputFormat(configuration)
        val hadoopAttemptContext = newTaskAttemptContext(configuration, attemptId)
        val format = new CSVInputFormat
        if (isTableSplitPartition) {
          // for table split partition
          val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
          logInfo("Input split: " + split.serializableHadoopSplit.value)
          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
          if (carbonLoadModel.isDirectLoad) {
            model = carbonLoadModel.getCopyWithPartition(
              split.serializableHadoopSplit.value.getPartition.getUniqueID,
              split.serializableHadoopSplit.value.getPartition.getFilesPath,
              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
          } else {
            model = carbonLoadModel.getCopyWithPartition(
              split.serializableHadoopSplit.value.getPartition.getUniqueID)
          }
          partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID

          StandardLogService.setThreadName(partitionID, null)
          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
            partitionID, split.partitionBlocksDetail.length)
          val readers =
            split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
          readers.zipWithIndex.foreach { case (reader, index) =>
            reader.initialize(split.partitionBlocksDetail(index), hadoopAttemptContext)
          }
          readers.map(new RecordReaderIterator(_))
        } else {
          // for node partition
          // NOTE(review): here the copy is taken with partitionID still at its
          // initial "0" — confirm this matches the old kettle flow's behaviour
          val split = theSplit.asInstanceOf[CarbonNodePartition]
          logInfo("Input split: " + split.serializableHadoopSplit)
          logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
            split.serializableHadoopSplit, split.nodeBlocksDetail.length)
          val blocksID = gernerateBlocksID
          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
          if (carbonLoadModel.isDirectLoad) {
            val filelist: java.util.List[String] = new java.util.ArrayList[String](
              CarbonCommonConstants.CONSTANT_SIZE_TEN)
            CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
            model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
          } else {
            model = carbonLoadModel.getCopyWithPartition(partitionID)
          }
          StandardLogService.setThreadName(blocksID, null)
          val readers =
            split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
          readers.zipWithIndex.foreach { case (reader, index) =>
            reader.initialize(split.nodeBlocksDetail(index), hadoopAttemptContext)
          }
          readers.map(new RecordReaderIterator(_))
        }
      }

      // copies the model's CSV parsing options into the Hadoop configuration
      // consumed by CSVInputFormat; header extraction is on only when the user
      // supplied no explicit header
      def configureCSVInputFormat(configuration: Configuration): Unit = {
        CSVInputFormat.setCommentCharacter(carbonLoadModel.getCommentChar, configuration)
        CSVInputFormat.setCSVDelimiter(carbonLoadModel.getCsvDelimiter, configuration)
        CSVInputFormat.setEscapeCharacter(carbonLoadModel.getEscapeChar, configuration)
        CSVInputFormat.setHeaderExtractionEnabled(
          carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty,
          configuration)
        CSVInputFormat.setQuoteCharacter(carbonLoadModel.getQuoteChar, configuration)
      }

      /**
       * generate blocks id: db_table[_partitionUniqueId]_randomUUID
       * NOTE(review): name misspelled ("generate") — rename in a follow-up
       *
       * @return unique id string for this partition's block set
       */
      def gernerateBlocksID: String = {
        if (isTableSplitPartition) {
          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
            theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
              .getPartition.getUniqueID + "_" + UUID.randomUUID()
        } else {
          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
            UUID.randomUUID()
        }
      }

      // single-element iterator: the load above already ran; emit its status once
      var finished = false

      override def hasNext: Boolean = {
        !finished
      }

      override def next(): (K, V) = {
        finished = true
        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
      }
    }
    iter
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    isTableSplitPartition match {
      case true =>
        // for table split partition
        val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
        val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
        location
      case false =>
        // for node partition: prefer the split's own host first, then the two
        // other hosts that carry the most of this partition's blocks
        val theSplit = split.asInstanceOf[CarbonNodePartition]
        val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
        logInfo("Preferred Location for split : " + firstOptionLocation.head)
        val blockMap = new util.LinkedHashMap[String, Integer]()
        val tableBlocks = theSplit.blocksDetails
        tableBlocks.foreach { tableBlock =>
          tableBlock.getLocations.foreach { location =>
            if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
              val currentCount = blockMap.get(location)
              if (currentCount == null) {
                blockMap.put(location, 1)
              } else {
                blockMap.put(location, currentCount + 1)
              }
            }
          }
        }

        val sortedList = blockMap.entrySet().asScala.toSeq.sortWith {(nodeCount1, nodeCount2) =>
          nodeCount1.getValue > nodeCount2.getValue
        }

        val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
        firstOptionLocation ++ sortedNodesList
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index d14eedf..bd295bc 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -46,9 +46,9 @@ import org.apache.carbondata.core.reader.CarbonDictionaryReader
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.writer.CarbonDictionaryWriter
import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.CarbonSparkFactory
import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.load.CarbonLoadModel
import org.apache.carbondata.spark.rdd._
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala b/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
new file mode 100644
index 0000000..84f398a
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mapred
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * It is just a dummy trait used to access Spark's package-restricted SparkHadoopMapReduceUtil.
+ */
+// Empty marker trait: extending Spark's package-restricted
+// SparkHadoopMapReduceUtil from inside the org.apache.spark package
+// re-exposes its helpers to Carbon classes that mix this trait in.
+trait CarbonHadoopMapReduceUtil extends SparkHadoopMapReduceUtil {
+
+}
+
+// Serializable wrapper for a Hadoop Configuration so it can be shipped to
+// executors; the field is @transient because the (de)serialization is
+// presumably handled by the SerializableConfiguration superclass — confirm.
+class CarbonSerializableConfiguration(@transient var conf: Configuration)
+ extends SerializableConfiguration(conf)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 02912fd..6f149f7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -969,9 +969,8 @@ class CarbonSqlParser()
validateOptions(optionsList)
}
val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
- val useKettle = optionsMap.getOrElse("USE_KETTLE", "true").toBoolean
LoadTable(databaseNameOp, tableName, filePath, Seq(), optionsMap,
- isOverwrite.isDefined, useKettle = useKettle)
+ isOverwrite.isDefined)
}
private def validateOptions(optionList: Option[List[(String, String)]]): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f285984..ed757e3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -56,6 +56,7 @@ import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.CarbonSparkFactory
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.load._
@@ -1026,33 +1027,6 @@ case class LoadTable(
options: scala.collection.immutable.Map[String, String],
isOverwriteExist: Boolean = false,
var inputSqlString: String = null,
- dataFrame: Option[DataFrame] = None,
- useKettle: Boolean = true) extends RunnableCommand {
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- if (useKettle) {
- LoadTableUsingKettle(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
- options, isOverwriteExist, inputSqlString, dataFrame).run(sqlContext)
- } else {
- LoadTableUsingProcessorStep().run(sqlContext)
- }
- }
-}
-
-case class LoadTableUsingProcessorStep() extends RunnableCommand {
- def run(sqlContext: SQLContext): Seq[Row] = {
- throw new UnsupportedOperationException("work in progress")
- }
-}
-
-case class LoadTableUsingKettle(
- databaseNameOp: Option[String],
- tableName: String,
- factPathFromUser: String,
- dimFilesPath: Seq[DataLoadTableFileMapping],
- options: scala.collection.immutable.Map[String, String],
- isOverwriteExist: Boolean = false,
- var inputSqlString: String = null,
dataFrame: Option[DataFrame] = None) extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -1112,24 +1086,28 @@ case class LoadTableUsingKettle(
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- var storeLocation = ""
val configuredStore = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
- if (null != configuredStore && configuredStore.nonEmpty) {
- storeLocation = configuredStore(Random.nextInt(configuredStore.length))
- }
- if (storeLocation == null) {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
var partitionLocation = relation.tableMeta.storePath + "/partition/" +
relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
relation.tableMeta.carbonTableIdentifier.getTableName + "/"
- storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
+ // TODO It will be removed after kettle is removed.
+ val useKettle = options.get("use_kettle") match {
+ case Some(value) => value.toBoolean
+ case _ =>
+ val useKettleLocal = System.getProperty("use.kettle")
+ if (useKettleLocal == null) {
+ sqlContext.sparkContext.getConf.get("use_kettle_default", "true").toBoolean
+ } else {
+ useKettleLocal.toBoolean
+ }
+ }
+
val delimiter = options.getOrElse("delimiter", ",")
val quoteChar = options.getOrElse("quotechar", "\"")
val fileHeader = options.getOrElse("fileheader", "")
@@ -1198,10 +1176,15 @@ case class LoadTableUsingKettle(
GlobalDictionaryUtil
.generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
dataFrame)
- CarbonDataRDDFactory
- .loadCarbonData(sqlContext, carbonLoadModel, storeLocation, relation.tableMeta.storePath,
+ CarbonDataRDDFactory.loadCarbonData(sqlContext,
+ carbonLoadModel,
+ relation.tableMeta.storePath,
kettleHomePath,
- relation.tableMeta.partitioner, columinar, isAgg = false, partitionStatus, dataFrame)
+ relation.tableMeta.partitioner,
+ columinar,
+ partitionStatus,
+ useKettle,
+ dataFrame)
}
catch {
case ex: Exception =>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 03ddeff..a755d5d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -228,12 +228,12 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
case ShowLoadsCommand(databaseName, table, limit) =>
ExecutedCommand(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
- options, isOverwriteExist, inputSqlString, dataFrame, useKettle) =>
+ options, isOverwriteExist, inputSqlString, dataFrame) =>
val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
.tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
if (isCarbonTable || options.nonEmpty) {
ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
- options, isOverwriteExist, inputSqlString, dataFrame, useKettle)) :: Nil
+ options, isOverwriteExist, inputSqlString, dataFrame)) :: Nil
} else {
ExecutedCommand(HiveNativeCommand(inputSqlString)) :: Nil
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala b/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
index 22713da..03e15c1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
@@ -56,7 +56,7 @@ object SplitUtils {
part.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value.asInstanceOf[FileSplit]
}
splits.map { block =>
- new BlockDetails(block.getPath.toString,
+ new BlockDetails(block.getPath,
block.getStart,
block.getLength,
block.getLocations
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
index 0c039bd..fa8731a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
@@ -24,7 +24,7 @@ import java.io.File
import org.apache.spark.sql.common.util.{CarbonHiveContext, QueryTest}
import org.apache.spark.util.FileUtils
-import org.apache.carbondata.spark.load.CarbonLoadModel
+import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.util.GlobalDictionaryUtil
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 2b19829..c215a46 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -23,11 +23,13 @@ import java.io.File
import org.apache.spark.sql.common.util.CarbonHiveContext.sql
import org.apache.spark.sql.common.util.{CarbonHiveContext, QueryTest}
import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
+
import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.spark.load.CarbonLoadModel
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.processing.model.CarbonLoadModel
+
/**
* Test Case for org.apache.carbondata.integration.spark.util.GlobalDictionaryUtil
*
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
index e7a549c..3b06311 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
@@ -25,13 +25,14 @@ import org.apache.spark.sql.common.util.CarbonHiveContext.sql
import org.apache.spark.sql.common.util.{CarbonHiveContext, QueryTest}
import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
import org.scalatest.BeforeAndAfterAll
+
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.carbon.path.CarbonStorePath
import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.spark.load.CarbonLoadModel
+import org.apache.carbondata.processing.model.CarbonLoadModel
/**
* Test Case for org.apache.carbondata.spark.util.GlobalDictionaryUtil
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 97bcb64..07d1a16 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -24,13 +24,14 @@ import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.CarbonLoadModel
import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
import org.apache.spark.sql.common.util.CarbonHiveContext
import org.apache.spark.sql.common.util.CarbonHiveContext.sql
import org.apache.spark.sql.common.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.processing.model.CarbonLoadModel
+
/**
* test case for external column dictionary generation
* also support complicated type
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
index 179d0f6..9cbdb7e 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.common.util.CarbonHiveContext.sql
import org.apache.spark.sql.common.util.QueryTest
import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
-import org.apache.carbondata.spark.load.CarbonLoadModel
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
@@ -39,11 +38,11 @@ import java.util.concurrent.Callable
import java.util.concurrent.TimeUnit
import org.apache.carbondata.common.ext.PathFactory
-
import org.apache.carbondata.core.carbon.path.CarbonTablePath
import org.apache.carbondata.core.carbon.ColumnIdentifier
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.processing.model.CarbonLoadModel
class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAfterAll {