You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/09/22 05:36:38 UTC

[40/50] [abbrv] incubator-carbondata git commit: handled all dictionary exception

handled all dictionary exception

add testcase

remove rdd.count

use better sentence


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/50b3746b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/50b3746b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/50b3746b

Branch: refs/heads/branch-0.1
Commit: 50b3746b3e0a8e74499689d520a0e77e433234ca
Parents: 30548ba
Author: foryou2030 <fo...@126.com>
Authored: Fri Sep 2 17:18:37 2016 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Sep 22 10:33:21 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  4 --
 .../spark/util/GlobalDictionaryUtil.scala       | 73 +++++++++++++++-----
 integration/spark/src/test/resources/dict.txt   |  1 +
 .../dataload/TestLoadDataUseAllDictionary.scala | 56 +++++++++++++++
 4 files changed, 111 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50b3746b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6050719..41d6ebf 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -199,10 +199,6 @@ public final class CarbonCommonConstants {
    */
   public static final String MEMBER_DEFAULT_VAL = "@NU#LL$!";
   /**
-   * DEFAULT_COLUMN_NAME
-   */
-  public static final String DEFAULT_COLUMN_NAME = "@NU#LL$!COLUMN";
-  /**
    * FILE STATUS IN-PROGRESS
    */
   public static final String FILE_INPROGRESS_STATUS = ".inprogress";

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50b3746b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 022a5f8..02b70d0 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -27,7 +27,7 @@ import scala.language.implicitConversions
 import scala.util.control.Breaks.{break, breakable}
 
 import org.apache.commons.lang3.{ArrayUtils, StringUtils}
-import org.apache.spark.Logging
+import org.apache.spark.{Accumulator, Logging}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation, DataFrame, SQLContext}
 import org.apache.spark.sql.hive.CarbonMetastoreCatalog
@@ -584,6 +584,48 @@ object GlobalDictionaryUtil extends Logging {
   }
 
   /**
+   * Parse one dictionary-file line into (columnName, value); a malformed line increments accum
+   * and returns ("", ""). NOTE(review): split uses CSVWriter.DEFAULT_SEPARATOR, the check uses ",".
+   * @param x one raw line of the dictionary file, read as columnIndex, separator, value
+   * @param accum counter of malformed lines; the visible load path throws later if it is > 0
+   * @param csvFileColumns column names, looked up by the integer parsed from the first token
+   */
+  private def parseRecord(x: String, accum: Accumulator[Int],
+                  csvFileColumns: Array[String]) : (String, String) = {
+    val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR)
+    var columnName: String = ""
+    var value: String = ""
+    // nothing left after the split, e.g. "," (separators only): count it as a bad record
+    if (tokens.size == 0) {
+      logError("Read a bad dictionary record: " + x)
+      accum += 1
+    } else if (tokens.size == 1) {
+      // one token and no comma, e.g. "1", "jone" or "": counted as bad, but not logged here
+      if (x.contains(",") == false) {
+        accum += 1
+      } else { // e.g. "1,": a column index followed by an empty value
+        try {
+          columnName = csvFileColumns(tokens(0).toInt)
+        } catch {
+          case ex: Exception => // non-numeric or out-of-range column index
+            logError("Read a bad dictionary record: " + x)
+            accum += 1
+        }
+      }
+    } else { // two or more tokens; only the first two are used
+      try {
+        columnName = csvFileColumns(tokens(0).toInt)
+        value = tokens(1)
+      } catch {
+        case ex: Exception => // non-numeric or out-of-range column index
+          logError("Read a bad dictionary record: " + x)
+          accum += 1
+      }
+    }
+    (columnName, value)
+  }
+
+  /**
    * read local dictionary and prune column
    *
    * @param sqlContext
@@ -595,27 +637,13 @@ object GlobalDictionaryUtil extends Logging {
   private def readAllDictionaryFiles(sqlContext: SQLContext,
                                      csvFileColumns: Array[String],
                                      requireColumns: Array[String],
-                                     allDictionaryPath: String) = {
+                                     allDictionaryPath: String,
+                                     accumulator: Accumulator[Int]) = {
     var allDictionaryRdd: RDD[(String, Iterable[String])] = null
     try {
       // read local dictionary file, and spilt (columnIndex, columnValue)
       val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
-        .map(x => {
-        val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR)
-        if (tokens.size != 2) {
-          logError("Read a bad dictionary record: " + x)
-        }
-        var columnName: String = CarbonCommonConstants.DEFAULT_COLUMN_NAME
-        var value: String = ""
-        try {
-          columnName = csvFileColumns(tokens(0).toInt)
-          value = tokens(1)
-        } catch {
-          case ex: Exception =>
-            logError("Reset bad dictionary record as default value")
-        }
-        (columnName, value)
-      })
+        .map(x => parseRecord(x, accumulator, csvFileColumns)).persist()
 
       // group by column index, and filter required columns
       val requireColumnsList = requireColumns.toList
@@ -801,9 +829,11 @@ object GlobalDictionaryUtil extends Logging {
           if (requireDimension.nonEmpty) {
             val model = createDictionaryLoadModel(carbonLoadModel, table, requireDimension,
               hdfsLocation, dictfolderPath, false)
+            // check if dictionary files contains bad record
+            val accumulator = sqlContext.sparkContext.accumulator(0)
             // read local dictionary file, and group by key
             val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
-              requireColumnNames, allDictionaryPath)
+              requireColumnNames, allDictionaryPath, accumulator)
             // read exist dictionary and combine
             val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
               .partitionBy(new ColumnPartitioner(model.primDimensions.length))
@@ -811,6 +841,11 @@ object GlobalDictionaryUtil extends Logging {
             val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
             // check result status
             checkStatus(carbonLoadModel, sqlContext, model, statusList)
+            // if the dictionary contains wrong format record, throw ex
+            if (accumulator.value > 0) {
+              throw new DataLoadingException("Data Loading failure, dictionary values are " +
+                "not in correct format!")
+            }
           } else {
             logInfo("have no column need to generate global dictionary")
           }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50b3746b/integration/spark/src/test/resources/dict.txt
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/dict.txt b/integration/spark/src/test/resources/dict.txt
new file mode 100644
index 0000000..64ba47f
--- /dev/null
+++ b/integration/spark/src/test/resources/dict.txt
@@ -0,0 +1 @@
+12154
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50b3746b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala
new file mode 100644
index 0000000..143b386
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataUseAllDictionary.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestLoadDataUseAllDictionary extends QueryTest with BeforeAndAfterAll{
+  override def beforeAll { // recreate table t3 so the test starts from a known state
+    sql("DROP TABLE IF EXISTS t3")
+    sql("""
+           CREATE TABLE IF NOT EXISTS t3
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           STORED BY 'carbondata'
+           """)
+  }
+
+  test("test load data use all dictionary, and given wrong format dictionary values") {
+    try { // dict.txt holds the single line "12154", which has no column separator
+      sql(s"""
+           LOAD DATA LOCAL INPATH './src/test/resources/windows.csv' into table t3
+           options('FILEHEADER'='id,date,country,name,phonetype,serialname,salary',
+           'All_DICTIONARY_PATH'='./src/test/resources/dict.txt')
+           """)
+      assert(false) // reached only if the load did not throw
+    } catch {
+      case e: DataLoadingException => // only this type is handled; anything else propagates
+        assert(e.getMessage.equals("Data Loading failure, dictionary values are " +
+          "not in correct format!"))
+    }
+  }
+
+  override def afterAll { // drop the table created in beforeAll
+    sql("DROP TABLE IF EXISTS t3")
+  }
+}