You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/29 14:41:34 UTC
[1/2] incubator-carbondata git commit: Parse some Spark exception from executor side and show them directly on driver
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 8e932cba1 -> e87a40093
Parse some Spark exception from executor side and show them directly on driver
fix review comments
add test case
add test case
fix comments
rebase 827
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/48595085
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/48595085
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/48595085
Branch: refs/heads/master
Commit: 485950853cd5a1c366e213b3d15bc6e97cfac586
Parents: 8e932cb
Author: Zhangshunyu <zh...@huawei.com>
Authored: Mon Aug 22 10:08:26 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Mon Aug 29 22:33:27 2016 +0800
----------------------------------------------------------------------
.../spark/rdd/CarbonDataRDDFactory.scala | 27 +++++---
.../TestLoadDataWithFileHeaderException.scala | 67 ++++++++++++++++++++
2 files changed, 86 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48595085/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 73998c5..f1fc6d4 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -29,7 +29,7 @@ import scala.util.control.Breaks._
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileSplit
-import org.apache.spark.{Logging, Partition, SparkContext, SparkEnv}
+import org.apache.spark.{Logging, Partition, SparkContext, SparkEnv, SparkException}
import org.apache.spark.sql.{CarbonEnv, SQLContext}
import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
import org.apache.spark.sql.hive.DistributionUtil
@@ -46,6 +46,7 @@ import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.integration.spark.merger.{CompactionCallable, CompactionType}
import org.apache.carbondata.lcm.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
import org.apache.carbondata.spark._
import org.apache.carbondata.spark.load._
@@ -724,6 +725,8 @@ object CarbonDataRDDFactory extends Logging {
partitioner.partitionCount, currentLoadCount.toString)
var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
var status: Array[(String, LoadMetadataDetails)] = null
+ var errorMessage: String = "DataLoad failure"
+ var executorMessage: String = ""
try {
status = new
CarbonDataLoadRDD(sqlContext.sparkContext,
@@ -773,12 +776,21 @@ object CarbonDataRDDFactory extends Logging {
} catch {
case ex: Throwable =>
loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- logInfo("DataLoad failure")
+ ex match {
+ case sparkException: SparkException =>
+ if (sparkException.getCause.isInstanceOf[DataLoadingException]) {
+ executorMessage = sparkException.getCause.getMessage
+ errorMessage = errorMessage + ": " + executorMessage
+ }
+ case _ =>
+ executorMessage = ex.getCause.getMessage
+ errorMessage = errorMessage + ": " + executorMessage
+ }
+ logInfo(errorMessage)
logger.error(ex)
}
if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
- var message: String = ""
logInfo("********starting clean up**********")
if (isAgg) {
// TODO:need to clean aggTable
@@ -786,7 +798,7 @@ object CarbonDataRDDFactory extends Logging {
carbonLoadModel.getTableName, carbonLoadModel.getAggTableName, hdfsStoreLocation,
currentRestructNumber
)
- message = "Aggregate table creation failure"
+ errorMessage = "Aggregate table creation failure"
} else {
CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
val aggTables = carbonTable.getAggregateTablesName
@@ -800,13 +812,12 @@ object CarbonDataRDDFactory extends Logging {
carbonLoadModel.getTableName, hdfsStoreLocation, currentRestructNumber, newSlice)
}
}
- message = "DataLoad failure"
}
logInfo("********clean up done**********")
logger.audit(s"Data load is failed for " +
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
logWarning("Cannot write load metadata file as data load failed")
- throw new Exception(message)
+ throw new Exception(errorMessage)
} else {
val metadataDetails = status(0)._2
if (!isAgg) {
@@ -818,11 +829,11 @@ object CarbonDataRDDFactory extends Logging {
loadStartTime
)
if (!status) {
- val message = "Dataload failed due to failure in table status updation."
+ val errorMessage = "Dataload failed due to failure in table status updation."
logger.audit("Data load is failed for " +
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
logger.error("Dataload failed due to failure in table status updation.")
- throw new Exception(message)
+ throw new Exception(errorMessage)
}
} else if (!carbonLoadModel.isRetentionRequest) {
// TODO : Handle it
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48595085/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
new file mode 100644
index 0000000..d6bac33
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Verifies that a CSV file-header mismatch detected during LOAD DATA is reported
+ * to the caller as "DataLoad failure: " followed by the underlying loader message.
+ */
+class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterAll{
+ override def beforeAll {
+ sql("DROP TABLE IF EXISTS t3")
+ sql("""
+ CREATE TABLE IF NOT EXISTS t3
+ (ID Int, date Timestamp, country String,
+ name String, phonetype String, serialname String, salary Int)
+ STORED BY 'carbondata'
+ """)
+ }
+
+ test("test load data both file and ddl without file header exception") {
+ // intercept fails the test when nothing is thrown; assert(false) inside a
+ // try/catch(Exception) would be caught by that catch and hide the real cause.
+ val e = intercept[Exception] {
+ sql("""
+ LOAD DATA LOCAL INPATH './src/test/resources/windows.csv' into table t3
+ """)
+ }
+ assert(e.getMessage === "DataLoad failure: CSV File provided is not proper. " +
+ "Column names in schema and csv header are not same. CSVFile Name : windows.csv")
+ }
+
+ test("test load data ddl provided wrong file header exception") {
+ val e = intercept[Exception] {
+ sql("""
+ LOAD DATA LOCAL INPATH './src/test/resources/windows.csv' into table t3
+ options('fileheader'='no_column')
+ """)
+ }
+ assert(e.getMessage === "DataLoad failure: CSV header provided in DDL is not proper. " +
+ "Column names in schema and CSV header are not the same.")
+ }
+
+ override def afterAll {
+ sql("DROP TABLE IF EXISTS t3")
+ }
+}
[2/2] incubator-carbondata git commit: [CARBONDATA-132] Fix the bug that the CSV file header exception can not be shown to user using beeline
This closes #81
Posted by ch...@apache.org.
[CARBONDATA-132] Fix the bug that the CSV file header exception can not be shown to user using beeline
This closes #81
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e87a4009
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e87a4009
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e87a4009
Branch: refs/heads/master
Commit: e87a4009393a28f4b46f9da090cc8e713d3cac61
Parents: 8e932cb 4859508
Author: chenliang613 <ch...@apache.org>
Authored: Mon Aug 29 22:41:09 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Mon Aug 29 22:41:09 2016 +0800
----------------------------------------------------------------------
.../spark/rdd/CarbonDataRDDFactory.scala | 27 +++++---
.../TestLoadDataWithFileHeaderException.scala | 67 ++++++++++++++++++++
2 files changed, 86 insertions(+), 8 deletions(-)
----------------------------------------------------------------------