You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/09/04 14:38:47 UTC

carbondata git commit: [CARBONDATA-1305] Add limit for external dictionary values

Repository: carbondata
Updated Branches:
  refs/heads/master ee5f65f7d -> 2e04c357c


[CARBONDATA-1305] Add limit for external dictionary values

Analysis: During dictionary creation the dictionary values are kept in a HashSet.

 When the size of hashset reaches more than 500000000 this exception is thrown.

Solution: Limit the dictionary values to 10000000

This closes #1166


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2e04c357
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2e04c357
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2e04c357

Branch: refs/heads/master
Commit: 2e04c357c72a09841a17427462f8967d2adb72c5
Parents: ee5f65f
Author: kunal642 <ku...@gmail.com>
Authored: Wed Jul 12 20:47:15 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Mon Sep 4 20:08:19 2017 +0530

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |  5 ++
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 20 ++++--
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  8 +--
 .../spark/util/GlobalDictionaryUtil.scala       |  7 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  5 +-
 .../execution/command/carbonTableSchema.scala   |  6 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  6 +-
 .../execution/command/carbonTableSchema.scala   |  6 +-
 .../processing/newflow/DataLoadExecutor.java    |  8 ++-
 .../newflow/exception/NoRetryException.java     | 68 ++++++++++++++++++++
 .../steps/DataWriterBatchProcessorStepImpl.java |  4 ++
 11 files changed, 123 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e04c357/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index ed481bb..ac278d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -85,4 +85,9 @@ public final class CarbonLoadOptionConstants {
   public static final String CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS =
       "carbon.options.global.sort.partitions";
 
+  /**
+   *  Max number of dictionary values that can be given with external dictionary
+   */
+  public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 10000000;
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e04c357/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index c7ed1c7..ca607e1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -24,7 +24,7 @@ import java.util.regex.Pattern
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.util.control.Breaks.{break, breakable}
+import scala.util.control.Breaks.breakable
 
 import au.com.bytecode.opencsv.CSVReader
 import com.univocity.parsers.common.TextParsingException
@@ -34,8 +34,9 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.cache.dictionary.Dictionary
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier}
@@ -44,6 +45,7 @@ import org.apache.carbondata.core.service.{CarbonCommonFactory, PathService}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.exception.NoRetryException
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
@@ -83,7 +85,12 @@ case class PrimitiveParser(dimension: CarbonDimension,
 
   def parseString(input: String): Unit = {
     if (hasDictEncoding && input != null) {
-      set.add(input)
+      if (set.size < CarbonLoadOptionConstants.MAX_EXTERNAL_DICTIONARY_SIZE) {
+        set.add(input)
+      } else {
+        throw new NoRetryException(s"Cannot provide more than ${
+          CarbonLoadOptionConstants.MAX_EXTERNAL_DICTIONARY_SIZE } dictionary values")
+      }
     }
   }
 }
@@ -336,7 +343,7 @@ class CarbonGlobalDictionaryGenerateRDD(
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
       model.hdfsLocation)
-    val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+    var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val iter = new Iterator[(Int, String)] {
       var dictionaryForDistinctValueLookUp: Dictionary = _
       var dictionaryForSortIndexWriting: Dictionary = _
@@ -444,6 +451,9 @@ class CarbonGlobalDictionaryGenerateRDD(
                     s"\n sort list, distinct and write: $dictWriteTime" +
                     s"\n write sort info: $sortIndexWriteTime")
       } catch {
+        case dictionaryException: NoRetryException =>
+          LOGGER.error(dictionaryException)
+          status = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
         case ex: Exception =>
           LOGGER.error(ex)
           throw ex

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e04c357/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 37b2d02..7c6274b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -48,7 +48,7 @@ import org.apache.carbondata.processing.csvload.CSVInputFormat
 import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.DataLoadExecutor
-import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
+import org.apache.carbondata.processing.newflow.exception.NoRetryException
 import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.load.{CarbonLoaderUtil, FailureCauses}
 import org.apache.carbondata.spark.splits.TableSplit
@@ -255,7 +255,7 @@ class NewCarbonDataLoadRDD[K, V](
           loader.storeLocation,
           recordReaders)
       } catch {
-        case e: BadRecordFoundException =>
+        case e: NoRetryException =>
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
           executionErrors.failureCauses = FailureCauses.BAD_RECORDS
           executionErrors.errorMsg = e.getMessage
@@ -441,7 +441,7 @@ class NewDataFrameLoaderRDD[K, V](
           CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
         executor.execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
-        case e: BadRecordFoundException =>
+        case e: NoRetryException =>
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
           executionErrors.failureCauses = FailureCauses.BAD_RECORDS
           executionErrors.errorMsg = e.getMessage
@@ -626,7 +626,7 @@ class PartitionTableDataLoaderRDD[K, V](
           CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
         executor.execute(model, loader.storeLocation, recordReaders)
       } catch {
-        case e: BadRecordFoundException =>
+        case e: NoRetryException =>
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
           executionErrors.failureCauses = FailureCauses.BAD_RECORDS
           executionErrors.errorMsg = e.getMessage

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e04c357/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index e7f1c6f..47eaece 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.spark.util
 
-import java.io.{FileNotFoundException, IOException}
 import java.nio.charset.Charset
 import java.util.regex.Pattern
 
@@ -57,6 +56,7 @@ import org.apache.carbondata.processing.csvload.CSVInputFormat
 import org.apache.carbondata.processing.csvload.StringArrayWritable
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.exception.NoRetryException
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.rdd._
@@ -745,6 +745,11 @@ object GlobalDictionaryUtil {
       }
     } catch {
       case ex: Exception =>
+        if (ex.getCause != null && ex.getCause.isInstanceOf[NoRetryException]) {
+          LOGGER.error(ex.getCause, "generate global dictionary failed")
+          throw new Exception("generate global dictionary failed, " +
+                              ex.getCause.getMessage)
+        }
         ex match {
           case spx: SparkException =>
             LOGGER.error(spx, "generate global dictionary failed")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e04c357/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 944c320..5725717 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -57,9 +57,8 @@ import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, S
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
+import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException, NoRetryException}
 import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark._
@@ -624,7 +623,7 @@ object CarbonDataRDDFactory {
                 carbonLoadModel,
                 loadMetadataDetails)
             } catch {
-              case e: BadRecordFoundException =>
+              case e: NoRetryException =>
                 loadMetadataDetails
                   .setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
                 executionErrors.failureCauses = FailureCauses.BAD_RECORDS

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e04c357/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index c07188a..d8ede13 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
 import org.apache.spark.sql.hive.CarbonMetastore
 import org.apache.spark.sql.types.TimestampType
-import org.apache.spark.util.FileUtils
+import org.apache.spark.util.{CausedBy, FileUtils}
 import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.api.CarbonStore
@@ -54,6 +54,7 @@ import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.newflow.exception.NoRetryException
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.load.ValidateUtil
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
@@ -675,6 +676,9 @@ case class LoadTable(
             updateModel)
         }
       } catch {
+        case CausedBy(ex: NoRetryException) =>
+          LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
+          throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
         case ex: Exception =>
           LOGGER.error(ex)
           LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e04c357/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 185066e..94d7b15 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -46,7 +46,6 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.metadata.datatype.DataType
-import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
@@ -58,8 +57,7 @@ import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, S
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.newflow.exception.{CarbonDataLoadingException, NoRetryException}
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
@@ -723,7 +721,7 @@ object CarbonDataRDDFactory {
                 carbonLoadModel,
                 loadMetadataDetails)
             } catch {
-              case e: BadRecordFoundException =>
+              case e: NoRetryException =>
                 loadMetadataDetails
                   .setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
                 executionErrors.failureCauses = FailureCauses.BAD_RECORDS

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e04c357/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 5ae4702..d610780 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
-import org.apache.spark.util.{AlterTableUtil, FileUtils, PartitionUtils}
+import org.apache.spark.util.{AlterTableUtil, CausedBy, FileUtils, PartitionUtils}
 import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.api.CarbonStore
@@ -59,6 +59,7 @@ import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.newflow.exception.NoRetryException
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.load.ValidateUtil
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
@@ -899,6 +900,9 @@ case class LoadTable(
             updateModel)
         }
       } catch {
+        case CausedBy(ex: NoRetryException) =>
+          LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
+          throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
         case ex: Exception =>
           LOGGER.error(ex)
           LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e04c357/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
index 20e4d97..36a89b5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
@@ -22,7 +22,9 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.exception.NoRetryException;
 import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
 
 /**
@@ -55,7 +57,11 @@ public class DataLoadExecutor {
         LOGGER.info("Data loading is successful for table " + loadModel.getTableName());
       }
     } catch (CarbonDataLoadingException e) {
-      throw e;
+      if (e instanceof BadRecordFoundException) {
+        throw new NoRetryException(e.getMessage());
+      } else {
+        throw e;
+      }
     } catch (Exception e) {
       LOGGER.error(e, "Data Loading failed for table " + loadModel.getTableName());
       throw new CarbonDataLoadingException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e04c357/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java
new file mode 100644
index 0000000..027b2d0
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.exception;
+
+public class NoRetryException extends RuntimeException {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public NoRetryException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public NoRetryException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param t
+   */
+  public NoRetryException(Throwable t) {
+    super(t);
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e04c357/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
index fc4d4d2..b13cd26 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
@@ -97,6 +98,9 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
       }
     } catch (Exception e) {
       LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterBatchProcessorStepImpl");
+      if (e.getCause() instanceof BadRecordFoundException) {
+        throw new BadRecordFoundException(e.getCause().getMessage());
+      }
       throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
     }
     return null;