You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@flink.apache.org by li...@apache.org on 2020/07/13 15:18:40 UTC

[flink] branch master updated (f81f3a0 -> 66353f2)

This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f81f3a0  [FLINK-18477][examples-table] Fix packaging of ChangelogSocketExample
     new 3865f7b  [FLINK-18002][json] Correct the behavior for ContainerNode as varchar type
     new 66353f2  [FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../json/JsonRowDataDeserializationSchema.java     |   6 +-
 .../formats/json/JsonRowDeserializationSchema.java |  10 +-
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 107 ++++++++++++++-------
 .../json/JsonRowDeserializationSchemaTest.java     |  30 ++++++
 .../stream/StreamExecTemporalSortRule.scala        |  15 ++-
 .../table/planner/plan/stream/sql/SortTest.xml     |   6 +-
 .../runtime/stream/sql/TemporalSortITCase.scala    |  43 +++++++++
 7 files changed, 172 insertions(+), 45 deletions(-)


[flink] 02/02: [FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule.

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 66353f27c4c6481443d1f04a8f23e7f98dd7beda
Author: libenchao <li...@gmail.com>
AuthorDate: Mon Apr 6 16:22:28 2020 +0800

    [FLINK-16827][table-planner-blink] StreamExecTemporalSort should require a distribution trait in StreamExecTemporalSortRule.
    
    This closes #11643
---
 .../stream/StreamExecTemporalSortRule.scala        | 15 ++++++--
 .../table/planner/plan/stream/sql/SortTest.xml     |  6 ++-
 .../runtime/stream/sql/TemporalSortITCase.scala    | 43 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
index cf17cc5..3490526 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecTemporalSortRule.scala
@@ -21,8 +21,9 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalSort
+import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelFieldCollation.Direction
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
@@ -46,12 +47,18 @@ class StreamExecTemporalSortRule
   override def convert(rel: RelNode): RelNode = {
     val sort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort]
     val input = sort.getInput()
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-    val convInput: RelNode = RelOptRule.convert(input, FlinkConventions.STREAM_PHYSICAL)
+    val requiredTraitSet = input.getTraitSet
+      .replace(FlinkRelDistribution.SINGLETON)
+      .replace(FlinkConventions.STREAM_PHYSICAL)
+    val providedTraitSet = sort.getTraitSet
+      .replace(FlinkRelDistribution.SINGLETON)
+      .replace(FlinkConventions.STREAM_PHYSICAL)
+
+    val convInput: RelNode = RelOptRule.convert(input, requiredTraitSet)
 
     new StreamExecTemporalSort(
       rel.getCluster,
-      traitSet,
+      providedTraitSet,
       convInput,
       sort.collation)
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
index af6e441..a1d191e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortTest.xml
@@ -32,7 +32,8 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a])
 +- TemporalSort(orderBy=[proctime ASC, c ASC])
-   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+   +- Exchange(distribution=[single])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -136,7 +137,8 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a])
 +- TemporalSort(orderBy=[rowtime ASC, c ASC])
-   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+   +- Exchange(distribution=[single])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
index de90d7e..ed90898 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
@@ -79,6 +79,49 @@ class TemporalSortITCase(mode: StateBackendMode) extends StreamingWithStateTestB
   }
 
   @Test
+  def testEventTimeOrderByWithParallelInput(): Unit = {
+    val data = List(
+      (3L, 2L, "Hello world", 3),
+      (2L, 2L, "Hello", 2),
+      (6L, 3L, "Luke Skywalker", 6),
+      (5L, 3L, "I am fine.", 5),
+      (7L, 4L, "Comment#1", 7),
+      (9L, 4L, "Comment#3", 9),
+      (10L, 4L, "Comment#4", 10),
+      (8L, 4L, "Comment#2", 8),
+      (1L, 1L, "Hi", 1),
+      (4L, 3L, "Helloworld, how are you?", 4))
+
+    val t = failingDataSource(data)
+      .assignTimestampsAndWatermarks(
+        new TimestampAndWatermarkWithOffset[(Long, Long, String, Int)](10L))
+      .setParallelism(env.getParallelism)
+      .toTable(tEnv, 'rowtime.rowtime, 'key, 'str, 'int)
+    tEnv.registerTable("T", t)
+
+    val sqlQuery = "SELECT key, str, `int` FROM T ORDER BY rowtime"
+
+    val sink = new TestingRetractSink
+    val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    results.addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq(
+      "1,Hi,1",
+      "2,Hello,2",
+      "2,Hello world,3",
+      "3,Helloworld, how are you?,4",
+      "3,I am fine.,5",
+      "3,Luke Skywalker,6",
+      "4,Comment#1,7",
+      "4,Comment#2,8",
+      "4,Comment#3,9",
+      "4,Comment#4,10")
+
+    assertEquals(expected, sink.getRetractResults)
+  }
+
+  @Test
   def testEventTimeAndOtherFieldOrderBy(): Unit = {
     val data = List(
       (3L, 2L, "Hello world", 3),


[flink] 01/02: [FLINK-18002][json] Correct the behavior for ContainerNode as varchar type

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3865f7b23c656c74b85b0eb5dd8b73a0f5f88b03
Author: libenchao <li...@gmail.com>
AuthorDate: Mon Jun 1 11:58:11 2020 +0800

    [FLINK-18002][json] Correct the behavior for ContainerNode as varchar type
    
    This closes #12421
---
 .../json/JsonRowDataDeserializationSchema.java     |   6 +-
 .../formats/json/JsonRowDeserializationSchema.java |  10 +-
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 107 ++++++++++++++-------
 .../json/JsonRowDeserializationSchemaTest.java     |  30 ++++++
 4 files changed, 114 insertions(+), 39 deletions(-)

diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index d66ecce..956ca5b 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -304,7 +304,11 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R
 	}
 
 	private StringData convertToString(JsonNode jsonNode) {
-		return StringData.fromString(jsonNode.asText());
+		if (jsonNode.isContainerNode()) {
+			return StringData.fromString(jsonNode.toString());
+		} else {
+			return StringData.fromString(jsonNode.asText());
+		}
 	}
 
 	private byte[] convertToBytes(JsonNode jsonNode) {
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index cc9b55d..f9a6395 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -347,7 +347,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 		} else if (simpleTypeInfo == Types.BOOLEAN) {
 			return Optional.of(this::convertToBoolean);
 		} else if (simpleTypeInfo == Types.STRING) {
-			return Optional.of((mapper, jsonNode) -> jsonNode.asText());
+			return Optional.of(this::convertToString);
 		} else if (simpleTypeInfo == Types.INT) {
 			return Optional.of(this::convertToInt);
 		} else if (simpleTypeInfo == Types.LONG) {
@@ -381,6 +381,14 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 		}
 	}
 
+	private String convertToString(ObjectMapper mapper, JsonNode jsonNode) {
+		if (jsonNode.isContainerNode()) {
+			return jsonNode.toString();
+		} else {
+			return jsonNode.asText();
+		}
+	}
+
 	private boolean convertToBoolean(ObjectMapper mapper, JsonNode jsonNode) {
 		if (jsonNode.isBoolean()) {
 			// avoid redundant toString and parseBoolean, for better performance
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index cab427f..7b561aa 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -30,11 +30,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
-import org.junit.Rule;
+import org.junit.Assert;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.LocalDate;
@@ -71,9 +69,6 @@ import static org.junit.Assert.assertEquals;
  */
 public class JsonRowDataSerDeSchemaTest {
 
-	@Rule
-	public ExpectedException thrown = ExpectedException.none();
-
 	@Test
 	public void testSerDe() throws Exception {
 		byte tinyint = 'c';
@@ -326,9 +321,13 @@ public class JsonRowDataSerDeSchemaTest {
 		deserializationSchema = deserializationSchema = new JsonRowDataDeserializationSchema(
 			schema, WrapperTypeInfo.of(schema), true, false, TimestampFormat.ISO_8601);
 
-		thrown.expect(IOException.class);
-		thrown.expectMessage("Failed to deserialize JSON '{\"id\":123123123}'");
-		deserializationSchema.deserialize(serializedJson);
+		String errorMessage = "Failed to deserialize JSON '{\"id\":123123123}'.";
+		try {
+			deserializationSchema.deserialize(serializedJson);
+			Assert.fail("expecting exception message: " + errorMessage);
+		} catch (Throwable t) {
+			assertEquals(errorMessage, t.getMessage());
+		}
 
 		// ignore on parse error
 		deserializationSchema = new JsonRowDataDeserializationSchema(
@@ -336,12 +335,15 @@ public class JsonRowDataSerDeSchemaTest {
 		actual = convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
 		assertEquals(expected, actual);
 
-		thrown.expect(IllegalArgumentException.class);
-		thrown.expectMessage("JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled");
-		// failOnMissingField and ignoreParseErrors both enabled
-		//noinspection ConstantConditions
-		new JsonRowDataDeserializationSchema(
-			schema, WrapperTypeInfo.of(schema), true, true, TimestampFormat.ISO_8601);
+		errorMessage = "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.";
+		try {
+			// failOnMissingField and ignoreParseErrors both enabled
+			new JsonRowDataDeserializationSchema(
+				schema, WrapperTypeInfo.of(schema), true, true, TimestampFormat.ISO_8601);
+			Assert.fail("expecting exception message: " + errorMessage);
+		} catch (Throwable t) {
+			assertEquals(errorMessage, t.getMessage());
+		}
 	}
 
 	@Test
@@ -380,7 +382,7 @@ public class JsonRowDataSerDeSchemaTest {
 		// the parsing field should be null and no exception is thrown
 		JsonRowDataDeserializationSchema ignoreErrorsSchema = new JsonRowDataDeserializationSchema(
 			spec.rowType, WrapperTypeInfo.of(spec.rowType), false, true,
-			TimestampFormat.ISO_8601);
+			spec.timestampFormat);
 		Row expected;
 		if (spec.expected != null) {
 			expected = spec.expected;
@@ -400,8 +402,12 @@ public class JsonRowDataSerDeSchemaTest {
 			spec.rowType, WrapperTypeInfo.of(spec.rowType), false, false,
 			spec.timestampFormat);
 
-		thrown.expectMessage(spec.errorMessage);
-		failingSchema.deserialize(spec.json.getBytes());
+		try {
+			failingSchema.deserialize(spec.json.getBytes());
+			Assert.fail("expecting exception " + spec.errorMessage);
+		} catch (Throwable t) {
+			assertEquals(t.getMessage(), spec.errorMessage);
+		}
 	}
 
 	private static List<TestSpec> testData = Arrays.asList(
@@ -418,7 +424,7 @@ public class JsonRowDataSerDeSchemaTest {
 		TestSpec
 			.json("{\"id\":\"abc\"}")
 			.rowType(ROW(FIELD("id", INT())))
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'."),
 
 		TestSpec
 			.json("{\"id\":112.013}")
@@ -428,84 +434,111 @@ public class JsonRowDataSerDeSchemaTest {
 		TestSpec
 			.json("{\"id\":\"long\"}")
 			.rowType(ROW(FIELD("id", BIGINT())))
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"long\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"long\"}'."),
 
 		TestSpec
 			.json("{\"id\":\"112.013.123\"}")
 			.rowType(ROW(FIELD("id", FLOAT())))
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'."),
 
 		TestSpec
 			.json("{\"id\":\"112.013.123\"}")
 			.rowType(ROW(FIELD("id", DOUBLE())))
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'."),
 
 		TestSpec
 			.json("{\"id\":\"18:00:243\"}")
 			.rowType(ROW(FIELD("id", TIME())))
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'."),
 
 		TestSpec
 			.json("{\"id\":\"18:00:243\"}")
 			.rowType(ROW(FIELD("id", TIME())))
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'."),
 
 		TestSpec
 			.json("{\"id\":\"20191112\"}")
 			.rowType(ROW(FIELD("id", DATE())))
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'."),
 
 		TestSpec
 			.json("{\"id\":\"20191112\"}")
 			.rowType(ROW(FIELD("id", DATE())))
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'."),
+
+		TestSpec
+			.json("{\"id\":true}")
+			.rowType(ROW(FIELD("id", STRING())))
+			.expect(Row.of("true")),
+
+		TestSpec
+			.json("{\"id\":123.234}")
+			.rowType(ROW(FIELD("id", STRING())))
+			.expect(Row.of("123.234")),
+
+		TestSpec
+			.json("{\"id\":1234567}")
+			.rowType(ROW(FIELD("id", STRING())))
+			.expect(Row.of("1234567")),
+
+		TestSpec
+			.json("{\"id\":\"string field\"}")
+			.rowType(ROW(FIELD("id", STRING())))
+			.expect(Row.of("string field")),
+
+		TestSpec
+			.json("{\"id\":[\"array data1\",\"array data2\",123,234.345]}")
+			.rowType(ROW(FIELD("id", STRING())))
+			.expect(Row.of("[\"array data1\",\"array data2\",123,234.345]")),
+
+		TestSpec
+			.json("{\"id\":{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}}")
+			.rowType(ROW(FIELD("id", STRING())))
+			.expect(Row.of("{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}")),
 
 		TestSpec
 			.json("{\"id\":\"2019-11-12 18:00:12\"}")
 			.rowType(ROW(FIELD("id", TIMESTAMP(0))))
 			.timestampFormat(TimestampFormat.ISO_8601)
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12 18:00:12\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12 18:00:12\"}'."),
 
 		TestSpec
 			.json("{\"id\":\"2019-11-12T18:00:12\"}")
 			.rowType(ROW(FIELD("id", TIMESTAMP(0))))
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12\"}'."),
 
 		TestSpec
 			.json("{\"id\":\"2019-11-12T18:00:12Z\"}")
 			.rowType(ROW(FIELD("id", TIMESTAMP(0))))
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'."),
 
 		TestSpec
 			.json("{\"id\":\"2019-11-12T18:00:12Z\"}")
 			.rowType(ROW(FIELD("id", TIMESTAMP(0))))
 			.timestampFormat(TimestampFormat.ISO_8601)
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'."),
 
 		TestSpec
 			.json("{\"id\":\"abc\"}")
 			.rowType(ROW(FIELD("id", DECIMAL(10, 3))))
-			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'."),
 
 		TestSpec
 			.json("{\"row\":{\"id\":\"abc\"}}")
 			.rowType(ROW(FIELD("row", ROW(FIELD("id", BOOLEAN())))))
-			.expect(Row.of(new Row(1)))
-			.expectErrorMessage("Failed to deserialize JSON '{\"row\":{\"id\":\"abc\"}}'"),
+			.expect(Row.of(Row.of(false))),
 
 		TestSpec
 			.json("{\"array\":[123, \"abc\"]}")
 			.rowType(ROW(FIELD("array", ARRAY(INT()))))
 			.expect(Row.of((Object) new Integer[]{123, null}))
-			.expectErrorMessage("Failed to deserialize JSON '{\"array\":[123, \"abc\"]}'"),
+			.expectErrorMessage("Failed to deserialize JSON '{\"array\":[123, \"abc\"]}'."),
 
 		TestSpec
 			.json("{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}")
 			.rowType(ROW(FIELD("map", MAP(STRING(), INT()))))
 			.expect(Row.of(createHashMap("key1", 123, "key2", null)))
-			.expectErrorMessage("Failed to deserialize JSON '{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'")
-
-
+			.expectErrorMessage("Failed to deserialize JSON '{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'.")
 	);
 
 	private static Map<String, Integer> createHashMap(String k1, Integer v1, String k2, Integer v2) {
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
index cba7fce..438a18c 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
@@ -314,6 +314,36 @@ public class JsonRowDeserializationSchemaTest {
 			.expect(Row.of(112L)),
 
 		TestSpec
+			.json("{\"id\":true}")
+			.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
+			.expect(Row.of("true")),
+
+		TestSpec
+			.json("{\"id\":123.234}")
+			.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
+			.expect(Row.of("123.234")),
+
+		TestSpec
+			.json("{\"id\":1234567}")
+			.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
+			.expect(Row.of("1234567")),
+
+		TestSpec
+			.json("{\"id\":\"string field\"}")
+			.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
+			.expect(Row.of("string field")),
+
+		TestSpec
+			.json("{\"id\":[\"array data1\",\"array data2\",123,234.345]}")
+			.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
+			.expect(Row.of("[\"array data1\",\"array data2\",123,234.345]")),
+
+		TestSpec
+			.json("{\"id\":{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}}")
+			.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
+			.expect(Row.of("{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}")),
+
+		TestSpec
 			.json("{\"id\":\"long\"}")
 			.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.LONG))
 			.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"long\"}'"),