You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/08/21 12:52:40 UTC

[carbondata] branch master updated: [CARBONDATA-3928] Handled the Strings which length is greater than 32000 as a bad record.

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 85264e7  [CARBONDATA-3928] Handled the Strings which length is greater than 32000 as a bad record.
85264e7 is described below

commit 85264e751fbd4be80446df006466efe3c3248537
Author: Nihal ojha <ni...@gmail.com>
AuthorDate: Mon Jul 27 17:10:04 2020 +0530

    [CARBONDATA-3928] Handled the Strings which length is greater than 32000 as a bad record.
    
    Why is this PR needed?
    Currently, when the string length exceeds 32000 characters, the load fails. Such a
    record should instead be handled as a bad record, and the load should not fail.
    
    What changes were proposed in this PR?
    Instead of throwing an exception when a string is longer than 32000 bytes, we now
    handle such a record as a bad record, so the load will not fail.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3865
---
 .../core/constants/CarbonCommonConstants.java      |   4 +
 .../carbondata/spark/util/CarbonScalaUtil.scala    |   8 +-
 .../spark/src/test/resources/MoreThan32KChar.csv   |   3 +
 .../testsuite/dataload/TestLoadDataGeneral.scala   | 156 ++++++++++++++++++---
 .../badrecordloger/BadRecordLoggerTest.scala       |  50 +------
 .../longstring/VarcharDataTypesBasicTestCase.scala |   9 +-
 .../carbondata/spark/util/BadRecordUtil.scala      |  71 ++++++++++
 .../processing/datatypes/PrimitiveDataType.java    |  19 ++-
 .../impl/NonDictionaryFieldConverterImpl.java      |  12 +-
 .../loading/converter/impl/RowConverterImpl.java   |  14 +-
 .../streaming/parser/FieldConverter.scala          |  10 +-
 11 files changed, 259 insertions(+), 97 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 2925e76..8864963 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2468,4 +2468,8 @@ public final class CarbonCommonConstants {
    * index server temp folder aging period default value 3hours.
    */
   public static final String CARBON_INDEXSERVER_TEMPFOLDER_DELETETIME_DEFAULT = "10800000";
+
+  public static final String STRING_LENGTH_EXCEEDED_MESSAGE =
+      "Record %s of column %s exceeded " + MAX_CHARS_PER_COLUMN_DEFAULT +
+          " characters. Please consider long string data type.";
 }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 9ad76a7..2806376 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -75,10 +75,10 @@ object CarbonScalaUtil {
         carbonLoadModel.getBinaryDecoder)
     } catch {
       case e: Exception =>
-        if (e.getMessage.startsWith(FieldConverter.stringLengthExceedErrorMsg)) {
-          val msg = s"Column ${carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-            .getCreateOrderColumn.get(idx).getColName} is too long," +
-            s" consider to use 'long_string_columns' table property."
+        if (e.getMessage.startsWith(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE)) {
+          val msg = CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE.format(row,
+              carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCreateOrderColumn
+                .get(idx).getColName)
           LOGGER.error(msg, e)
           throw new Exception(msg, e)
         } else {
diff --git a/integration/spark/src/test/resources/MoreThan32KChar.csv b/integration/spark/src/test/resources/MoreThan32KChar.csv
new file mode 100644
index 0000000..ba18b7c
--- /dev/null
+++ b/integration/spark/src/test/resources/MoreThan32KChar.csv
@@ -0,0 +1,3 @@
+ok,hi,1
+itsok,hello,2
+32123,hellohowareyouwelcomehellohellohellohellohellohellohellohelloheellooabcdefghijklmnopqrstuvwxyzabcqwedgsfgafghkfdkhafDGSSADsdjhsdfrtuyioplkjhgfdsazxcvbnmpoiuytrewqasdfghjklmnbvcxzasdghskhdgkhdbkshkjchskdhfssudkdjdudusdjhdshdshsjddshjdkdhgdhdshdhdududushdudududududududududududududududududuudududududududuudududududududududududududududududududududududududududuhellohowareyouwelcomehellohellohellohellohellohellohellohelloheellooabcdefghijklmnopqrstuvwxyzabcqwertuyioplkjhgfdsazxcvbnmpoiuy [...]
\ No newline at end of file
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index e772590..1e81554 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -19,8 +19,6 @@ package org.apache.carbondata.integration.spark.testsuite.dataload
 
 import java.math.BigDecimal
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
@@ -31,9 +29,17 @@ import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.BadRecordUtil
+import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.RandomStringUtils
 
 class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
 
+  val badRecordAction = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION);
+  val testdata =s"$resourcesPath/MoreThan32KChar.csv"
+  val longChar: String = RandomStringUtils.randomAlphabetic(33000)
+
   override def beforeEach {
     sql("DROP TABLE IF EXISTS loadtest")
     sql(
@@ -41,6 +47,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
         | CREATE TABLE loadtest(id int, name string, city string, age int)
         | STORED AS carbondata
       """.stripMargin)
+    sql("drop table if exists longerThan32kChar")
   }
 
   private def checkSegmentExists(
@@ -145,47 +152,153 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     sql("drop table if exists carbon_table")
   }
 
-  test("test insert / update with data more than 32000 characters") {
+  private def createTableAndLoadData (badRecordAction: String): Unit = {
+    BadRecordUtil.cleanBadRecordPath("default", "longerthan32kchar")
+    sql("CREATE TABLE longerthan32kchar(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
+    sql(s"LOAD DATA LOCAL INPATH '$testdata' into table longerThan32kChar OPTIONS('FILEHEADER'='dim1,dim2,mes1', " +
+      s"'BAD_RECORDS_ACTION'='${badRecordAction}','BAD_RECORDS_LOGGER_ENABLE'='TRUE')")
+  }
+
+  test("test load / insert / update with data more than 32000 characters and bad record action as Redirect") {
+    createTableAndLoadData("REDIRECT")
+    var redirectCsvPath = BadRecordUtil
+      .getRedirectCsvPath("default", "longerthan32kchar", "0", "0")
+    assert(BadRecordUtil.checkRedirectedCsvContentAvailableInSource(testdata, redirectCsvPath))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "REDIRECT");
+    sql(s"insert into longerthan32kchar values('33000', '$longChar', 4)")
+    checkAnswer(sql("select * from longerthan32kchar"), Seq(Row("ok", "hi", 1), Row("itsok", "hello", 2)))
+    redirectCsvPath = BadRecordUtil.getRedirectCsvPath("default", "longerthan32kchar", "1", "0")
+    var redirectedFileLineList = FileUtils.readLines(redirectCsvPath)
+    var iterator = redirectedFileLineList.iterator()
+    while (iterator.hasNext) {
+      assert(iterator.next().equals("33000,"+longChar+",4"))
+    }
+
+    // Update strings of length greater than 32000
+    sql(s"update longerthan32kchar set(longerthan32kchar.dim2)=('$longChar') " +
+      "where longerthan32kchar.mes1=1").show()
+    checkAnswer(sql("select * from longerthan32kchar"), Seq(Row("itsok", "hello", 2)))
+    redirectCsvPath = BadRecordUtil.getRedirectCsvPath("default", "longerthan32kchar", "0", "1")
+    redirectedFileLineList = FileUtils.readLines(redirectCsvPath)
+    iterator = redirectedFileLineList.iterator()
+    while (iterator.hasNext) {
+      assert(iterator.next().equals("ok,"+longChar+",1"))
+    }
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
+
+    // Insert longer string without converter step will throw exception
+    intercept[Exception] {
+      sql(s"insert into longerthan32kchar values('32000', '$longChar', 3)")
+    }
+    BadRecordUtil.cleanBadRecordPath("default", "longerthan32kchar")
+  }
+
+  test("test load / insert / update with data more than 32000 characters and bad record action as Force") {
+    createTableAndLoadData("FORCE")
+    checkAnswer(sql("select * from longerthan32kchar"), Seq(Row("ok", "hi", 1), Row("itsok", "hello", 2), Row("32123", null, 3)))
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
-    val testdata =s"$resourcesPath/32000char.csv"
-    sql("drop table if exists load32000chardata")
-    sql("drop table if exists load32000chardata_dup")
-    sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
-    sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
-    sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE");
+    sql(s"insert into longerthan32kchar values('33000', '$longChar', 4)")
+    checkAnswer(sql("select * from longerthan32kchar"),
+      Seq(Row("ok", "hi", 1), Row("itsok", "hello", 2), Row("32123", null, 3), Row("33000", null, 4)))
+
+    // Update strings of length greater than 32000
+    sql(s"update longerthan32kchar set(longerthan32kchar.dim2)=('$longChar') " +
+      "where longerthan32kchar.mes1=1").show()
+    checkAnswer(sql("select * from longerthan32kchar"),
+      Seq(Row("ok", null, 1), Row("itsok", "hello", 2), Row("32123", null, 3), Row("33000", null, 4)))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
+
+    // Insert longer string without converter step will throw exception
     intercept[Exception] {
-      sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show()
+      sql(s"insert into longerthan32kchar values('32000', '$longChar', 3)")
+    }
+  }
+
+  test("test load / insert / update with data more than 32000 characters and bad record action as Fail") {
+    sql("CREATE TABLE longerthan32kchar(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
+    var exception = intercept[Exception] {
+      sql(s"LOAD DATA LOCAL INPATH '$testdata' into table longerThan32kChar OPTIONS('FILEHEADER'='dim1,dim2,mes1', " +
+        s"'BAD_RECORDS_ACTION'='FAIL','BAD_RECORDS_LOGGER_ENABLE'='TRUE')")
+    }
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
+    exception = intercept[Exception] {
+      sql(s"insert into longerthan32kchar values('33000', '$longChar', 4)")
     }
-    sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata_dup OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
+    assert(exception.getMessage.contains(s"Record [33000, $longChar, 4] of column dim2 exceeded " +
+      s"${CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT} characters. Please consider long string data type."))
+    // Update strings of length greater than 32000
+    sql(s"insert into longerthan32kchar values('ok', 'hi', 1)")
+    exception = intercept[Exception] {
+      sql(s"update longerthan32kchar set(longerthan32kchar.dim2)=('$longChar') " +
+        "where longerthan32kchar.mes1=1").show()
+    }
+    assert(exception.getMessage.contains(s"Record [ok, $longChar, 1] of column dim2 exceeded " +
+      s"${CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT} characters. Please consider long string data type."))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
+
+    // Insert longer string without converter step will throw exception
     intercept[Exception] {
-      sql("update load32000chardata_dup set(load32000chardata_dup.dim2)=(select concat(load32000chardata.dim2,'aaaa') from load32000chardata)").show()
+      sql(s"insert into longerthan32kchar values('32000', '$longChar', 3)")
     }
+  }
+
+  test("test load / insert / update with data more than 32000 characters and bad record action as Ignore") {
+    createTableAndLoadData("IGNORE")
+    checkAnswer(sql("select * from longerthan32kchar"), Seq(Row("ok", "hi", 1), Row("itsok", "hello", 2)))
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "IGNORE");
+    sql(s"insert into longerthan32kchar values('33000', '$longChar', 4)")
+    checkAnswer(sql("select * from longerthan32kchar"), Seq(Row("ok", "hi", 1), Row("itsok", "hello", 2)))
+
+    // Update strings of length greater than 32000
+    sql(s"update longerthan32kchar set(longerthan32kchar.dim2)=('$longChar') " +
+      "where longerthan32kchar.mes1=1").show()
+    checkAnswer(sql("select * from longerthan32kchar"), Seq(Row("itsok", "hello", 2)))
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
+
+    // Insert longer string without converter step will throw exception
+    intercept[Exception] {
+      sql(s"insert into longerthan32kchar values('32000', '$longChar', 3)")
+    }
   }
 
-  test("test load / insert / update with data more than 32000 bytes - dictionary_exclude") {
+  test("test load / insert with data more than 32000 bytes - dictionary_exclude") {
     val testdata = s"$resourcesPath/unicodechar.csv"
     sql("drop table if exists load32000bytes")
     sql("create table load32000bytes(name string) STORED AS carbondata")
     sql("insert into table load32000bytes select 'aaa'")
+    checkAnswer(sql("select count(*) from load32000bytes"), Seq(Row(1)))
 
-    assert(intercept[Exception] {
-      sql(s"load data local inpath '$testdata' into table load32000bytes OPTIONS ('FILEHEADER'='name')")
-    }.getMessage.contains("DataLoad failure: Dataload failed, String size cannot exceed 32000 bytes"))
+    // Below load will be inserted as null because Strings greater than 32000 is bad record.
+    sql(s"load data local inpath '$testdata' into table load32000bytes OPTIONS ('FILEHEADER'='name')")
+    checkAnswer(sql("select count(*) from load32000bytes"), Seq(Row(2)))
+    checkAnswer(sql("select * from load32000bytes"), Seq(Row("aaa"), Row(null)))
 
     val source = scala.io.Source.fromFile(testdata, CarbonCommonConstants.DEFAULT_CHARSET)
     val data = source.mkString
 
+    // Insert will throw exception as it is without converter step.
     intercept[Exception] {
       sql(s"insert into load32000bytes values('$data')")
     }
 
-    intercept[Exception] {
-      sql(s"update load32000bytes set(name)= ('$data')").show()
-    }
-
     sql("drop table if exists load32000bytes")
   }
 
@@ -237,11 +350,14 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
   override def afterEach {
     sql("DROP TABLE if exists loadtest")
     sql("drop table if exists invalidMeasures")
+    sql("drop table if exists longerThan32kChar")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
         CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)
       .addProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)
       .addProperty(CarbonCommonConstants.DATA_LOAD_BATCH_SIZE,
         CarbonCommonConstants.DATA_LOAD_BATCH_SIZE_DEFAULT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, badRecordAction);
   }
 }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
index 9ccfd84..94af7c3 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
@@ -17,18 +17,14 @@
 
 package org.apache.carbondata.spark.testsuite.badrecordloger
 
-import java.io.{File, FileFilter}
-
-import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants}
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.BadRecordUtil
 import org.apache.spark.sql.test.util.QueryTest
 
-import org.apache.carbondata.core.datastore.impl.FileFactory
-
 /**
  * Test Class for detailed query on timestamp datatypes
  *
@@ -247,7 +243,7 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
   }
 
   test("validate redirected data") {
-    cleanBadRecordPath("default", "sales_test")
+    BadRecordUtil.cleanBadRecordPath("default", "sales_test")
     val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
     sql(
       """CREATE TABLE IF NOT EXISTS sales_test(ID BigInt, date long, country int,
@@ -264,8 +260,8 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
         assert(true)
       }
     }
-    val redirectCsvPath = getRedirectCsvPath("default", "sales_test", "0", "0")
-    assert(checkRedirectedCsvContentAvailableInSource(csvFilePath, redirectCsvPath))
+    val redirectCsvPath = BadRecordUtil.getRedirectCsvPath("default", "sales_test", "0", "0")
+    assert(BadRecordUtil.checkRedirectedCsvContentAvailableInSource(csvFilePath, redirectCsvPath))
   }
 
   test("test load ddl command with improper value") {
@@ -303,44 +299,6 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  def getRedirectCsvPath(dbName: String, tableName: String, segment: String, task: String) = {
-    var badRecordLocation = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
-    badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName + "/" + segment + "/" +
-                        task
-    val listFiles = new File(badRecordLocation).listFiles(new FileFilter {
-      override def accept(pathname: File): Boolean = {
-        pathname.getPath.endsWith(".csv")
-      }
-    })
-    listFiles(0)
-  }
-
-  /**
-   *
-   * @param csvFilePath
-   * @param redirectCsvPath
-   */
-  def checkRedirectedCsvContentAvailableInSource(csvFilePath: String,
-      redirectCsvPath: File): Boolean = {
-    val origFileLineList = FileUtils.readLines(new File(csvFilePath))
-    val redirectedFileLineList = FileUtils.readLines(redirectCsvPath)
-    val iterator = redirectedFileLineList.iterator()
-    while (iterator.hasNext) {
-      if (!origFileLineList.contains(iterator.next())) {
-        return false;
-      }
-    }
-    return true
-  }
-
-  def cleanBadRecordPath(dbName: String, tableName: String) = {
-    var badRecordLocation = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
-    badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName
-    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(badRecordLocation))
-  }
-
   override def afterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
index 82e04c0..706095a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
@@ -194,11 +194,10 @@ class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach wi
     // query should pass
     checkAnswer(sql("select * from testlongstring"),
       Seq(Row(1, "ab", "cool"), Row(1, "ab1", longChar), Row(1, "abc", longChar)))
-    // insert long string should fail as unset is done
-    val e = intercept[Exception] {
-      sql(s""" insert into testlongstring select 1, 'abc', '$longChar'""")
-    }
-    assert(e.getMessage.contains("DataLoad failure: Column description is too long"))
+    // insert long string will be handled as bad record as unset is done
+    sql(s""" insert into testlongstring select 1, 'abc', '$longChar'""")
+    checkAnswer(sql("select * from testlongstring"),
+      Seq(Row(1, "ab", "cool"), Row(1, "ab1", longChar), Row(1, "abc", longChar), Row(1, "abc", null)))
     sql("ALTER TABLE testlongstring SET TBLPROPERTIES('long_String_columns'='description')")
     sql(s""" insert into testlongstring select 1, 'ab1', '$longChar'""")
     sql("drop table if exists testlongstring")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/BadRecordUtil.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/BadRecordUtil.scala
new file mode 100644
index 0000000..cc7b152
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/BadRecordUtil.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import java.io.{File, FileFilter}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.commons.io.FileUtils
+
+object BadRecordUtil {
+
+  /**
+   * get the bad record redirected csv file path
+   */
+  def getRedirectCsvPath(dbName: String,
+    tableName: String, segment: String, task: String): File = {
+    var badRecordLocation = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
+    badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName + "/" + segment + "/" +
+      task
+    val listFiles = new File(badRecordLocation).listFiles(new FileFilter {
+      override def accept(pathname: File): Boolean = {
+        pathname.getPath.endsWith(".csv")
+      }
+    })
+    listFiles(0)
+  }
+
+  /**
+   * compare data of csvfile and redirected csv file.
+   */
+  def checkRedirectedCsvContentAvailableInSource(csvFilePath: String,
+    redirectCsvPath: File): Boolean = {
+    val origFileLineList = FileUtils.readLines(new File(csvFilePath))
+    val redirectedFileLineList = FileUtils.readLines(redirectCsvPath)
+    val iterator = redirectedFileLineList.iterator()
+    while (iterator.hasNext) {
+      if (!origFileLineList.contains(iterator.next())) {
+        return false;
+      }
+    }
+    true
+  }
+
+  /**
+   * delete the files at bad record location
+   */
+  def  cleanBadRecordPath(dbName: String, tableName: String): Boolean = {
+    var badRecordLocation = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
+    badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(badRecordLocation))
+  }
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index f1b6f1c..5bfc61e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -40,7 +40,6 @@ import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.converter.impl.binary.BinaryDecoder;
 import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
-import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 /**
@@ -301,8 +300,10 @@ public class PrimitiveDataType implements GenericDataType<Object> {
           }
           if (this.carbonDimension.getDataType() == DataTypes.STRING
               && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-            throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
-                + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+            logHolder.setReason(String.format(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE,
+                input.toString(), this.carbonDimension.getColName()));
+            updateNullValue(dataOutputStream, logHolder);
+            return;
           }
         }
         updateValueToByteStream(dataOutputStream, value);
@@ -336,15 +337,19 @@ public class PrimitiveDataType implements GenericDataType<Object> {
     if (isWithoutConverter) {
       if (this.carbonDimension.getDataType() == DataTypes.STRING && input instanceof String
           && ((String)input).length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-        throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
-            + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+        logHolder.setReason(String.format(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE,
+            input.toString(), this.carbonDimension.getColName()));
+        updateNullValue(dataOutputStream, logHolder);
+        return;
       }
       updateValueToByteStream(dataOutputStream, value);
     } else {
       if (this.carbonDimension.getDataType() == DataTypes.STRING
           && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-        throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
-            + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+        logHolder.setReason(String.format(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE,
+            input.toString(), this.carbonDimension.getColName()));
+        updateNullValue(dataOutputStream, logHolder);
+        return;
       }
       if (parsedValue.length() > 0) {
         updateValueToByteStream(dataOutputStream,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
index 72a2220..c187a3d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -82,10 +82,8 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
               .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
           if (dataType == DataTypes.STRING
               && parsedValue.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-            throw new CarbonDataLoadingException(String.format(
-                "Dataload failed, String size cannot exceed %d bytes,"
-                    + " please consider long string data type",
-                CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
+            logHolder.setReason(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE);
+            return getNullValue();
           }
           return parsedValue;
         } else {
@@ -93,10 +91,8 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
               .getDataDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
           if (dataType == DataTypes.STRING && parsedValue.toString().length()
               > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-            throw new CarbonDataLoadingException(String.format(
-                "Dataload failed, String size cannot exceed %d bytes,"
-                    + " please consider long string data type",
-                CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
+            logHolder.setReason(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE);
+            return getNullValue();
           }
           return parsedValue;
         }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index c89b932..5d2226a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.processing.loading.converter.impl;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -118,6 +119,10 @@ public class RowConverterImpl implements RowConverter {
             .getTableProperties();
     String spatialProperty = properties.get(CarbonCommonConstants.SPATIAL_INDEX);
     boolean isSpatialColumn = false;
+    Object[] rawData = row.getRawData();
+    if (rawData == null) {
+      rawData = row.getData() == null ? null : row.getData().clone();
+    }
     for (int i = 0; i < fieldConverters.length; i++) {
       if (spatialProperty != null) {
         isSpatialColumn = fieldConverters[i].getDataField().getColumn().getColName()
@@ -130,9 +135,14 @@ public class RowConverterImpl implements RowConverter {
       }
       fieldConverters[i].convert(row, logHolder);
       if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
-        badRecordLogger.addBadRecordsToBuilder(row.getRawData(), logHolder.getReason());
+        String reason = logHolder.getReason();
+        if (reason.equalsIgnoreCase(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE)) {
+          reason = String.format(reason, Arrays.toString(rawData),
+              this.fields[i].getColumn().getColName());
+        }
+        badRecordLogger.addBadRecordsToBuilder(rawData, reason);
         if (badRecordLogger.isDataLoadFail()) {
-          String error = "Data load failed due to bad record: " + logHolder.getReason();
+          String error = "Data load failed due to bad record: " + reason;
           if (!badRecordLogger.isBadRecordLoggerEnable()) {
             error += "Please enable bad record logger to know the detail reason.";
           }
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index ef3853c..5977fe7 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -23,9 +23,9 @@ import java.util
 import java.util.Base64
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 
 object FieldConverter {
-  val stringLengthExceedErrorMsg = "Data load failed, String length cannot exceed "
 
   /**
    * Return a String representation of the input value
@@ -54,11 +54,11 @@ object FieldConverter {
       value match {
         case s: String => if (!isVarcharType && !isComplexType &&
                               s.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-          throw new IllegalArgumentException(stringLengthExceedErrorMsg +
-            CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " characters")
-        } else {
-          s
+          if (!CarbonProperties.isBadRecordHandlingEnabledForInsert()) {
+            throw new IllegalArgumentException(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE)
+          }
         }
+        s
         case d: java.math.BigDecimal => d.toPlainString
         case i: java.lang.Integer => i.toString
         case d: java.lang.Double => d.toString