You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/09/03 08:19:29 UTC
[1/2] incubator-carbondata git commit: handled all dictionary
exception
Repository: incubator-carbondata
Updated Branches:
refs/heads/master cda2a4d6b -> ec58755f5
Handle malformed records in the all-dictionary file
Add test case
Remove rdd.count
Use better wording
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/40595a90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/40595a90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/40595a90
Branch: refs/heads/master
Commit: 40595a90f2ebd1dfca001dd31f450c31ff096065
Parents: cda2a4d
Author: foryou2030 <fo...@126.com>
Authored: Fri Sep 2 17:18:37 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Sat Sep 3 16:18:11 2016 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 4 --
.../spark/util/GlobalDictionaryUtil.scala | 73 +++++++++++++++-----
integration/spark/src/test/resources/dict.txt | 1 +
.../dataload/TestLoadDataUseAllDictionary.scala | 56 +++++++++++++++
4 files changed, 111 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/40595a90/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6050719..41d6ebf 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -199,10 +199,6 @@ public final class CarbonCommonConstants {
*/
public static final String MEMBER_DEFAULT_VAL = "@NU#LL$!";
/**
- * DEFAULT_COLUMN_NAME
- */
- public static final String DEFAULT_COLUMN_NAME = "@NU#LL$!COLUMN";
- /**
* FILE STATUS IN-PROGRESS
*/
public static final String FILE_INPROGRESS_STATUS = ".inprogress";
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/40595a90/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 4a2cd84..d68fa86 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -27,7 +27,7 @@ import scala.language.implicitConversions
import scala.util.control.Breaks.{break, breakable}
import org.apache.commons.lang3.{ArrayUtils, StringUtils}
-import org.apache.spark.Logging
+import org.apache.spark.{Accumulator, Logging}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{CarbonEnv, CarbonRelation, DataFrame, SQLContext}
import org.apache.spark.sql.hive.CarbonMetastoreCatalog
@@ -574,6 +574,48 @@ object GlobalDictionaryUtil extends Logging {
}
/**
+   * Parses one line of an all-dictionary file into a (columnName, value) pair.
+   *
+   * A well-formed record looks like `columnIndex<sep>value`, where `<sep>` is
+   * CSVWriter.DEFAULT_SEPARATOR and `columnIndex` is a zero-based position in
+   * `csvFileColumns`. A record with an empty value (e.g. "1,") is accepted and
+   * yields an empty value string.
+   *
+   * Malformed records never throw from here: each one is logged, counted in
+   * `accum`, and mapped to ("", ""). The caller decides whether to fail the
+   * load by inspecting the accumulator once a Spark action has run. Spark may
+   * apply accumulator updates made inside transformations more than once (for
+   * example on task retry), so treat the count as "at least one bad record"
+   * rather than an exact number.
+   *
+   * @param x              one raw line read from the dictionary file
+   * @param accum          incremented once per malformed record
+   * @param csvFileColumns column names of the CSV file, looked up by position
+   * @return (columnName, value), or ("", "") if the record is malformed
+   */
+  private def parseRecord(x: String, accum: Accumulator[Int],
+      csvFileColumns: Array[String]): (String, String) = {
+    val separator = CSVWriter.DEFAULT_SEPARATOR
+
+    // Logs and counts a malformed record, then returns the placeholder pair.
+    def badRecord(): (String, String) = {
+      logError("Read a bad dictionary record: " + x)
+      accum += 1
+      ("", "")
+    }
+
+    // split(Char) matches the separator literally and, like String.split,
+    // drops trailing empty strings: "," gives no tokens, "1," gives Array("1").
+    val tokens = x.split(separator)
+    if (tokens.isEmpty || x.indexOf(separator) < 0) {
+      // e.g. "," (no tokens) or "1", "jone", "" (no separator at all)
+      badRecord()
+    } else {
+      try {
+        val columnName = csvFileColumns(tokens(0).toInt)
+        val value = if (tokens.length > 1) tokens(1) else ""
+        // NOTE(review): tokens after the second are ignored, so a value that
+        // itself contains the separator is truncated - confirm this is intended.
+        (columnName, value)
+      } catch {
+        // non-numeric column index, or an index outside csvFileColumns
+        case _: NumberFormatException | _: IndexOutOfBoundsException =>
+          badRecord()
+      }
+    }
+  }
+
+ /**
* read local dictionary and prune column
*
* @param sqlContext
@@ -585,27 +627,13 @@ object GlobalDictionaryUtil extends Logging {
private def readAllDictionaryFiles(sqlContext: SQLContext,
csvFileColumns: Array[String],
requireColumns: Array[String],
- allDictionaryPath: String) = {
+ allDictionaryPath: String,
+ accumulator: Accumulator[Int]) = {
var allDictionaryRdd: RDD[(String, Iterable[String])] = null
try {
// read local dictionary file, and spilt (columnIndex, columnValue)
val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
- .map(x => {
- val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR)
- if (tokens.size != 2) {
- logError("Read a bad dictionary record: " + x)
- }
- var columnName: String = CarbonCommonConstants.DEFAULT_COLUMN_NAME
- var value: String = ""
- try {
- columnName = csvFileColumns(tokens(0).toInt)
- value = tokens(1)
- } catch {
- case ex: Exception =>
- logError("Reset bad dictionary record as default value")
- }
- (columnName, value)
- })
+ .map(x => parseRecord(x, accumulator, csvFileColumns)).persist()
// group by column index, and filter required columns
val requireColumnsList = requireColumns.toList
@@ -791,9 +819,11 @@ object GlobalDictionaryUtil extends Logging {
if (requireDimension.nonEmpty) {
val model = createDictionaryLoadModel(carbonLoadModel, table, requireDimension,
hdfsLocation, dictfolderPath, false)
+ // check if dictionary files contains bad record
+ val accumulator = sqlContext.sparkContext.accumulator(0)
// read local dictionary file, and group by key
val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
- requireColumnNames, allDictionaryPath)
+ requireColumnNames, allDictionaryPath, accumulator)
// read exist dictionary and combine
val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
.partitionBy(new ColumnPartitioner(model.primDimensions.length))
@@ -801,6 +831,11 @@ object GlobalDictionaryUtil extends Logging {
val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
// check result status
checkStatus(carbonLoadModel, sqlContext, model, statusList)
+ // if the dictionary contains wrong format record, throw ex
+ if (accumulator.value > 0) {
+ throw new DataLoadingException("Data Loading failure, dictionary values are " +
+ "not in correct format!")
+ }
} else {
logInfo("have no column need to generate global dictionary")
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/40595a90/integration/spark/src/test/resources/dict.txt
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/dict.txt b/integration/spark/src/test/resources/dict.txt
new file mode 100644
index 0000000..64ba47f
--- /dev/null
+++ b/integration/spark/src/test/resources/dict.txt
@@ -0,0 +1 @@
+12154
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/40595a90/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala
new file mode 100644
index 0000000..143b386
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestLoadDataUseAllDictionary extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS t3")
+    sql("""
+      CREATE TABLE IF NOT EXISTS t3
+      (ID Int, date Timestamp, country String,
+      name String, phonetype String, serialname String, salary Int)
+      STORED BY 'carbondata'
+      """)
+  }
+
+  test("test load data use all dictionary, and given wrong format dictionary values") {
+    // dict.txt holds a single record with no separator ("12154"), which must
+    // be reported as a malformed dictionary record and fail the load.
+    val e = intercept[DataLoadingException] {
+      sql("""
+        LOAD DATA LOCAL INPATH './src/test/resources/windows.csv' into table t3
+        options('FILEHEADER'='id,date,country,name,phonetype,serialname,salary',
+        'All_DICTIONARY_PATH'='./src/test/resources/dict.txt')
+        """)
+    }
+    assert(e.getMessage === "Data Loading failure, dictionary values are " +
+      "not in correct format!")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS t3")
+  }
+}
[2/2] incubator-carbondata git commit: [CARBONDATA-202] Handled
exception thrown in beeline for all dictionary. This closes #122
Posted by ch...@apache.org.
[CARBONDATA-202] Handled exception thrown in beeline for all dictionary. This closes #122
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ec58755f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ec58755f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ec58755f
Branch: refs/heads/master
Commit: ec58755f53c05b3495f17b1dcaace051d8939409
Parents: cda2a4d 40595a9
Author: chenliang613 <ch...@apache.org>
Authored: Sat Sep 3 16:19:02 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Sat Sep 3 16:19:02 2016 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 4 --
.../spark/util/GlobalDictionaryUtil.scala | 73 +++++++++++++++-----
integration/spark/src/test/resources/dict.txt | 1 +
.../dataload/TestLoadDataUseAllDictionary.scala | 56 +++++++++++++++
4 files changed, 111 insertions(+), 23 deletions(-)
----------------------------------------------------------------------