You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/07/20 10:13:49 UTC
[21/50] [abbrv] incubator-carbondata git commit: CARBONDATA-5 data
mismatch between the carbon Table and Hive Table for columns having \N for
non numeric data type (#802)
CARBONDATA-5 data mismatch between the carbon Table and Hive Table for columns having \N for non numeric data type (#802)
LGTM
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a45dc4f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a45dc4f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a45dc4f7
Branch: refs/heads/master
Commit: a45dc4f7bc9d67b29ee18c6ee7d369b338565e21
Parents: a044680
Author: Mohammad Shahid Khan <mo...@gmail.com>
Authored: Sun Jul 17 00:44:23 2016 +0530
Committer: sujith71955 <su...@gmail.com>
Committed: Sun Jul 17 00:44:23 2016 +0530
----------------------------------------------------------------------
.../carbondata/spark/load/CarbonLoadModel.java | 23 ++++
.../carbondata/spark/load/CarbonLoaderUtil.java | 1 +
.../org/apache/spark/sql/CarbonSqlParser.scala | 4 +-
.../execution/command/carbonTableSchema.scala | 3 +
.../test/resources/nullvalueserialization.csv | 2 +
.../TestNullValueSerialization.scala | 112 +++++++++++++++++++
.../processing/api/dataloader/SchemaInfo.java | 20 ++++
.../graphgenerator/GraphGenerator.java | 14 +++
.../configuration/GraphConfigurationInfo.java | 23 ++++
.../processing/schema/metadata/TableOption.java | 82 ++++++++++++++
.../schema/metadata/TableOptionWrapper.java | 106 ++++++++++++++++++
.../csvbased/CarbonCSVBasedSeqGenMeta.java | 39 ++++++-
.../csvbased/CarbonCSVBasedSeqGenStep.java | 14 ++-
13 files changed, 435 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
index 2ae2a06..098c863 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
@@ -95,6 +95,11 @@ public class CarbonLoadModel implements Serializable {
private String escapeChar;
/**
+ * defines the string that should be treated as null while loading data
+ */
+ private String serializationNullFormat;
+
+ /**
* get escape char
* @return
*/
@@ -301,6 +306,7 @@ public class CarbonLoadModel implements Serializable {
copy.taskNo = taskNo;
copy.factTimeStamp = factTimeStamp;
copy.segmentId = segmentId;
+ copy.serializationNullFormat = serializationNullFormat;
copy.escapeChar = escapeChar;
return copy;
}
@@ -338,6 +344,7 @@ public class CarbonLoadModel implements Serializable {
copyObj.taskNo = taskNo;
copyObj.factTimeStamp = factTimeStamp;
copyObj.segmentId = segmentId;
+ copyObj.serializationNullFormat = serializationNullFormat;
copyObj.escapeChar = escapeChar;
return copyObj;
}
@@ -491,4 +498,20 @@ public class CarbonLoadModel implements Serializable {
public void setSegmentId(String segmentId) {
this.segmentId = segmentId;
}
+
+ /**
+ * the method returns the value to be treated as null while data load
+ * @return
+ */
+ public String getSerializationNullFormat() {
+ return serializationNullFormat;
+ }
+
+ /**
+ * the method sets the value to be treated as null while data load
+ * @param serializationNullFormat
+ */
+ public void setSerializationNullFormat(String serializationNullFormat) {
+ this.serializationNullFormat = serializationNullFormat;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index a9378d2..8bb8598 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -169,6 +169,7 @@ public final class CarbonLoaderUtil {
info.setAutoAggregateRequest(loadModel.isAggLoadRequest());
info.setComplexDelimiterLevel1(loadModel.getComplexDelimiterLevel1());
info.setComplexDelimiterLevel2(loadModel.getComplexDelimiterLevel2());
+ info.setSerializationNullFormat(loadModel.getSerializationNullFormat());
generateGraph(schmaModel, info, currentRestructNumber, loadModel, outPutLoc);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index f2e5283..2533321 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -81,6 +81,7 @@ class CarbonSqlParser()
protected val FACT = Keyword("FACT")
protected val FIELDS = Keyword("FIELDS")
protected val FILEHEADER = Keyword("FILEHEADER")
+ protected val SERIALIZATION_NULL_FORMAT = Keyword("SERIALIZATION_NULL_FORMAT")
protected val FILES = Keyword("FILES")
protected val FROM = Keyword("FROM")
protected val HIERARCHIES = Keyword("HIERARCHIES")
@@ -951,7 +952,8 @@ class CarbonSqlParser()
// validate with all supported options
val options = partionDataOptions.get.groupBy(x => x._1)
val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
- "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT"
+ "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
+ "SERIALIZATION_NULL_FORMAT"
)
var isSupported = true
val invalidOptions = StringBuilder.newBuilder
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 040e3b6..3da0ff1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -1531,6 +1531,7 @@ private[sql] case class LoadCube(
val fileHeader = partionValues.getOrElse("fileheader", "")
val escapeChar = partionValues.getOrElse("escapechar", "\\")
val columnDict = partionValues.getOrElse("columndict", null)
+ val serializationNullFormat = partionValues.getOrElse("serialization_null_format", "\\N")
val complex_delimiter_level_1 = partionValues.getOrElse("complex_delimiter_level_1", "\\$")
val complex_delimiter_level_2 = partionValues.getOrElse("complex_delimiter_level_2", "\\:")
val multiLine = partionValues.getOrElse("multiline", "false").trim.toLowerCase match {
@@ -1543,6 +1544,8 @@ private[sql] case class LoadCube(
}
carbonLoadModel.setEscapeChar(escapeChar)
+ carbonLoadModel.setSerializationNullFormat("serialization_null_format" + "," +
+ serializationNullFormat)
if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||
delimiter.equalsIgnoreCase(complex_delimiter_level_2)) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/test/resources/nullvalueserialization.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/nullvalueserialization.csv b/integration/spark/src/test/resources/nullvalueserialization.csv
new file mode 100644
index 0000000..9d1dba8
--- /dev/null
+++ b/integration/spark/src/test/resources/nullvalueserialization.csv
@@ -0,0 +1,2 @@
+1,2015-17-23 00:00:00,china,aaa1,phone197,ASD69643,15000.43525
+2,\N,\N,\N,\N,\N,\N
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/integration/spark/src/test/scala/org/carbondata/spark/testsuite/nullvalueserialization/TestNullValueSerialization.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/nullvalueserialization/TestNullValueSerialization.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/nullvalueserialization/TestNullValueSerialization.scala
new file mode 100644
index 0000000..d68903d
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/nullvalueserialization/TestNullValueSerialization.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.spark.testsuite.nullvalueserialization
+
+import java.io.File
+
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.util.CarbonProperties
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test cases for testing columns having \N or \null values for non numeric columns
+ */
+class TestNullValueSerialization extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll {
+ sql("drop table if exists carbonTable")
+ sql("drop table if exists hiveTable")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
+ )
+ val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+ .getCanonicalPath
+ val csvFilePath = currentDirectory + "/src/test/resources/nullvalueserialization.csv"
+ sql(
+ "CREATE TABLE IF NOT EXISTS carbonTable (ID String, date Timestamp, country String, name " +
+ "String, phonetype String, serialname String, salary Decimal(17,2))STORED BY 'org.apache" +
+ ".carbondata.format'"
+ )
+ sql(
+ "create table if not exists hiveTable(ID String, date Timestamp, country String, name " +
+ "String, " +
+ "phonetype String, serialname String, salary Decimal(17,2))row format delimited fields " +
+ "terminated by ','"
+ )
+ sql(
+ "LOAD DATA LOCAL INPATH '" + csvFilePath + "' into table carbonTable OPTIONS " +
+ "('FILEHEADER'='ID,date," +
+ "country,name,phonetype,serialname,salary')"
+ )
+ sql(
+ "LOAD DATA local inpath '" + csvFilePath + "' INTO table hiveTable"
+ )
+ }
+
+
+ test("test detail query on column having null values") {
+ System.out.println("Carbon Table")
+ sql("select * from carbonTable").show()
+ System.out.println("Hive Table")
+ sql("select * from hiveTable").show()
+ checkAnswer(
+ sql("select * from carbonTable"),
+ sql("select * from hiveTable")
+ )
+ }
+
+ test("test filter query on column is null") {
+ checkAnswer(
+ sql("select * from carbonTable where salary is null"),
+ sql("select * from hiveTable where salary is null")
+ )
+ }
+
+ test("test filter query on column is not null") {
+ checkAnswer(
+ sql("select * from carbonTable where salary is not null"),
+ sql("select * from hiveTable where salary is not null")
+ )
+ }
+
+ test("test filter query on columnValue=null") {
+ checkAnswer(
+ sql("select * from carbonTable where salary=null"),
+ sql("select * from hiveTable where salary=null")
+ )
+ }
+
+ test("test filter query where date is null") {
+ checkAnswer(
+ sql("select * from carbonTable where date is null"),
+ sql("select * from hiveTable where date is null")
+ )
+ }
+
+ override def afterAll {
+ sql("drop table if exists carbonTable")
+ sql("drop table if exists hiveTable")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/api/dataloader/SchemaInfo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/api/dataloader/SchemaInfo.java b/processing/src/main/java/org/carbondata/processing/api/dataloader/SchemaInfo.java
index 3f6b8a0..c52e3a0 100644
--- a/processing/src/main/java/org/carbondata/processing/api/dataloader/SchemaInfo.java
+++ b/processing/src/main/java/org/carbondata/processing/api/dataloader/SchemaInfo.java
@@ -59,6 +59,10 @@ public class SchemaInfo {
private String complexDelimiterLevel1;
private String complexDelimiterLevel2;
+ /**
+ * the value to be treated as null while data load
+ */
+ private String serializationNullFormat;
public String getComplexDelimiterLevel1() {
return complexDelimiterLevel1;
@@ -168,4 +172,20 @@ public class SchemaInfo {
this.schemaName = schemaName;
}
+ /**
+ * the method returns the value to be treated as null while data load
+ * @return
+ */
+ public String getSerializationNullFormat() {
+ return serializationNullFormat;
+ }
+
+ /**
+ * the method sets the value to be treated as null while data load
+ * @param serializationNullFormat
+ */
+ public void setSerializationNullFormat(String serializationNullFormat) {
+ this.serializationNullFormat = serializationNullFormat;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
index 92c00b5..ef846fe 100644
--- a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
+++ b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
@@ -46,6 +46,7 @@ import org.carbondata.processing.csvreaderstep.CsvInputMeta;
import org.carbondata.processing.graphgenerator.configuration.GraphConfigurationInfo;
import org.carbondata.processing.mdkeygen.MDKeyGenStepMeta;
import org.carbondata.processing.merger.step.CarbonSliceMergerStepMeta;
+import org.carbondata.processing.schema.metadata.TableOptionWrapper;
import org.carbondata.processing.sortandgroupby.sortdatastep.SortKeyStepMeta;
import org.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedSeqGenMeta;
import org.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -567,6 +568,7 @@ public class GraphGenerator {
seqMeta.setNormHiers(graphConfiguration.getNormHiers());
seqMeta.setHeirKeySize(graphConfiguration.getHeirAndKeySizeString());
seqMeta.setColumnSchemaDetails(graphConfiguration.getColumnSchemaDetails().toString());
+ seqMeta.setTableOption(graphConfiguration.getTableOptionWrapper().toString());
String[] aggType = graphConfiguration.getAggType();
StringBuilder builder = new StringBuilder();
for (int i = 0; i < aggType.length; i++) {
@@ -797,6 +799,7 @@ public class GraphGenerator {
prepareNoDictionaryMapping(dimensions, graphConfiguration);
graphConfiguration
.setColumnSchemaDetails(CarbonSchemaParser.getColumnSchemaDetails(dimensions));
+ graphConfiguration.setTableOptionWrapper(getTableOptionWrapper());
String factTableName = carbonDataLoadSchema.getCarbonTable().getFactTableName();
graphConfiguration.setTableName(factTableName);
StringBuilder dimString = new StringBuilder();
@@ -901,6 +904,17 @@ public class GraphGenerator {
return graphConfiguration;
}
+ /**
+ * the method returns the table option wrapper
+ *
+ * @return
+ */
+ private TableOptionWrapper getTableOptionWrapper() {
+ TableOptionWrapper tableOptionWrapper = TableOptionWrapper.getTableOptionWrapperInstance();
+ tableOptionWrapper.setTableOption(schemaInfo.getSerializationNullFormat());
+ return tableOptionWrapper;
+ }
+
private String getQuoteType(SchemaInfo schemaInfo) throws GraphGeneratorException {
String driverClass = schemaInfo.getSrcDriverName();
String type = DRIVERS.get(driverClass);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java b/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
index 1b2afa5..c4406ab 100644
--- a/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
+++ b/processing/src/main/java/org/carbondata/processing/graphgenerator/configuration/GraphConfigurationInfo.java
@@ -22,6 +22,7 @@ package org.carbondata.processing.graphgenerator.configuration;
import java.util.Map;
import org.carbondata.processing.schema.metadata.ColumnSchemaDetailsWrapper;
+import org.carbondata.processing.schema.metadata.TableOptionWrapper;
public class GraphConfigurationInfo {
private String connectionName;
@@ -200,6 +201,11 @@ public class GraphConfigurationInfo {
private ColumnSchemaDetailsWrapper columnSchemaDetailsWrapper;
/**
+ * wrapper object holding the table options details needed while dataload
+ */
+ private TableOptionWrapper tableOptionWrapper;
+
+ /**
* It is column groups in below format
* 0,1~2~3,4,5,6~7~8,9
* groups are
@@ -998,6 +1004,23 @@ public class GraphConfigurationInfo {
return columnSchemaDetailsWrapper;
}
+ /**
+ * sets the wrapper object holding the table options needed during data load
+ *
+ * @param tableOptionWrapper
+ */
+ public void setTableOptionWrapper(TableOptionWrapper tableOptionWrapper) {
+ this.tableOptionWrapper = tableOptionWrapper;
+ }
+
+ /**
+ * method returns the table options detail wrapper instance.
+ * @return
+ */
+ public TableOptionWrapper getTableOptionWrapper() {
+ return tableOptionWrapper;
+ }
+
public void setColumnGroupsString(String columnGroups) {
this.columnGroupsString = columnGroups;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOption.java b/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOption.java
new file mode 100644
index 0000000..b487c72
--- /dev/null
+++ b/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOption.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.processing.schema.metadata;
+
+/**
+ * This class is to hold the key value pair of properties needed while dataload.
+ */
+public class TableOption {
+ /**
+ * option key name
+ */
+ private String optionKey;
+ /**
+ * option value
+ */
+ private String optionValue;
+
+ /**
+ * the constructor to initialize the key value pair TableOption instance
+ *
+ * @param optionKey
+ * @param optionValue
+ */
+ public TableOption(String optionKey, String optionValue) {
+ this.optionKey = optionKey;
+ this.optionValue = optionValue;
+ }
+
+ /**
+ * constructor to initialize from a "key,value" string separated by a comma (,)
+ *
+ * @param str
+ */
+ public TableOption(String str) {
+ //limit of 2 splits only at the first comma, so a key with an empty value still yields two parts.
+ String[] split = str.split(",", 2);
+ this.optionKey = split[0];
+ this.optionValue = split[1];
+ }
+
+ /**
+ * returns options key
+ *
+ * @return
+ */
+ public String getOptionKey() {
+ return optionKey;
+ }
+
+ /**
+ * returns options value
+ *
+ * @return
+ */
+ public String getOptionValue() {
+ return optionValue;
+ }
+
+ /**
+ * @return
+ */
+ public String toString() {
+ return optionKey + "," + optionValue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOptionWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOptionWrapper.java b/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOptionWrapper.java
new file mode 100644
index 0000000..adb077c
--- /dev/null
+++ b/processing/src/main/java/org/carbondata/processing/schema/metadata/TableOptionWrapper.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.processing.schema.metadata;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * This class holds the table option details used during data load
+ */
+public class TableOptionWrapper {
+ /**
+ * map holds the table options
+ */
+ private static final Map<String, TableOption> mapOFOptions =
+ new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ private static TableOptionWrapper tableOptionWrapper = new TableOptionWrapper();
+
+ /**
+ * to initialize the wrapper object
+ */
+ private TableOptionWrapper() {
+ }
+
+ /**
+ * @param input
+ */
+ public static void populateTableOptions(String input) {
+ String[] split =
+ null != input ? input.split(CarbonCommonConstants.HASH_SPC_CHARACTER) : new String[0];
+ for (String str : split) {
+ TableOption tableOption = new TableOption(str);
+ mapOFOptions.put(tableOption.getOptionKey(), tableOption);
+ }
+ }
+
+ /**
+ * @param input
+ */
+ public static void setTableOption(String input) {
+ if (null != input) {
+ TableOption tableOption = new TableOption(input);
+ mapOFOptions.put(tableOption.getOptionKey(), tableOption);
+ }
+ }
+
+ /**
+ * returns TableOptionWrapper instance
+ *
+ * @return
+ */
+ public static TableOptionWrapper getTableOptionWrapperInstance() {
+ return tableOptionWrapper;
+ }
+
+ /**
+ * returns the options key value
+ * return null if the key is not found in the map
+ *
+ * @param key
+ * @return
+ */
+ public String get(String key) {
+ TableOption tableOption = mapOFOptions.get(key);
+ return null != tableOption ? tableOption.getOptionValue() : null;
+ }
+
+ /**
+ * return the string object
+ *
+ * @return
+ */
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ Set<Map.Entry<String, TableOption>> entries = mapOFOptions.entrySet();
+ Iterator<Map.Entry<String, TableOption>> iterator = entries.iterator();
+
+ while (iterator.hasNext()) {
+ Map.Entry<String, TableOption> entry = iterator.next();
+ builder.append(entry.getValue().toString());
+ builder.append(CarbonCommonConstants.HASH_SPC_CHARACTER);
+ }
+ return builder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
index 77f7297..dad2bbc 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenMeta.java
@@ -38,6 +38,7 @@ import org.carbondata.processing.datatypes.PrimitiveDataType;
import org.carbondata.processing.datatypes.StructDataType;
import org.carbondata.processing.schema.metadata.ColumnSchemaDetailsWrapper;
import org.carbondata.processing.schema.metadata.HierarchiesInfo;
+import org.carbondata.processing.schema.metadata.TableOptionWrapper;
import org.carbondata.processing.util.CarbonDataProcessorUtil;
import org.carbondata.processing.util.RemoveDictionaryUtil;
@@ -363,14 +364,24 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
private String segmentId;
/***
- * String of columns ordinal and column datatype separated by COLON_SPC_CHARACTER
+ * String of columns ordinal and column datatype separated by HASH_SPC_CHARACTER
*/
private String columnSchemaDetails;
/**
+ * String of key,value pairs; key and value are separated by ',' and pairs by HASH_SPC_CHARACTER
+ */
+ private String tableOption;
+
+ /**
* wrapper object having the columnSchemaDetails
*/
private ColumnSchemaDetailsWrapper columnSchemaDetailsWrapper;
+
+ /**
+ * Wrapper object holding the table options
+ */
+ private TableOptionWrapper tableOptionWrapper;
/**
* task id, each spark task has a unique id
*/
@@ -644,6 +655,7 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
taskNo = "";
columnSchemaDetails = "";
columnsDataTypeString="";
+ tableOption = "";
}
// helper method to allocate the arrays
@@ -709,6 +721,8 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
retval.append(" ").append(XMLHandler.addTagValue("taskNo", taskNo));
retval.append(" ")
.append(XMLHandler.addTagValue("columnSchemaDetails", columnSchemaDetails));
+ retval.append(" ")
+ .append(XMLHandler.addTagValue("tableOption", tableOption));
return retval.toString();
}
@@ -758,6 +772,7 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
segmentId = XMLHandler.getTagValue(stepnode, "segmentId");
taskNo = XMLHandler.getTagValue(stepnode, "taskNo");
columnSchemaDetails = XMLHandler.getTagValue(stepnode, "columnSchemaDetails");
+ tableOption = XMLHandler.getTagValue(stepnode, "tableOption");
String batchConfig = XMLHandler.getTagValue(stepnode, "batchSize");
if (batchConfig != null) {
@@ -784,7 +799,9 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
}
public void initialize() throws KettleException {
- columnSchemaDetailsWrapper = new ColumnSchemaDetailsWrapper(columnSchemaDetails);
+ this.columnSchemaDetailsWrapper = new ColumnSchemaDetailsWrapper(columnSchemaDetails);
+ this.tableOptionWrapper = TableOptionWrapper.getTableOptionWrapperInstance();
+ tableOptionWrapper.populateTableOptions(tableOption);
updateDimensions(carbondim, carbonmsr, noDictionaryDims);
dimColDataTypes=RemoveDictionaryUtil.extractDimColsDataTypeValues(columnsDataTypeString);
@@ -1324,6 +1341,7 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
segmentId = rep.getStepAttributeString(idStep, "segmentId");
taskNo = rep.getStepAttributeString(idStep, "taskNo");
columnSchemaDetails = rep.getStepAttributeString(idStep, "columnSchemaDetails");
+ tableOption = rep.getStepAttributeString(idStep, "tableOption");
int nrKeys = rep.countNrStepAttributes(idStep, "lookup_keyfield");
allocate(nrKeys);
} catch (Exception e) {
@@ -1378,6 +1396,7 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
rep.saveStepAttribute(idTransformation, idStep, "segmentId", segmentId);
rep.saveStepAttribute(idTransformation, idStep, "taskNo", taskNo);
rep.saveStepAttribute(idTransformation, idStep, "columnSchemaDetails", columnSchemaDetails);
+ rep.saveStepAttribute(idTransformation, idStep, "tableOption", tableOption);
} catch (Exception e) {
throw new KettleException(
BaseMessages.getString(pkg, "CarbonStep.Exception.UnableToSaveStepInfoToRepository")
@@ -1678,5 +1697,21 @@ public class CarbonCSVBasedSeqGenMeta extends BaseStepMeta implements StepMetaIn
public ColumnSchemaDetailsWrapper getColumnSchemaDetailsWrapper() {
return columnSchemaDetailsWrapper;
}
+
+ /**
+ * the method sets the TableOption details
+ * @param tableOption
+ */
+ public void setTableOption(String tableOption) {
+ this.tableOption = tableOption;
+ }
+
+ /**
+ * the method returns the wrapper object of tableoption
+ * @return
+ */
+ public TableOptionWrapper getTableOptionWrapper() {
+ return tableOptionWrapper;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a45dc4f7/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index c7b2024..237d793 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -163,10 +163,6 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
*/
private boolean[] dimPresentCsvOrder;
/**
- * ValueToCheckAgainst
- */
- private String valueToCheckAgainst;
- /**
* propMap
*/
private Map<String, int[]> propMap;
@@ -436,6 +432,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
if (null != getInputRowMeta()) {
generateNoDictionaryAndComplexIndexMapping();
}
+ serializationNullFormat = meta.getTableOptionWrapper().get("serialization_null_format");
}
// no more input to be expected...
if (r == null) {
@@ -546,6 +543,11 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
return false;
}
+ /**
+ * holds the value to be considered as null while dataload
+ */
+ private String serializationNullFormat;
+
private List<String> getDenormalizedHierarchies() {
List<String> hierList = Arrays.asList(meta.hierNames);
List<String> denormHiers = new ArrayList<String>(10);
@@ -877,7 +879,9 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
int i = 0;
for (Object obj : rowValue) {
if (obj != null) {
- if (obj.equals(valueToCheckAgainst)) {
+ //removed valueToCheckAgainst does not make sense to
+ // compare non null object with a null string
+ if (obj.toString().equalsIgnoreCase(serializationNullFormat)) {
rowValue[i] = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
}
} else {