Posted to commits@flink.apache.org by ja...@apache.org on 2019/10/31 03:13:10 UTC

[flink] branch release-1.9 updated: [FLINK-14535][table-planner-blink] Fix cast exception when count distinct on decimal fields

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 640b11e  [FLINK-14535][table-planner-blink] Fix cast exception when count distinct on decimal fields
640b11e is described below

commit 640b11ee9454e8d210c0185a3b0001f063ac8649
Author: Zhenghua Gao <do...@gmail.com>
AuthorDate: Wed Oct 23 16:53:10 2019 +0800

    [FLINK-14535][table-planner-blink] Fix cast exception when count distinct on decimal fields
    
    The conversion class of the DecimalType of a distinct key should be Decimal instead of BigDecimal.
    
    This closes #10001
---
 .../table/planner/plan/utils/AggregateUtil.scala   |  14 +-
 .../runtime/stream/sql/AggregateITCase.scala       | 189 +++++++++------------
 .../runtime/stream/sql/SplitAggregateITCase.scala  | 103 ++++++++++-
 .../table/planner/utils/DateTimeTestUtil.scala     |  18 +-
 4 files changed, 206 insertions(+), 118 deletions(-)
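
For context, this is the shape of query that failed before the fix -- a hedged
reconstruction, not taken from the commit (table and column names are
illustrative):

    // A DECIMAL column used as a distinct key inside an aggregate. Before
    // this commit the distinct key's DataType defaulted to the external
    // conversion class java.math.BigDecimal, while the runtime hands the
    // generated code the internal Decimal format, so the cast failed.
    val t1 = tEnv.sqlQuery(
      "SELECT id, count(DISTINCT d) FROM T GROUP BY id")  // d: DECIMAL(p, s)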

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 8877be5..b93daf1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.utils
 import org.apache.flink.api.common.typeinfo.Types
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.{DataTypes, TableConfig, TableException}
-import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.dataformat.{BaseRow, BinaryString, Decimal}
 import org.apache.flink.table.dataview.MapViewTypeInfo
 import org.apache.flink.table.expressions.ExpressionUtils.extractValue
 import org.apache.flink.table.expressions._
@@ -39,14 +39,12 @@ import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
 import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType, fromLogicalTypeToDataType}
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import org.apache.flink.table.runtime.typeutils.BinaryStringTypeInfo
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot
 import org.apache.flink.table.types.logical.{LogicalTypeRoot, _}
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-
 import org.apache.calcite.rel.`type`._
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
 import org.apache.calcite.rex.RexInputRef
@@ -54,7 +52,6 @@ import org.apache.calcite.sql.fun._
 import org.apache.calcite.sql.validate.SqlMonotonicity
 import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
 import org.apache.calcite.tools.RelBuilder
-
 import java.time.Duration
 import java.util
 
@@ -503,10 +500,15 @@ object AggregateUtil extends Enumeration {
       case INTERVAL_YEAR_MONTH => DataTypes.INT
       case INTERVAL_DAY_TIME => DataTypes.BIGINT
 
-      case VARCHAR | CHAR => fromLegacyInfoToDataType(BinaryStringTypeInfo.INSTANCE)
+      case VARCHAR =>
+        val dt = argTypes(0).asInstanceOf[VarCharType]
+        DataTypes.VARCHAR(dt.getLength).bridgedTo(classOf[BinaryString])
+      case CHAR =>
+        val dt = argTypes(0).asInstanceOf[CharType]
+        DataTypes.CHAR(dt.getLength).bridgedTo(classOf[BinaryString])
       case DECIMAL =>
         val dt = argTypes(0).asInstanceOf[DecimalType]
-        DataTypes.DECIMAL(dt.getPrecision, dt.getScale)
+        DataTypes.DECIMAL(dt.getPrecision, dt.getScale).bridgedTo(classOf[Decimal])
       case t =>
         throw new TableException(s"Distinct aggregate function does not support type: $t.\n" +
           s"Please re-check the data type.")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index aa6f15b..b3f6f16 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -42,6 +42,7 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
 import java.lang.{Integer => JInt, Long => JLong}
+import java.math.{BigDecimal => JBigDecimal}
 
 import scala.collection.{Seq, mutable}
 import scala.util.Random
@@ -181,35 +182,94 @@ class AggregateITCase(
 
   @Test
   def testCountDistinct(): Unit = {
+    val ids = List(
+      1,
+      2, 2,
+      3, 3, 3,
+      4, 4, 4, 4,
+      5, 5, 5, 5, 5)
+
+    val dateTimes = List(
+      "1970-01-01 00:00:01",
+      "1970-01-01 00:00:02", null,
+      "1970-01-01 00:00:04", "1970-01-01 00:00:05", "1970-01-01 00:00:06",
+      "1970-01-01 00:00:07", null, null, "1970-01-01 00:00:10",
+
+      "1970-01-01 00:00:11", "1970-01-01 00:00:11", "1970-01-01 00:00:13",
+      "1970-01-01 00:00:14", "1970-01-01 00:00:15")
+
+    val dates = List(
+      "1970-01-01",
+      "1970-01-02", null,
+      "1970-01-04", "1970-01-05", "1970-01-06",
+      "1970-01-07", null, null, "1970-01-10",
+      "1970-01-11", "1970-01-11", "1970-01-13", "1970-01-14", "1970-01-15")
+
+    val times = List(
+      "00:00:01",
+      "00:00:02", null,
+      "00:00:04", "00:00:05", "00:00:06",
+      "00:00:07", null, null, "00:00:10",
+      "00:00:11", "00:00:11", "00:00:13", "00:00:14", "00:00:15")
+
+    val integers = List(
+      "1",
+      "2", null,
+      "4", "5", "6",
+      "7", null, null, "10",
+      "11", "11", "13", "14", "15")
+
+    val chars = List(
+      "A",
+      "B", null,
+      "D", "E", "F",
+      "H", null, null, "K",
+      "L", "L", "N", "O", "P")
+
     val data = new mutable.MutableList[Row]
-    data.+=(Row.of(JInt.valueOf(1), JLong.valueOf(1L), "A"))
-    data.+=(Row.of(JInt.valueOf(2), JLong.valueOf(2L), "B"))
-    data.+=(Row.of(null, JLong.valueOf(2L), "B"))
-    data.+=(Row.of(JInt.valueOf(3), JLong.valueOf(2L), "B"))
-    data.+=(Row.of(JInt.valueOf(4), JLong.valueOf(3L), "C"))
-    data.+=(Row.of(JInt.valueOf(5), JLong.valueOf(3L), "C"))
-    data.+=(Row.of(JInt.valueOf(5), JLong.valueOf(3L), null))
-    data.+=(Row.of(JInt.valueOf(6), JLong.valueOf(3L), "C"))
-    data.+=(Row.of(JInt.valueOf(7), JLong.valueOf(4L), "B"))
-    data.+=(Row.of(JInt.valueOf(8), JLong.valueOf(4L), "A"))
-    data.+=(Row.of(JInt.valueOf(9), JLong.valueOf(4L), "D"))
-    data.+=(Row.of(null, JLong.valueOf(4L), null))
-    data.+=(Row.of(JInt.valueOf(10), JLong.valueOf(4L), "E"))
-    data.+=(Row.of(JInt.valueOf(11), JLong.valueOf(5L), "A"))
-    data.+=(Row.of(JInt.valueOf(12), JLong.valueOf(5L), "B"))
-
-    val rowType: RowTypeInfo = new RowTypeInfo(Types.INT, Types.LONG, Types.STRING)
-
-    val t = failingDataSource(data)(rowType).toTable(tEnv, 'a, 'b, 'c)
+
+    for (i <- ids.indices) {
+      val v = integers(i)
+      val decimal = if (v == null) null else new JBigDecimal(v)
+      val int = if (v == null) null else JInt.valueOf(v)
+      val long = if (v == null) null else JLong.valueOf(v)
+      data.+=(Row.of(
+        Int.box(ids(i)), localDateTime(dateTimes(i)), localDate(dates(i)),
+        mLocalTime(times(i)), decimal, int, long, chars(i)))
+    }
+
+    val inputs = util.Random.shuffle(data)
+
+    val rowType = new RowTypeInfo(
+      Types.INT, Types.LOCAL_DATE_TIME, Types.LOCAL_DATE, Types.LOCAL_TIME,
+      Types.DECIMAL, Types.INT, Types.LONG, Types.STRING)
+
+    val t = failingDataSource(inputs)(rowType).toTable(tEnv, 'id, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
     tEnv.registerTable("T", t)
     val t1 = tEnv.sqlQuery(
-      "SELECT b, count(*), count(distinct c), count(distinct a) FROM T GROUP BY b")
+      s"""
+         |SELECT
+         | id,
+         | count(distinct a),
+         | count(distinct b),
+         | count(distinct c),
+         | count(distinct d),
+         | count(distinct e),
+         | count(distinct f),
+         | count(distinct g)
+         |FROM T GROUP BY id
+       """.stripMargin)
 
     val sink = new TestingRetractSink
     t1.toRetractStream[Row].addSink(sink)
     env.execute()
 
-    val expected = List("1,1,1,1", "2,3,1,2", "3,4,1,3", "4,5,4,4", "5,2,2,2")
+    val expected = List(
+      "1,1,1,1,1,1,1,1",
+      "2,1,1,1,1,1,1,1",
+      "3,3,3,3,3,3,3,3",
+      "4,2,2,2,2,2,2,2",
+      "5,4,4,4,4,4,4,4")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
@@ -1044,93 +1104,6 @@ class AggregateITCase(
   }
 
   @Test
-  def testTimestampDistinct(): Unit = {
-    val data = new mutable.MutableList[Row]
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:01"), Long.box(1L), "A"))
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:02"), Long.box(2L), "B"))
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:03"), Long.box(2L), "B"))
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:04"), Long.box(3L), "C"))
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:05"), Long.box(3L), "C"))
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:06"), Long.box(3L), "C"))
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:07"), Long.box(4L), "B"))
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:08"), Long.box(4L), "A"))
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:09"), Long.box(4L), "D"))
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:10"), Long.box(4L), "E"))
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:11"), Long.box(5L), "A"))
-    data.+=(Row.of(localDateTime("1970-01-01 00:00:12"), Long.box(5L), "B"))
-
-    val t = failingDataSource(data)(new RowTypeInfo(
-      Types.LOCAL_DATE_TIME, Types.LONG, Types.STRING)).toTable(tEnv, 'a, 'b, 'c)
-    tEnv.registerTable("T", t)
-    val t1 = tEnv.sqlQuery("SELECT b, count(distinct c), count(distinct a) FROM T GROUP BY b")
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("1,1,1", "2,1,2", "3,1,3", "4,4,4", "5,2,2")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
-  @Test
-  def testDateDistinct(): Unit = {
-    val data = new mutable.MutableList[Row]
-    data.+=(Row.of(localDate("1970-01-01"), Long.box(1L), "A"))
-    data.+=(Row.of(localDate("1970-01-02"), Long.box(2L), "B"))
-    data.+=(Row.of(localDate("1970-01-03"), Long.box(2L), "B"))
-    data.+=(Row.of(localDate("1970-01-04"), Long.box(3L), "C"))
-    data.+=(Row.of(localDate("1970-01-05"), Long.box(3L), "C"))
-    data.+=(Row.of(localDate("1970-01-06"), Long.box(3L), "C"))
-    data.+=(Row.of(localDate("1970-01-07"), Long.box(4L), "B"))
-    data.+=(Row.of(localDate("1970-01-08"), Long.box(4L), "A"))
-    data.+=(Row.of(localDate("1970-01-09"), Long.box(4L), "D"))
-    data.+=(Row.of(localDate("1970-01-10"), Long.box(4L), "E"))
-    data.+=(Row.of(localDate("1970-01-11"), Long.box(5L), "A"))
-    data.+=(Row.of(localDate("1970-01-12"), Long.box(5L), "B"))
-
-    val t = failingDataSource(data)(new RowTypeInfo(
-      Types.LOCAL_DATE, Types.LONG, Types.STRING)).toTable(tEnv, 'a, 'b, 'c)
-    tEnv.registerTable("T", t)
-    val t1 = tEnv.sqlQuery("SELECT b, count(distinct c), count(distinct a) FROM T GROUP BY b")
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("1,1,1", "2,1,2", "3,1,3", "4,4,4", "5,2,2")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
-  @Test
-  def testTimeDistinct(): Unit = {
-    val data = new mutable.MutableList[Row]
-    data.+=(Row.of(mLocalTime("00:00:01"), Long.box(1L), "A"))
-    data.+=(Row.of(mLocalTime("00:00:02"), Long.box(2L), "B"))
-    data.+=(Row.of(mLocalTime("00:00:03"), Long.box(2L), "B"))
-    data.+=(Row.of(mLocalTime("00:00:04"), Long.box(3L), "C"))
-    data.+=(Row.of(mLocalTime("00:00:05"), Long.box(3L), "C"))
-    data.+=(Row.of(mLocalTime("00:00:06"), Long.box(3L), "C"))
-    data.+=(Row.of(mLocalTime("00:00:07"), Long.box(4L), "B"))
-    data.+=(Row.of(mLocalTime("00:00:08"), Long.box(4L), "A"))
-    data.+=(Row.of(mLocalTime("00:00:09"), Long.box(4L), "D"))
-    data.+=(Row.of(mLocalTime("00:00:10"), Long.box(4L), "E"))
-    data.+=(Row.of(mLocalTime("00:00:11"), Long.box(5L), "A"))
-    data.+=(Row.of(mLocalTime("00:00:12"), Long.box(5L), "B"))
-
-    val t = failingDataSource(data)(new RowTypeInfo(
-      Types.LOCAL_TIME, Types.LONG, Types.STRING)).toTable(tEnv, 'a, 'b, 'c)
-    tEnv.registerTable("T", t)
-    val t1 = tEnv.sqlQuery("SELECT b, count(distinct c), count(distinct a) FROM T GROUP BY b")
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("1,1,1", "2,1,2", "3,1,3", "4,4,4", "5,2,2")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
-  @Test
   def testCountDistinctWithBinaryRowSource(): Unit = {
     // this case failed before because of an object reuse problem
     val data = (0 until 100).map {i => ("1", "1", s"${i%50}", "1")}.toList
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index a5f35f5..07200c4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -18,14 +18,17 @@
 
 package org.apache.flink.table.planner.runtime.stream.sql
 
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.planner.runtime.stream.sql.SplitAggregateITCase.PartialAggMode
 import org.apache.flink.table.planner.runtime.utils.StreamingWithAggTestBase.{AggMode, LocalGlobalOff, LocalGlobalOn}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchOn
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.table.planner.runtime.utils.{StreamingWithAggTestBase, TestingRetractSink}
+import org.apache.flink.table.planner.utils.DateTimeTestUtil.{localDate, localDateTime, localTime => mLocalTime}
 import org.apache.flink.types.Row
 
 import org.junit.Assert.assertEquals
@@ -33,10 +36,13 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{Before, Ignore, Test}
 
+import java.lang.{Integer => JInt, Long => JLong}
+import java.math.{BigDecimal => JBigDecimal}
 import java.util
 
 import scala.collection.JavaConversions._
-import scala.collection.Seq
+import scala.collection.{Seq, mutable}
+import scala.util.Random
 
 @RunWith(classOf[Parameterized])
 class SplitAggregateITCase(
@@ -80,6 +86,101 @@ class SplitAggregateITCase(
   }
 
   @Test
+  def testCountDistinct(): Unit = {
+    val ids = List(
+      1,
+      2, 2,
+      3, 3, 3,
+      4, 4, 4, 4,
+      5, 5, 5, 5, 5)
+
+    val dateTimes = List(
+      "1970-01-01 00:00:01",
+      "1970-01-01 00:00:02", null,
+      "1970-01-01 00:00:04", "1970-01-01 00:00:05", "1970-01-01 00:00:06",
+      "1970-01-01 00:00:07", null, null, "1970-01-01 00:00:10",
+
+      "1970-01-01 00:00:11", "1970-01-01 00:00:11", "1970-01-01 00:00:13",
+      "1970-01-01 00:00:14", "1970-01-01 00:00:15")
+
+    val dates = List(
+      "1970-01-01",
+      "1970-01-02", null,
+      "1970-01-04", "1970-01-05", "1970-01-06",
+      "1970-01-07", null, null, "1970-01-10",
+      "1970-01-11", "1970-01-11", "1970-01-13", "1970-01-14", "1970-01-15")
+
+    val times = List(
+      "00:00:01",
+      "00:00:02", null,
+      "00:00:04", "00:00:05", "00:00:06",
+      "00:00:07", null, null, "00:00:10",
+      "00:00:11", "00:00:11", "00:00:13", "00:00:14", "00:00:15")
+
+    val integers = List(
+      "1",
+      "2", null,
+      "4", "5", "6",
+      "7", null, null, "10",
+      "11", "11", "13", "14", "15")
+
+    val chars = List(
+      "A",
+      "B", null,
+      "D", "E", "F",
+      "H", null, null, "K",
+      "L", "L", "N", "O", "P")
+
+    val data = new mutable.MutableList[Row]
+
+    for (i <- ids.indices) {
+      val v = integers(i)
+      val decimal = if (v == null) null else new JBigDecimal(v)
+      val int = if (v == null) null else JInt.valueOf(v)
+      val long = if (v == null) null else JLong.valueOf(v)
+      data.+=(Row.of(
+        Int.box(ids(i)), localDateTime(dateTimes(i)), localDate(dates(i)),
+        mLocalTime(times(i)), decimal, int, long, chars(i)))
+    }
+
+    val inputs = Random.shuffle(data)
+
+    val rowType = new RowTypeInfo(
+      Types.INT, Types.LOCAL_DATE_TIME, Types.LOCAL_DATE, Types.LOCAL_TIME,
+      Types.DECIMAL, Types.INT, Types.LONG, Types.STRING)
+
+    val t = failingDataSource(inputs)(rowType).toTable(tEnv, 'id, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
+    tEnv.registerTable("MyTable", t)
+    val t1 = tEnv.sqlQuery(
+      s"""
+         |SELECT
+         | id,
+         | count(distinct a),
+         | count(distinct b),
+         | count(distinct c),
+         | count(distinct d),
+         | count(distinct e),
+         | count(distinct f),
+         | count(distinct g)
+         |FROM MyTable
+         |GROUP BY id
+       """.stripMargin)
+
+    val sink = new TestingRetractSink
+    t1.toRetractStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,1,1,1,1,1,1,1",
+      "2,1,1,1,1,1,1,1",
+      "3,3,3,3,3,3,3,3",
+      "4,2,2,2,2,2,2,2",
+      "5,4,4,4,4,4,4,4")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+
+  @Test
   def testSingleDistinctAgg(): Unit = {
     val t1 = tEnv.sqlQuery("SELECT COUNT(DISTINCT c) FROM T")
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
index d80fcea..f4e9578 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
@@ -28,14 +28,26 @@ import java.time.{LocalDate, LocalDateTime, LocalTime}
 object DateTimeTestUtil {
 
   def localDate(s: String): LocalDate = {
-    LocalDateConverter.INSTANCE.toExternal(dateStringToUnixDate(s))
+    if (s == null) {
+      null
+    } else {
+      LocalDateConverter.INSTANCE.toExternal(dateStringToUnixDate(s))
+    }
   }
 
   def localTime(s: String): LocalTime = {
-    LocalTimeConverter.INSTANCE.toExternal(DateTimeUtils.timeStringToUnixDate(s))
+    if (s == null) {
+      null
+    } else {
+      LocalTimeConverter.INSTANCE.toExternal(DateTimeUtils.timeStringToUnixDate(s))
+    }
   }
 
   def localDateTime(s: String): LocalDateTime = {
-    LocalDateTimeConverter.INSTANCE.toExternal(DateTimeUtils.timestampStringToUnixDate(s))
+    if (s == null) {
+      null
+    } else {
+      LocalDateTimeConverter.INSTANCE.toExternal(DateTimeUtils.timestampStringToUnixDate(s))
+    }
   }
 }
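
The null guards above make the test utilities total over nullable input: each
helper now passes null through instead of failing, which the new tests rely on
for their null-bearing columns. An equivalent, more compact Scala spelling --
a sketch only, not what the commit uses -- is:

    // Sketch: the same null passthrough via Option. The commit keeps the
    // explicit if/else, which reads uniformly across all three helpers.
    def localDate(s: String): LocalDate =
      Option(s)
        .map(str => LocalDateConverter.INSTANCE.toExternal(dateStringToUnixDate(str)))
        .orNull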