You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/10 14:49:25 UTC
[3/5] incubator-carbondata git commit: Data load integration of all
steps for removing kettle
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
index e841bc8..74633e2 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
@@ -24,11 +24,13 @@ import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
import org.apache.spark.sql.common.util.CarbonHiveContext
import org.apache.spark.sql.common.util.CarbonHiveContext.sql
import org.apache.spark.sql.common.util.QueryTest
+
import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.spark.load.CarbonLoadModel
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.processing.model.CarbonLoadModel
+
/**
* Test Case for org.apache.carbondata.spark.util.GlobalDictionaryUtil
*
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
index e9c9471..1990070 100644
--- a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
@@ -31,7 +31,8 @@ class LocalSQLContext(val hdfsCarbonBasePath: String)
extends CarbonContext(new SparkContext(new SparkConf()
.setAppName("CarbonSpark")
.setMaster("local[2]")
- .set("spark.sql.shuffle.partitions", "20")),
+ .set("spark.sql.shuffle.partitions", "20")
+ .set("use_kettle_default", "true")),
hdfsCarbonBasePath,
hdfsCarbonBasePath) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a06133b..4bdd587 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,7 @@
<hadoop.version>2.2.0</hadoop.version>
<scala.version>2.10.4</scala.version>
<kettle.version>4.4.0-stable</kettle.version>
+ <use.kettle>true</use.kettle>
<hadoop.deps.scope>compile</hadoop.deps.scope>
<spark.deps.scope>compile</spark.deps.scope>
<scala.deps.scope>compile</scala.deps.scope>
@@ -336,6 +337,12 @@
<id>include-all</id>
</profile>
<profile>
+ <id>no-kettle</id>
+ <properties>
+ <use.kettle>false</use.kettle>
+ </properties>
+ </profile>
+ <profile>
<id>rat</id>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
index cc5c3a2..09e000f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
@@ -20,7 +20,6 @@
package org.apache.carbondata.processing.csvload;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -39,7 +38,8 @@ import org.apache.carbondata.processing.constants.DataProcessorConstants;
import org.apache.carbondata.processing.csvreaderstep.CsvInputMeta;
import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
import org.apache.carbondata.processing.etl.DataLoadingException;
-import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordslogger;
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
@@ -97,7 +97,7 @@ public class DataGraphExecuter {
private String[] getColumnNames(SchemaInfo schemaInfo, String tableName, String partitionId,
CarbonDataLoadSchema schema) {
- Set<String> columnNames = GraphExecutionUtil.getSchemaColumnNames(schema, tableName);
+ Set<String> columnNames = CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName);
return columnNames.toArray(new String[columnNames.size()]);
}
@@ -210,7 +210,7 @@ public class DataGraphExecuter {
if (trans.getErrors() > 0) {
LOGGER.error("Graph Execution had errors");
throw new DataLoadingException("Due to internal errors, please check logs for more details.");
- } else if (null != BadRecordslogger.hasBadRecord(key)) {
+ } else if (null != BadRecordsLogger.hasBadRecord(key)) {
LOGGER.error("Graph Execution is partcially success");
throw new DataLoadingException(DataProcessorConstants.BAD_REC_FOUND,
"Graph Execution is partcially success");
@@ -417,34 +417,6 @@ public class DataGraphExecuter {
trans.setLogLevel(LogLevel.NOTHING);
}
- private void validateHeader(SchemaInfo schemaInfo, String partitionId,
- CarbonDataLoadSchema schema) throws DataLoadingException {
- String[] columnNames = getColumnNames(schemaInfo, model.getTableName(), partitionId, schema);
- String[] csvHeader = model.getCsvHeader().toLowerCase().split(",");
-
- List<String> csvColumnsList = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
- for (String column : csvHeader) {
- csvColumnsList.add(column.replaceAll("\"", "").trim());
- }
-
- int count = 0;
-
- for (String columns : columnNames) {
- if (csvColumnsList.contains(columns.toLowerCase())) {
- count++;
- }
- }
-
- if (count != columnNames.length) {
- LOGGER.error("CSV header provided in DDL is not proper." +
- " Column names in schema and CSV header are not the same.");
- throw new DataLoadingException(DataProcessorConstants.CSV_VALIDATION_ERRROR_CODE,
- "CSV header provided in DDL is not proper. Column names in schema and CSV header are " +
- "not the same.");
- }
- }
-
/**
* This method will validate the both fact as well as dimension csv files.
*
@@ -503,7 +475,14 @@ public class DataGraphExecuter {
}
} else if (model.isDirectLoad()) {
if (null != model.getCsvHeader() && !model.getCsvHeader().isEmpty()) {
- validateHeader(schemaInfo, partitionId, schema);
+ if (!CarbonDataProcessorUtil
+ .isHeaderValid(model.getTableName(), model.getCsvHeader(), schema, ",")) {
+ LOGGER.error("CSV header provided in DDL is not proper."
+ + " Column names in schema and CSV header are not the same.");
+ throw new DataLoadingException(DataProcessorConstants.CSV_VALIDATION_ERRROR_CODE,
+ "CSV header provided in DDL is not proper. Column names in schema and CSV header are "
+ + "not the same.");
+ }
} else {
for (String file : model.getFilesToProcess()) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
index 6d82bcd..1dd9bdf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
@@ -34,8 +34,6 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
import org.apache.carbondata.core.carbon.CarbonDataLoadSchema.DimensionRelation;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
@@ -194,65 +192,6 @@ public final class GraphExecutionUtil {
}
/**
- * This method update the column Name
- *
- * @param table
- * @param tableName
- * @param schema
- */
- public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema, String tableName) {
- Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- String factTableName = schema.getCarbonTable().getFactTableName();
- if (tableName.equals(factTableName)) {
-
- List<CarbonDimension> dimensions =
- schema.getCarbonTable().getDimensionByTableName(factTableName);
-
- for (CarbonDimension dimension : dimensions) {
-
- String foreignKey = null;
- for (DimensionRelation dimRel : schema.getDimensionRelationList()) {
- for (String field : dimRel.getColumns()) {
- if (dimension.getColName().equals(field)) {
- foreignKey = dimRel.getRelation().getFactForeignKeyColumn();
- break;
- }
- }
- if (null != foreignKey) {
- break;
- }
- }
- if (null == foreignKey) {
- columnNames.add(dimension.getColName());
- } else {
- columnNames.add(foreignKey);
- }
- }
-
- List<CarbonMeasure> measures = schema.getCarbonTable().getMeasureByTableName(factTableName);
- for (CarbonMeasure msr : measures) {
- if (!msr.getColumnSchema().isInvisible()) {
- columnNames.add(msr.getColName());
- }
- }
- } else {
- List<CarbonDimension> dimensions = schema.getCarbonTable().getDimensionByTableName(tableName);
- for (CarbonDimension dimension : dimensions) {
- columnNames.add(dimension.getColName());
- }
-
- List<CarbonMeasure> measures = schema.getCarbonTable().getMeasureByTableName(tableName);
- for (CarbonMeasure msr : measures) {
- columnNames.add(msr.getColName());
- }
- }
-
- return columnNames;
-
- }
-
- /**
* @param csvFilePath
* @param columnNames
* @return
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 2faef7b..9843c2e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -26,8 +26,10 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.processing.newflow.complexobjects.ArrayObject;
import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen;
import org.pentaho.di.core.exception.KettleException;
@@ -35,7 +37,7 @@ import org.pentaho.di.core.exception.KettleException;
/**
* Array DataType stateless object used in data loading
*/
-public class ArrayDataType implements GenericDataType {
+public class ArrayDataType implements GenericDataType<ArrayObject> {
/**
* child columns
@@ -177,7 +179,28 @@ public class ArrayDataType implements GenericDataType {
}
}
- /*
+ @Override
+ public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream)
+ throws IOException, DictionaryGenerationException {
+ if (input == null) {
+ dataOutputStream.writeInt(1);
+ children.writeByteArray(null, dataOutputStream);
+ } else {
+ Object[] data = input.getData();
+ dataOutputStream.writeInt(data.length);
+ for (Object eachInput : data) {
+ children.writeByteArray(eachInput, dataOutputStream);
+ }
+ }
+ }
+
+ @Override
+ public void fillCardinality(List<Integer> dimCardWithComplex) {
+ dimCardWithComplex.add(0);
+ children.fillCardinality(dimCardWithComplex);
+ }
+
+ /**
* parse byte array and bit pack
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 0fdca6e..f8c765b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen;
@@ -35,7 +36,7 @@ import org.pentaho.di.core.exception.KettleException;
* Generic DataType interface which will be used while data loading for complex types like Array &
* Struct
*/
-public interface GenericDataType {
+public interface GenericDataType<T> {
/**
* @return name of the column
@@ -78,6 +79,14 @@ public interface GenericDataType {
CarbonCSVBasedDimSurrogateKeyGen surrogateKeyGen) throws KettleException, IOException;
/**
+ * writes to byte stream
+ * @param dataOutputStream
+ * @throws IOException
+ */
+ void writeByteArray(T input, DataOutputStream dataOutputStream)
+ throws IOException, DictionaryGenerationException;
+
+ /**
* @return surrogateIndex for primitive column in complex type
*/
int getSurrogateIndex();
@@ -151,4 +160,11 @@ public interface GenericDataType {
* @param maxSurrogateKeyArray
*/
void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray);
+
+ /**
+ * Fill the cardinality of the primitive datatypes
+ * @param dimCardWithComplex
+ */
+ void fillCardinality(List<Integer> dimCardWithComplex);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 9199c51..610366c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -25,10 +25,22 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.util.CarbonUtilException;
import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.newflow.dictionary.DirectDictionary;
+import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen;
import org.pentaho.di.core.exception.KettleException;
@@ -36,7 +48,7 @@ import org.pentaho.di.core.exception.KettleException;
/**
* Primitive DataType stateless object used in data loading
*/
-public class PrimitiveDataType implements GenericDataType {
+public class PrimitiveDataType implements GenericDataType<Object> {
/**
* surrogate index
@@ -78,6 +90,10 @@ public class PrimitiveDataType implements GenericDataType {
*/
private int dataCounter;
+ private BiDictionary<Integer, Object> dictionaryGenerator;
+
+ private CarbonDimension carbonDimension;
+
/**
* constructor
*
@@ -92,6 +108,36 @@ public class PrimitiveDataType implements GenericDataType {
this.dimensionOrdinal = dimensionOrdinal;
}
+ /**
+ * constructor
+ *
+ * @param name
+ * @param parentname
+ * @param columnId
+ */
+ public PrimitiveDataType(String name, String parentname, String columnId,
+ CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+ CarbonTableIdentifier carbonTableIdentifier) {
+ this.name = name;
+ this.parentname = parentname;
+ this.columnId = columnId;
+ this.carbonDimension = carbonDimension;
+ DictionaryColumnUniqueIdentifier identifier =
+ new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
+ carbonDimension.getColumnIdentifier(), carbonDimension.getDataType());
+ try {
+ if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(carbonDimension.getDataType()));
+ } else {
+ Dictionary dictionary = cache.get(identifier);
+ dictionaryGenerator = new PreCreatedDictionary(dictionary);
+ }
+ } catch (CarbonUtilException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/*
* primitive column will not have any child column
*/
@@ -178,9 +224,30 @@ public class PrimitiveDataType implements GenericDataType {
dataOutputStream.writeInt(surrogateKey);
}
+ @Override public void writeByteArray(Object input, DataOutputStream dataOutputStream)
+ throws IOException, DictionaryGenerationException {
+ String parsedValue =
+ input == null ? null : DataTypeUtil.parseValue(input.toString(), carbonDimension);
+ Integer surrogateKey;
+ if (null == parsedValue) {
+ surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+ } else {
+ surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+ if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+ surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+ }
+ }
+ dataOutputStream.writeInt(surrogateKey);
+ }
+
+ @Override
+ public void fillCardinality(List<Integer> dimCardWithComplex) {
+ dimCardWithComplex.add(dictionaryGenerator.size());
+ }
+
/*
- * parse bytearray and bit pack
- */
+ * parse bytearray and bit pack
+ */
@Override
public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
KeyGenerator[] generator) throws IOException, KeyGenException {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index bcf18c8..f034895 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -26,8 +26,10 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.processing.newflow.complexobjects.StructObject;
import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen;
import org.pentaho.di.core.exception.KettleException;
@@ -35,7 +37,7 @@ import org.pentaho.di.core.exception.KettleException;
/**
* Struct DataType stateless object used in data loading
*/
-public class StructDataType implements GenericDataType {
+public class StructDataType implements GenericDataType<StructObject> {
/**
* children columns
@@ -186,9 +188,38 @@ public class StructDataType implements GenericDataType {
}
}
+ @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream)
+ throws IOException, DictionaryGenerationException {
+ dataOutputStream.writeInt(children.size());
+ if (input == null) {
+ dataOutputStream.writeInt(children.size());
+ for (int i = 0; i < children.size(); i++) {
+ children.get(i).writeByteArray(null, dataOutputStream);
+ }
+ } else {
+ Object[] data = input.getData();
+ for (int i = 0; i < data.length && i < children.size(); i++) {
+ children.get(i).writeByteArray(data[i], dataOutputStream);
+ }
+
+ // For other children elements which dont have data, write empty
+ for (int i = data.length; i < children.size(); i++) {
+ children.get(i).writeByteArray(null, dataOutputStream);
+ }
+ }
+ }
+
+ @Override
+ public void fillCardinality(List<Integer> dimCardWithComplex) {
+ dimCardWithComplex.add(0);
+ for (int i = 0; i < children.size(); i++) {
+ children.get(i).fillCardinality(dimCardWithComplex);
+ }
+ }
+
/*
- * parse bytearray and bit pack
- */
+ * parse bytearray and bit pack
+ */
@Override
public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
KeyGenerator[] generator) throws IOException, KeyGenException {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java b/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
index 57d74c3..a420d27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
@@ -363,7 +363,7 @@ public class MDKeyGenStep extends BaseStep {
String carbonDataDirectoryPath = getCarbonDataFolderLocation();
finalMerger = new SingleThreadFinalSortFilesMerger(dataFolderLocation, tableName,
dimensionCount - meta.getComplexDimsCount(), meta.getComplexDimsCount(), measureCount,
- meta.getNoDictionaryCount(), aggType, isNoDictionaryDimension);
+ meta.getNoDictionaryCount(), aggType, isNoDictionaryDimension, true);
CarbonFactDataHandlerModel carbonFactDataHandlerModel = getCarbonFactDataHandlerModel();
carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen);
carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
new file mode 100644
index 0000000..10f5197
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.carbondata.processing.model;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+
+public class CarbonLoadModel implements Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 6580168429197697465L;
+
+ private String databaseName;
+
+ private String tableName;
+
+ private String factFilePath;
+
+ private String dimFolderPath;
+
+ private String colDictFilePath;
+
+ private String partitionId;
+
+ private CarbonDataLoadSchema carbonDataLoadSchema;
+
+ private String[] aggTables;
+
+ private String aggTableName;
+
+ private boolean aggLoadRequest;
+
+ private String storePath;
+
+ private boolean isRetentionRequest;
+
+ private List<String> factFilesToProcess;
+ private String csvHeader;
+ private String csvDelimiter;
+ private String complexDelimiterLevel1;
+ private String complexDelimiterLevel2;
+
+ private boolean isDirectLoad;
+ private List<LoadMetadataDetails> loadMetadataDetails;
+
+ private String blocksID;
+
+ /**
+ * Map from carbon dimension to pre defined dict file path
+ */
+ private HashMap<CarbonDimension, String> predefDictMap;
+
+ /**
+ * task id, each spark task has a unique id
+ */
+ private String taskNo;
+ /**
+ * new load start time
+ */
+ private String factTimeStamp;
+ /**
+ * load Id
+ */
+ private String segmentId;
+
+ private String allDictPath;
+
+ /**
+ * escape Char
+ */
+ private String escapeChar;
+
+ /**
+ * quote Char
+ */
+ private String quoteChar;
+
+ /**
+ * comment Char
+ */
+ private String commentChar;
+
+ private String dateFormat;
+
+ /**
+ * defines the string that should be treated as null while loadind data
+ */
+ private String serializationNullFormat;
+
+ /**
+ * defines the string to specify whether the bad record logger should be enabled or not
+ */
+ private String badRecordsLoggerEnable;
+
+ /**
+ * defines the option to specify the bad record logger action
+ */
+ private String badRecordsAction;
+
+ /**
+ * Max number of columns that needs to be parsed by univocity parser
+ */
+ private String maxColumns;
+
+ /**
+ * the key of RDD Iterator in RDD iterator Map
+ */
+ private String rddIteratorKey;
+
+ /**
+ * get escape char
+ * @return
+ */
+ public String getEscapeChar() {
+ return escapeChar;
+ }
+
+ /**
+ * set escape char
+ * @param escapeChar
+ */
+ public void setEscapeChar(String escapeChar) {
+ this.escapeChar = escapeChar;
+ }
+
+ /**
+ * get blocck id
+ *
+ * @return
+ */
+ public String getBlocksID() {
+ return blocksID;
+ }
+
+ /**
+ * set block id for carbon load model
+ *
+ * @param blocksID
+ */
+ public void setBlocksID(String blocksID) {
+ this.blocksID = blocksID;
+ }
+
+ public String getCsvDelimiter() {
+ return csvDelimiter;
+ }
+
+ public void setCsvDelimiter(String csvDelimiter) {
+ this.csvDelimiter = csvDelimiter;
+ }
+
+ public String getComplexDelimiterLevel1() {
+ return complexDelimiterLevel1;
+ }
+
+ public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
+ this.complexDelimiterLevel1 = complexDelimiterLevel1;
+ }
+
+ public String getComplexDelimiterLevel2() {
+ return complexDelimiterLevel2;
+ }
+
+ public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
+ this.complexDelimiterLevel2 = complexDelimiterLevel2;
+ }
+
+ public boolean isDirectLoad() {
+ return isDirectLoad;
+ }
+
+ public void setDirectLoad(boolean isDirectLoad) {
+ this.isDirectLoad = isDirectLoad;
+ }
+
+ public String getAllDictPath() {
+ return allDictPath;
+ }
+
+ public void setAllDictPath(String allDictPath) {
+ this.allDictPath = allDictPath;
+ }
+
+ public List<String> getFactFilesToProcess() {
+ return factFilesToProcess;
+ }
+
+ public void setFactFilesToProcess(List<String> factFilesToProcess) {
+ this.factFilesToProcess = factFilesToProcess;
+ }
+
+ public String getCsvHeader() {
+ return csvHeader;
+ }
+
+ public void setCsvHeader(String csvHeader) {
+ this.csvHeader = csvHeader;
+ }
+
+ public void initPredefDictMap() {
+ predefDictMap = new HashMap<>();
+ }
+
+ public String getPredefDictFilePath(CarbonDimension dimension) {
+ return predefDictMap.get(dimension);
+ }
+
+ public void setPredefDictMap(CarbonDimension dimension, String predefDictFilePath) {
+ this.predefDictMap.put(dimension, predefDictFilePath);
+ }
+
+ /**
+ * @return carbon dataload schema
+ */
+ public CarbonDataLoadSchema getCarbonDataLoadSchema() {
+ return carbonDataLoadSchema;
+ }
+
+ /**
+ * @param carbonDataLoadSchema
+ */
+ public void setCarbonDataLoadSchema(CarbonDataLoadSchema carbonDataLoadSchema) {
+ this.carbonDataLoadSchema = carbonDataLoadSchema;
+ }
+
+ /**
+ * @return the databaseName
+ */
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ /**
+ * @param databaseName the databaseName to set
+ */
+ public void setDatabaseName(String databaseName) {
+ this.databaseName = databaseName;
+ }
+
+ /**
+ * @return the tableName
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * @param tableName the tableName to set
+ */
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ /**
+ * @return the factFilePath
+ */
+ public String getFactFilePath() {
+ return factFilePath;
+ }
+
+ /**
+ * @param factFilePath the factFilePath to set
+ */
+ public void setFactFilePath(String factFilePath) {
+ this.factFilePath = factFilePath;
+ }
+
+ /**
+ *
+ * @return external column dictionary file path
+ */
+ public String getColDictFilePath() {
+ return colDictFilePath;
+ }
+
+ /**
+ * set external column dictionary file path
+ * @param colDictFilePath
+ */
+ public void setColDictFilePath(String colDictFilePath) {
+ this.colDictFilePath = colDictFilePath;
+ }
+
+ /**
+ * @return the dimFolderPath
+ */
+ public String getDimFolderPath() {
+ return dimFolderPath;
+ }
+
+ //TODO SIMIAN
+
+ /**
+ * @param dimFolderPath the dimFolderPath to set
+ */
+ public void setDimFolderPath(String dimFolderPath) {
+ this.dimFolderPath = dimFolderPath;
+ }
+
+ /**
+ * Creates a copy of this load model for the given partition. All load state
+ * is duplicated and the fact file path is suffixed with the partition id.
+ *
+ * @param uniqueId partition id; also appended to the fact file path
+ * @return a new CarbonLoadModel carrying this model's state for the partition
+ */
+ public CarbonLoadModel getCopyWithPartition(String uniqueId) {
+ CarbonLoadModel copy = new CarbonLoadModel();
+ copy.tableName = tableName;
+ copy.dimFolderPath = dimFolderPath;
+ copy.factFilePath = factFilePath + '/' + uniqueId;
+ copy.databaseName = databaseName;
+ copy.partitionId = uniqueId;
+ copy.aggTables = aggTables;
+ copy.aggTableName = aggTableName;
+ copy.aggLoadRequest = aggLoadRequest;
+ copy.loadMetadataDetails = loadMetadataDetails;
+ copy.isRetentionRequest = isRetentionRequest;
+ copy.complexDelimiterLevel1 = complexDelimiterLevel1;
+ copy.complexDelimiterLevel2 = complexDelimiterLevel2;
+ copy.carbonDataLoadSchema = carbonDataLoadSchema;
+ copy.blocksID = blocksID;
+ copy.taskNo = taskNo;
+ copy.factTimeStamp = factTimeStamp;
+ copy.segmentId = segmentId;
+ copy.serializationNullFormat = serializationNullFormat;
+ copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
+ copy.badRecordsAction = badRecordsAction;
+ copy.escapeChar = escapeChar;
+ copy.quoteChar = quoteChar;
+ copy.commentChar = commentChar;
+ // Kept in sync with the (uniqueId, files, header, delimiter) overload,
+ // which also propagates the date format to the copy; previously dropped here.
+ copy.dateFormat = dateFormat;
+ copy.maxColumns = maxColumns;
+ copy.storePath = storePath;
+ return copy;
+ }
+
+ /**
+ * Creates a partition-specific copy of this load model for a direct (csv) load.
+ * Unlike the single-argument overload, the fact file path is cleared and the
+ * given file list, csv header and delimiter are set on the copy instead.
+ *
+ * @param uniqueId partition id for the copy
+ * @param filesForPartition csv files this partition should process
+ * @param header csv header to use for the copy
+ * @param delimiter csv delimiter to use for the copy
+ * @return a new CarbonLoadModel configured for a direct load of the partition
+ */
+ public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
+ String header, String delimiter) {
+ CarbonLoadModel copyObj = new CarbonLoadModel();
+ copyObj.tableName = tableName;
+ copyObj.dimFolderPath = dimFolderPath;
+ copyObj.factFilePath = null;
+ copyObj.databaseName = databaseName;
+ copyObj.partitionId = uniqueId;
+ copyObj.aggTables = aggTables;
+ copyObj.aggTableName = aggTableName;
+ copyObj.aggLoadRequest = aggLoadRequest;
+ copyObj.loadMetadataDetails = loadMetadataDetails;
+ copyObj.isRetentionRequest = isRetentionRequest;
+ copyObj.carbonDataLoadSchema = carbonDataLoadSchema;
+ copyObj.csvHeader = header;
+ copyObj.factFilesToProcess = filesForPartition;
+ copyObj.isDirectLoad = true;
+ copyObj.csvDelimiter = delimiter;
+ copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
+ copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
+ copyObj.blocksID = blocksID;
+ copyObj.taskNo = taskNo;
+ copyObj.factTimeStamp = factTimeStamp;
+ copyObj.segmentId = segmentId;
+ copyObj.serializationNullFormat = serializationNullFormat;
+ copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
+ copyObj.badRecordsAction = badRecordsAction;
+ copyObj.escapeChar = escapeChar;
+ copyObj.quoteChar = quoteChar;
+ copyObj.commentChar = commentChar;
+ copyObj.dateFormat = dateFormat;
+ copyObj.maxColumns = maxColumns;
+ copyObj.storePath = storePath;
+ return copyObj;
+ }
+
+ /**
+ * @return the partitionId
+ */
+ public String getPartitionId() {
+ return partitionId;
+ }
+
+ /**
+ * @param partitionId the partitionId to set
+ */
+ public void setPartitionId(String partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ /**
+ * @return the aggTables
+ */
+ public String[] getAggTables() {
+ return aggTables;
+ }
+
+ /**
+ * @param aggTables the aggTables to set
+ */
+ public void setAggTables(String[] aggTables) {
+ this.aggTables = aggTables;
+ }
+
+ /**
+ * @return the aggLoadRequest
+ */
+ public boolean isAggLoadRequest() {
+ return aggLoadRequest;
+ }
+
+ /**
+ * @param aggLoadRequest the aggLoadRequest to set
+ */
+ public void setAggLoadRequest(boolean aggLoadRequest) {
+ this.aggLoadRequest = aggLoadRequest;
+ }
+
+ /**
+ * @param storePath The storePath to set.
+ */
+ public void setStorePath(String storePath) {
+ this.storePath = storePath;
+ }
+
+ /**
+ * @return Returns the aggTableName.
+ */
+ public String getAggTableName() {
+ return aggTableName;
+ }
+
+ /**
+ * @return Returns the factStoreLocation.
+ */
+ public String getStorePath() {
+ return storePath;
+ }
+
+ /**
+ * @param aggTableName The aggTableName to set.
+ */
+ public void setAggTableName(String aggTableName) {
+ this.aggTableName = aggTableName;
+ }
+
+ /**
+ * isRetentionRequest
+ *
+ * @return
+ */
+ public boolean isRetentionRequest() {
+ return isRetentionRequest;
+ }
+
+ /**
+ * @param isRetentionRequest
+ */
+ public void setRetentionRequest(boolean isRetentionRequest) {
+ this.isRetentionRequest = isRetentionRequest;
+ }
+
+ /**
+ * getLoadMetadataDetails.
+ *
+ * @return
+ */
+ public List<LoadMetadataDetails> getLoadMetadataDetails() {
+ return loadMetadataDetails;
+ }
+
+ /**
+ * setLoadMetadataDetails.
+ *
+ * @param loadMetadataDetails
+ */
+ public void setLoadMetadataDetails(List<LoadMetadataDetails> loadMetadataDetails) {
+ this.loadMetadataDetails = loadMetadataDetails;
+ }
+
+ /**
+ * @return the task number identifying this load task
+ */
+ public String getTaskNo() {
+ return taskNo;
+ }
+
+ /**
+ * @param taskNo the task number identifying this load task
+ */
+ public void setTaskNo(String taskNo) {
+ this.taskNo = taskNo;
+ }
+
+ /**
+ * @return
+ */
+ public String getFactTimeStamp() {
+ return factTimeStamp;
+ }
+
+ /**
+ * @param factTimeStamp
+ */
+ public void setFactTimeStamp(String factTimeStamp) {
+ this.factTimeStamp = factTimeStamp;
+ }
+
+ /**
+ * @return the complex type delimiters, level 1 followed by level 2
+ */
+ public String[] getDelimiters() {
+ return new String[] { complexDelimiterLevel1, complexDelimiterLevel2 };
+ }
+
+ /**
+ * @return load Id
+ */
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ /**
+ * @param segmentId
+ */
+ public void setSegmentId(String segmentId) {
+ this.segmentId = segmentId;
+ }
+
+ /**
+ * the method returns the value to be treated as null while data load
+ * @return
+ */
+ public String getSerializationNullFormat() {
+ return serializationNullFormat;
+ }
+
+ /**
+ * the method sets the value to be treated as null while data load
+ * @param serializationNullFormat
+ */
+ public void setSerializationNullFormat(String serializationNullFormat) {
+ this.serializationNullFormat = serializationNullFormat;
+ }
+
+ /**
+ * Returns the option string controlling whether the bad record logger is enabled.
+ * @return the bad record logger enable option
+ */
+ public String getBadRecordsLoggerEnable() {
+ return badRecordsLoggerEnable;
+ }
+
+ /**
+ * Sets the option string specifying whether to enable or disable the bad record logger.
+ * @param badRecordsLoggerEnable the bad record logger enable option
+ */
+ public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) {
+ this.badRecordsLoggerEnable = badRecordsLoggerEnable;
+ }
+
+ public String getQuoteChar() {
+ return quoteChar;
+ }
+
+ public void setQuoteChar(String quoteChar) {
+ this.quoteChar = quoteChar;
+ }
+
+ public String getCommentChar() {
+ return commentChar;
+ }
+
+ public void setCommentChar(String commentChar) {
+ this.commentChar = commentChar;
+ }
+
+ public String getDateFormat() { return dateFormat; }
+
+ public void setDateFormat(String dateFormat) { this.dateFormat = dateFormat; }
+
+ /**
+ * @return
+ */
+ public String getMaxColumns() {
+ return maxColumns;
+ }
+
+ /**
+ * @param maxColumns
+ */
+ public void setMaxColumns(String maxColumns) {
+ this.maxColumns = maxColumns;
+ }
+
+ /**
+ * returns option to specify the bad record logger action
+ * @return
+ */
+ public String getBadRecordsAction() {
+ return badRecordsAction;
+ }
+
+ /**
+ * set option to specify the bad record logger action
+ * @param badRecordsAction
+ */
+ public void setBadRecordsAction(String badRecordsAction) {
+ this.badRecordsAction = badRecordsAction;
+ }
+
+ public String getRddIteratorKey() {
+ return rddIteratorKey;
+ }
+
+ public void setRddIteratorKey(String rddIteratorKey) {
+ this.rddIteratorKey = rddIteratorKey;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
index 3e6d63e..c5dc5d1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
@@ -30,10 +30,16 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColu
*/
public class DataField implements Serializable {
+ public DataField(CarbonColumn column) {
+ this.column = column;
+ }
+
private CarbonColumn column;
private CompressionCodec compressionCodec;
+ private String dateFormat;
+
public boolean hasDictionaryEncoding() {
return column.hasEncoding(Encoding.DICTIONARY);
}
@@ -42,10 +48,6 @@ public class DataField implements Serializable {
return column;
}
- public void setColumn(CarbonColumn column) {
- this.column = column;
- }
-
public CompressionCodec getCompressionCodec() {
return compressionCodec;
}
@@ -54,4 +56,11 @@ public class DataField implements Serializable {
this.compressionCodec = compressionCodec;
}
+ public String getDateFormat() {
+ return dateFormat;
+ }
+
+ public void setDateFormat(String dateFormat) {
+ this.dateFormat = dateFormat;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
new file mode 100644
index 0000000..746e0f2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
+
+/**
+ * It executes the data load.
+ */
+public class DataLoadExecutor {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DataLoadExecutor.class.getName());
+
+ public void execute(CarbonLoadModel loadModel, String storeLocation,
+ Iterator<Object[]>[] inputIterators) throws Exception {
+ AbstractDataLoadProcessorStep loadProcessorStep = null;
+ try {
+
+ loadProcessorStep =
+ new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators);
+ // 1. initialize
+ loadProcessorStep.initialize();
+ LOGGER.info("Data Loading is started for table " + loadModel.getTableName());
+ // 2. execute the step
+ loadProcessorStep.execute();
+ } catch (CarbonDataLoadingException e) {
+ throw e;
+ } catch (Exception e) {
+ LOGGER.error(e, "Data Loading failed for table " + loadModel.getTableName());
+ throw new CarbonDataLoadingException(
+ "Data Loading failed for table " + loadModel.getTableName(), e);
+ } finally {
+ if (loadProcessorStep != null) {
+ // 3. Close the step
+ loadProcessorStep.close();
+ }
+ }
+
+ String key =
+ new CarbonTableIdentifier(loadModel.getDatabaseName(), loadModel.getTableName(), null)
+ .getBadRecordLoggerKey();
+ if (null != BadRecordsLogger.hasBadRecord(key)) {
+ LOGGER.error("Data Load is partcially success for table " + loadModel.getTableName());
+ throw new BadRecordFoundException("Bad records found during load");
+ } else {
+ LOGGER.info("Data loading is successful for table "+loadModel.getTableName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
new file mode 100644
index 0000000..92c677c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
+import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
+import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
+import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It builds the pipe line of steps for loading data to carbon.
+ * The pipe line is: input (read/parse) -> convert -> sort -> write.
+ */
+public final class DataLoadProcessBuilder {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DataLoadProcessBuilder.class.getName());
+
+ /**
+ * Builds the chain of load steps; executing the returned step drives the
+ * whole chain.
+ *
+ * @param loadModel describes the table, csv options and bad record handling
+ * @param storeLocation local temp store used by the intermediate steps
+ * @param inputIterators raw row iterators feeding the first step
+ * @return the terminal (writer) step of the pipe line
+ */
+ public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
+ Iterator[] inputIterators) throws Exception {
+ CarbonDataLoadConfiguration configuration =
+ createConfiguration(loadModel, storeLocation);
+ // 1. Reads the data input iterators and parses the data.
+ AbstractDataLoadProcessorStep inputProcessorStep =
+ new InputProcessorStepImpl(configuration, inputIterators);
+ // 2. Converts the data like dictionary or non dictionary or complex objects depends on
+ // data types and configurations.
+ AbstractDataLoadProcessorStep converterProcessorStep =
+ new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
+ // 3. Sorts the data which are part of key (all dimensions except complex types)
+ AbstractDataLoadProcessorStep sortProcessorStep =
+ new SortProcessorStepImpl(configuration, converterProcessorStep);
+ // 4. Writes the sorted data in carbondata format.
+ AbstractDataLoadProcessorStep writerProcessorStep =
+ new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
+ return writerProcessorStep;
+ }
+
+ /**
+ * Translates the load model into the configuration consumed by the steps:
+ * temp/store locations, csv header (validated against the schema), load
+ * properties and the ordered list of data fields.
+ */
+ private CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel,
+ String storeLocation) throws Exception {
+ File storeDir = new File(storeLocation);
+ // File.mkdirs() returns false when the directory already exists, so only
+ // report an error when the directory is genuinely missing and not creatable.
+ if (!storeDir.exists() && !storeDir.mkdirs()) {
+ LOGGER.error("Error while creating the temp store path: " + storeLocation);
+ }
+ CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
+ String databaseName = loadModel.getDatabaseName();
+ String tableName = loadModel.getTableName();
+ String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+ + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+ CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath());
+
+ CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+ AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+ configuration.setTableIdentifier(identifier);
+ String csvHeader = loadModel.getCsvHeader();
+ String csvFileName = null;
+ if (csvHeader != null && !csvHeader.isEmpty()) {
+ configuration.setHeader(CarbonDataProcessorUtil.getColumnFields(csvHeader, ","));
+ } else {
+ // No header given in the DDL; take it from the first csv file instead.
+ CarbonFile csvFile =
+ CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0));
+ csvFileName = csvFile.getName();
+ csvHeader = CarbonDataProcessorUtil.getFileHeader(csvFile);
+ configuration.setHeader(
+ CarbonDataProcessorUtil.getColumnFields(csvHeader, loadModel.getCsvDelimiter()));
+ }
+ if (!CarbonDataProcessorUtil
+ .isHeaderValid(loadModel.getTableName(), csvHeader, loadModel.getCarbonDataLoadSchema(),
+ loadModel.getCsvDelimiter())) {
+ if (csvFileName == null) {
+ LOGGER.error("CSV header provided in DDL is not proper."
+ + " Column names in schema and CSV header are not the same.");
+ throw new CarbonDataLoadingException(
+ "CSV header provided in DDL is not proper. Column names in schema and CSV header are "
+ + "not the same.");
+ } else {
+ LOGGER.error(
+ "CSV File provided is not proper. Column names in schema and csv header are not same. "
+ + "CSVFile Name : " + csvFileName);
+ throw new CarbonDataLoadingException(
+ "CSV File provided is not proper. Column names in schema and csv header are not same. "
+ + "CSVFile Name : " + csvFileName);
+ }
+ }
+
+ configuration.setPartitionId(loadModel.getPartitionId());
+ configuration.setSegmentId(loadModel.getSegmentId());
+ configuration.setTaskNo(loadModel.getTaskNo());
+ configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
+ new String[] { loadModel.getComplexDelimiterLevel1(),
+ loadModel.getComplexDelimiterLevel2() });
+ // NOTE(review): these options appear to be stored as "<key>,<value>" pairs,
+ // hence the split(",")[1] to extract the value -- confirm against the DDL parser.
+ configuration.setDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
+ loadModel.getSerializationNullFormat().split(",")[1]);
+ configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP,
+ loadModel.getFactTimeStamp());
+ configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE,
+ loadModel.getBadRecordsLoggerEnable().split(",")[1]);
+ configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION,
+ loadModel.getBadRecordsAction().split(",")[1]);
+ configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
+ loadModel.getFactFilePath());
+ List<CarbonDimension> dimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ List<CarbonMeasure> measures =
+ carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ Map<String, String> dateFormatMap =
+ CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat());
+ List<DataField> dataFields = new ArrayList<>();
+ List<DataField> complexDataFields = new ArrayList<>();
+
+ // First add dictionary and non dictionary dimensions because these are part of mdk key.
+ // And then add complex data types and measures.
+ for (CarbonColumn column : dimensions) {
+ DataField dataField = new DataField(column);
+ dataField.setDateFormat(dateFormatMap.get(column.getColName()));
+ if (column.isComplex()) {
+ complexDataFields.add(dataField);
+ } else {
+ dataFields.add(dataField);
+ }
+ }
+ dataFields.addAll(complexDataFields);
+ for (CarbonColumn column : measures) {
+ // This dummy measure is added when no measure was present. We no need to load it.
+ if (!(column.getColName().equals("default_dummy_measure"))) {
+ dataFields.add(new DataField(column));
+ }
+ }
+ configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
+ return configuration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
index 15f5b0e..958fd1a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
@@ -34,4 +34,12 @@ public final class DataLoadProcessorConstants {
public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";
+ public static final String SERIALIZATION_NULL_FORMAT = "SERIALIZATION_NULL_FORMAT";
+
+ public static final String BAD_RECORDS_LOGGER_ENABLE = "BAD_RECORDS_LOGGER_ENABLE";
+
+ public static final String BAD_RECORDS_LOGGER_ACTION = "BAD_RECORDS_LOGGER_ACTION";
+
+ public static final String FACT_FILE_PATH = "FACT_FILE_PATH";
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
new file mode 100644
index 0000000..2455392
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.converter;
+
+/**
+ * It is holder for reason of bad records. A converter records the rejection
+ * reason here; setting a reason also marks the flag that a bad record was added.
+ */
+public class BadRecordLogHolder {
+
+ // Human-readable rejection reason for the current row.
+ private String reason;
+
+ // True once setReason() has been called; reset via clear().
+ private boolean badRecordAdded;
+
+ public String getReason() {
+ return reason;
+ }
+
+ /**
+ * Records the rejection reason and marks that a bad record was added.
+ * @param reason why the record was rejected
+ */
+ public void setReason(String reason) {
+ this.reason = reason;
+ badRecordAdded = true;
+ }
+
+ /**
+ * NOTE(review): despite the name, this returns {@code true} when a bad record
+ * HAS been added -- it returns the raw flag without negation. Callers appear
+ * to rely on the raw-flag semantics; confirm before renaming or inverting.
+ */
+ public boolean isBadRecordNotAdded() {
+ return badRecordAdded;
+ }
+
+ /**
+ * Resets the bad-record flag for the next row (the reason text is left as-is).
+ */
+ public void clear() {
+ this.badRecordAdded = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
index e304fbc..8dda65d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
@@ -30,7 +30,9 @@ public interface FieldConverter {
/**
* It converts the column field and updates the data in same location/index in row.
* @param row
+ * @return the status whether it could be loaded or not, usually when record is added
+ * to bad records then it returns false.
* @throws CarbonDataLoadingException
*/
- void convert(CarbonRow row) throws CarbonDataLoadingException;
+ void convert(CarbonRow row, BadRecordLogHolder logHolder) throws CarbonDataLoadingException;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
index 44f1116..3b199ab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
@@ -26,7 +26,11 @@ import org.apache.carbondata.processing.newflow.row.CarbonRow;
*/
public interface RowConverter {
+ void initialize();
+
CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException;
+ RowConverter createCopyForNewThread();
+
void finish();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
index 70a900c..790a970 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
@@ -18,10 +18,12 @@
*/
package org.apache.carbondata.processing.newflow.converter.impl;
+import java.util.List;
+
import org.apache.carbondata.processing.newflow.converter.FieldConverter;
public abstract class AbstractDictionaryFieldConverterImpl implements FieldConverter {
- public abstract int getColumnCardinality();
+ public abstract void fillColumnCardinality(List<Integer> cardinality);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
index 4c18aa7..4b7aa40 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
@@ -19,12 +19,42 @@
package org.apache.carbondata.processing.newflow.converter.impl;
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.List;
+
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
-public class ComplexFieldConverterImpl implements FieldConverter {
+public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
+
+ private GenericDataType genericDataType;
+
+ private int index;
+
+ public ComplexFieldConverterImpl(GenericDataType genericDataType, int index) {
+ this.genericDataType = genericDataType;
+ this.index = index;
+ }
@Override
- public void convert(CarbonRow row) {
+ public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+ Object object = row.getObject(index);
+ // TODO Its temporary, needs refactor here.
+ ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
+ try {
+ genericDataType.writeByteArray(object, dataOutputStream);
+ dataOutputStream.close();
+ row.update(byteArray.toByteArray(), index);
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(object+"", e);
+ }
+ }
+
+ @Override public void fillColumnCardinality(List<Integer> cardinality) {
+ genericDataType.fillCardinality(cardinality);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
index 8ca4ff2..3182a37 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
@@ -19,16 +19,22 @@
package org.apache.carbondata.processing.newflow.converter.impl;
+import java.util.List;
+
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.devapi.BiDictionary;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.util.CarbonUtilException;
+import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
@@ -38,14 +44,20 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
private static final LogService LOGGER =
LogServiceFactory.getLogService(DictionaryFieldConverterImpl.class.getName());
- private BiDictionary<Integer, String> dictionaryGenerator;
+ private BiDictionary<Integer, Object> dictionaryGenerator;
private int index;
+ private CarbonDimension carbonDimension;
+
+ private String nullFormat;
+
public DictionaryFieldConverterImpl(DataField dataField,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
- CarbonTableIdentifier carbonTableIdentifier, int index) {
+ CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index) {
this.index = index;
+ this.carbonDimension = (CarbonDimension) dataField.getColumn();
+ this.nullFormat = nullFormat;
DictionaryColumnUniqueIdentifier identifier =
new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType());
@@ -58,17 +70,22 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
}
}
- @Override
- public void convert(CarbonRow row) throws CarbonDataLoadingException {
+ @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder)
+ throws CarbonDataLoadingException {
try {
- row.update(dictionaryGenerator.getOrGenerateKey(row.getString(index)), index);
+ String parsedValue = DataTypeUtil.parseValue(row.getString(index), carbonDimension);
+ if(null == parsedValue || parsedValue.equals(nullFormat)) {
+ row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY, index);
+ } else {
+ row.update(dictionaryGenerator.getOrGenerateKey(parsedValue), index);
+ }
} catch (DictionaryGenerationException e) {
throw new CarbonDataLoadingException(e);
}
}
@Override
- public int getColumnCardinality() {
- return dictionaryGenerator.size();
+ public void fillColumnCardinality(List<Integer> cardinality) {
+ cardinality.add(dictionaryGenerator.size());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
index 8ff110a..c8113ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
@@ -19,9 +19,13 @@
package org.apache.carbondata.processing.newflow.converter.impl;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
@@ -30,21 +34,50 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
private int index;
- public DirectDictionaryFieldConverterImpl(DataField dataField, int index) {
- DirectDictionaryGenerator directDictionaryGenerator =
- DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(dataField.getColumn().getDataType());
- this.directDictionaryGenerator = directDictionaryGenerator;
+ private String nullFormat;
+
+ private CarbonColumn column;
+
+ public DirectDictionaryFieldConverterImpl(DataField dataField, String nullFormat, int index) {
+ this.nullFormat = nullFormat;
+ this.column = dataField.getColumn();
+ if (dataField.getDateFormat() != null && !dataField.getDateFormat().isEmpty()) {
+ this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(dataField.getColumn().getDataType(),
+ dataField.getDateFormat());
+
+ } else {
+ this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(dataField.getColumn().getDataType());
+ }
this.index = index;
}
@Override
- public void convert(CarbonRow row) {
- row.update(directDictionaryGenerator.generateDirectSurrogateKey(row.getString(index)), index);
+ public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+ String value = row.getString(index);
+ if (value == null) {
+ logHolder.setReason(
+ "The value " + " \"" + row.getString(index) + "\"" + " with column name " + column
+ .getColName() + " and column data type " + column.getDataType() + " is not a valid "
+ + column.getDataType() + " type.");
+ row.update(1, index);
+ } else if (value.equals(nullFormat)) {
+ row.update(1, index);
+ } else {
+ int key = directDictionaryGenerator.generateDirectSurrogateKey(value);
+ if (key == 1) {
+ logHolder.setReason(
+ "The value " + " \"" + row.getString(index) + "\"" + " with column name " + column
+ .getColName() + " and column data type " + column.getDataType() + " is not a valid "
+ + column.getDataType() + " type.");
+ }
+ row.update(key, index);
+ }
}
@Override
- public int getColumnCardinality() {
- return Integer.MAX_VALUE;
+ public void fillColumnCardinality(List<Integer> cardinality) {
+ cardinality.add(Integer.MAX_VALUE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index a10ad20..a46b9ba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -18,11 +18,19 @@
*/
package org.apache.carbondata.processing.newflow.converter.impl;
+import java.util.List;
+
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.processing.datatypes.ArrayDataType;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
+import org.apache.carbondata.processing.datatypes.StructDataType;
import org.apache.carbondata.processing.newflow.DataField;
import org.apache.carbondata.processing.newflow.converter.FieldConverter;
@@ -43,27 +51,83 @@ public class FieldEncoderFactory {
/**
-   * Creates the FieldConverter for all dimensions, for measures return null.
+   * Creates the FieldConverter for all dimensions and measures.
- * @param dataField column schema
- * @param cache dicionary cache.
+ *
+ * @param dataField column schema
+   * @param cache                 dictionary cache.
* @param carbonTableIdentifier table identifier
- * @param index index of column in the row.
+ * @param index index of column in the row.
* @return
*/
public FieldConverter createFieldEncoder(DataField dataField,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
- CarbonTableIdentifier carbonTableIdentifier, int index) {
+ CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat) {
// Converters are only needed for dimensions and measures it return null.
if (dataField.getColumn().isDimesion()) {
- if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY)) {
- return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, index);
- } else if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- return new DirectDictionaryFieldConverterImpl(dataField, index);
+ if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+ !dataField.getColumn().isComplex()) {
+ return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index);
+ } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
+ !dataField.getColumn().isComplex()) {
+ return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
+ index);
} else if (dataField.getColumn().isComplex()) {
- return new ComplexFieldConverterImpl();
+ return new ComplexFieldConverterImpl(
+ createComplexType(dataField, cache, carbonTableIdentifier), index);
} else {
- return new NonDictionaryFieldConverterImpl(dataField, index);
+ return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index);
}
+ } else {
+ return new MeasureFieldConverterImpl(dataField, nullFormat, index);
+ }
+ }
+
+ /**
+ * Create parser for the carbon column.
+ */
+ private static GenericDataType createComplexType(DataField dataField,
+ Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+ CarbonTableIdentifier carbonTableIdentifier) {
+ return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
+ carbonTableIdentifier);
+ }
+
+ /**
+ * This method may be called recursively if the carbon column is complex type.
+ *
+ * @return GenericDataType
+ */
+ private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
+ Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+ CarbonTableIdentifier carbonTableIdentifier) {
+ switch (carbonColumn.getDataType()) {
+ case ARRAY:
+ List<CarbonDimension> listOfChildDimensions =
+ ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+ // Create array parser with complex delimiter
+ ArrayDataType arrayDataType =
+ new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
+ for (CarbonDimension dimension : listOfChildDimensions) {
+ arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
+ carbonTableIdentifier));
+ }
+ return arrayDataType;
+ case STRUCT:
+ List<CarbonDimension> dimensions =
+ ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+ // Create struct parser with complex delimiter
+ StructDataType structDataType =
+ new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
+ for (CarbonDimension dimension : dimensions) {
+ structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
+ carbonTableIdentifier));
+ }
+ return structDataType;
+ case MAP:
+ throw new UnsupportedOperationException("Complex type Map is not supported yet");
+ default:
+ return new PrimitiveDataType(carbonColumn.getColName(), parentName,
+ carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
+ carbonTableIdentifier);
}
- return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
new file mode 100644
index 0000000..c419d46
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.converter.impl;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.newflow.converter.FieldConverter;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+
+/**
+ * Converter for measure
+ */
+public class MeasureFieldConverterImpl implements FieldConverter {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(MeasureFieldConverterImpl.class.getName());
+
+ private int index;
+
+ private DataType dataType;
+
+ private CarbonMeasure measure;
+
+ private String nullformat;
+
+ public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index) {
+ this.dataType = dataField.getColumn().getDataType();
+ this.measure = (CarbonMeasure) dataField.getColumn();
+ this.nullformat = nullformat;
+ this.index = index;
+ }
+
+ @Override
+ public void convert(CarbonRow row, BadRecordLogHolder logHolder)
+ throws CarbonDataLoadingException {
+ String value = row.getString(index);
+ Object output;
+ if (value == null) {
+ logHolder.setReason(
+ "The value " + " \"" + value + "\"" + " with column name " + measure.getColName()
+ + " and column data type " + dataType + " is not a valid " + dataType + " type.");
+ row.update(null, index);
+ } else if(value.equals(nullformat)) {
+ row.update(null, index);
+ } else {
+ try {
+ output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure);
+ row.update(output, index);
+ } catch (NumberFormatException e) {
+ LOGGER.warn(
+ "Cant not convert : " + value + " to Numeric type value. Value considered as null.");
+ logHolder.setReason(
+ "The value " + " \"" + value + "\"" + " with column name " + measure.getColName()
+ + " and column data type " + dataType + " is not a valid " + dataType + " type.");
+ output = null;
+ row.update(output, index);
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
index 9540907..c90f1ba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -21,9 +21,11 @@ package org.apache.carbondata.processing.newflow.converter.impl;
import java.nio.charset.Charset;
import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.newflow.converter.FieldConverter;
import org.apache.carbondata.processing.newflow.row.CarbonRow;
@@ -33,17 +35,29 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
private int index;
- public NonDictionaryFieldConverterImpl(DataField dataField, int index) {
+ private String nullformat;
+
+ private CarbonColumn column;
+
+ public NonDictionaryFieldConverterImpl(DataField dataField, String nullformat, int index) {
this.dataType = dataField.getColumn().getDataType();
+ this.column = dataField.getColumn();
this.index = index;
+ this.nullformat = nullformat;
}
@Override
- public void convert(CarbonRow row) {
+ public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
String dimensionValue = row.getString(index);
+ if (dimensionValue == null || dimensionValue.equals(nullformat)) {
+ dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
+ }
if (dataType != DataType.STRING) {
if (null == DataTypeUtil.normalizeIntAndLongValues(dimensionValue, dataType)) {
- dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
+ logHolder.setReason(
+ "The value " + " \"" + dimensionValue + "\"" + " with column name " + column
+ .getColName() + " and column data type " + dataType + " is not a valid " + dataType
+ + " type.");
}
}
row.update(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),