You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/16 11:42:27 UTC

[GitHub] buptljy closed pull request #6568: [FLINK-10119][Kafka Connector] - JsonRowDeserializeSchema deserialize message with error

buptljy closed pull request #6568: [FLINK-10119][Kafka Connector] - JsonRowDeserializeSchema deserialize message with error
URL: https://github.com/apache/flink/pull/6568
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index dc8a116ac62..e4e9f48ef25 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -62,6 +62,12 @@
 	/** Flag indicating whether to fail on a missing field. */
 	private boolean failOnMissingField;
 
+	/** Flag indicating whether to emit a row of null fields instead of failing when an exception is thrown. */
+	private boolean nullErrorLine;
+
+	/** Flag indicating whether to add an additional field when exception is thrown. */
+	private boolean additionalErrorField;
+
 	/**
 	 * Creates a JSON deserialization schema for the given type information.
 	 *
@@ -94,7 +100,19 @@ public Row deserialize(byte[] message) throws IOException {
 			final JsonNode root = objectMapper.readTree(message);
 			return convertRow(root, (RowTypeInfo) typeInfo);
 		} catch (Throwable t) {
-			throw new IOException("Failed to deserialize JSON object.", t);
+			if (nullErrorLine || additionalErrorField) {
+				final int arity = typeInfo.getArity();
+				final Object[] nullsArray = new Object[arity];
+				if (additionalErrorField) {
+					final Object[] addionalNullsArray = new Object[arity + 1];
+					System.arraycopy(nullsArray, 0, addionalNullsArray, 0, arity);
+					addionalNullsArray[arity] = t.getMessage();
+					return Row.of(addionalNullsArray);
+				}
+				return Row.of(nullsArray);
+			} else {
+				throw new IOException("Failed to deserialize JSON object.", t);
+			}
 		}
 	}
 
@@ -119,6 +137,20 @@ public void setFailOnMissingField(boolean failOnMissingField) {
 		this.failOnMissingField = failOnMissingField;
 	}
 
+	/**
+	 * Configures whether to emit a row of null fields instead of failing when an exception is thrown. false by default.
+	 */
+	public void setNullErrorLine(boolean nullErrorLine) {
+		this.nullErrorLine = nullErrorLine;
+	}
+
+	/**
+	 * Configures whether to add an additional field when an exception is thrown. false by default.
+	 */
+	public void setAdditionalErrorField(boolean additionalErrorField) {
+		this.additionalErrorField = additionalErrorField;
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
index 458b94a5df7..a65234e4578 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
@@ -60,6 +60,8 @@ public boolean supportsSchemaDerivation() {
 		properties.add(JsonValidator.FORMAT_JSON_SCHEMA);
 		properties.add(JsonValidator.FORMAT_SCHEMA);
 		properties.add(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD);
+		properties.add(JsonValidator.FORMAT_NULL_ERROR_LINE);
+		properties.add(JsonValidator.FORMAT_ADDITIONAL_ERROR_FIELD);
 		properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA());
 		properties.addAll(SchemaValidator.getSchemaDerivationKeys());
 		return properties;
@@ -74,6 +76,10 @@ public boolean supportsSchemaDerivation() {
 
 		descriptorProperties.getOptionalBoolean(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD)
 				.ifPresent(schema::setFailOnMissingField);
+		descriptorProperties.getOptionalBoolean(JsonValidator.FORMAT_NULL_ERROR_LINE)
+			.ifPresent(schema::setNullErrorLine);
+		descriptorProperties.getOptionalBoolean(JsonValidator.FORMAT_ADDITIONAL_ERROR_FIELD)
+			.ifPresent(schema::setAdditionalErrorField);
 
 		return schema;
 	}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
index 026f49e611c..75a1ef0b0e5 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
@@ -24,8 +24,10 @@
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_ADDITIONAL_ERROR_FIELD;
 import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
 import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_JSON_SCHEMA;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_NULL_ERROR_LINE;
 import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA;
 import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_TYPE_VALUE;
 
@@ -35,6 +37,8 @@
 public class Json extends FormatDescriptor {
 
 	private Boolean failOnMissingField;
+	private Boolean nullErrorLine;
+	private Boolean additionalErrorField;
 	private Boolean deriveSchema;
 	private String jsonSchema;
 	private String schema;
@@ -57,6 +61,27 @@ public Json failOnMissingField(boolean failOnMissingField) {
 		return this;
 	}
 
+	/**
+	 * Sets the flag indicating whether to emit a row of null fields if an exception is thrown.
+	 *
+	 * @param nullErrorLine If set to true, a row of null fields is emitted if an exception is thrown.
+	 */
+	public Json nullErrorLine(boolean nullErrorLine) {
+		this.nullErrorLine = nullErrorLine;
+		return this;
+	}
+
+	/**
+	 * Sets flag whether to add an additional field to store error messages if exception is thrown.
+	 *
+	 * @param additionalErrorField If set to true, an additional field storing the error message
+	 *                             is appended if an exception is thrown.
+	 */
+	public Json additionalErrorField(boolean additionalErrorField) {
+		this.additionalErrorField = additionalErrorField;
+		return this;
+	}
+
 	/**
 	 * Sets the JSON schema string with field names and the types according to the JSON schema
 	 * specification [[http://json-schema.org/specification.html]].
@@ -126,5 +151,13 @@ public void addFormatProperties(DescriptorProperties properties) {
 		if (failOnMissingField != null) {
 			properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField);
 		}
+
+		if (nullErrorLine != null) {
+			properties.putBoolean(FORMAT_NULL_ERROR_LINE, nullErrorLine);
+		}
+
+		if (additionalErrorField != null) {
+			properties.putBoolean(FORMAT_ADDITIONAL_ERROR_FIELD, additionalErrorField);
+		}
 	}
 }
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
index 49e1abc8a19..1b8bd0328f2 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
@@ -29,6 +29,8 @@
 	public static final String FORMAT_SCHEMA = "format.schema";
 	public static final String FORMAT_JSON_SCHEMA = "format.json-schema";
 	public static final String FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field";
+	public static final String FORMAT_NULL_ERROR_LINE = "format.null-error-line";
+	public static final String FORMAT_ADDITIONAL_ERROR_FIELD = "format.additional-error-field";
 
 	@Override
 	public void validate(DescriptorProperties properties) {
@@ -51,5 +53,7 @@ public void validate(DescriptorProperties properties) {
 		}
 
 		properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, true);
+		properties.validateBoolean(FORMAT_NULL_ERROR_LINE, true);
+		properties.validateBoolean(FORMAT_ADDITIONAL_ERROR_FIELD, true);
 	}
 }
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
index 5e77b801a44..437efe0f559 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
@@ -36,6 +36,8 @@
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 /**
  * Tests for the {@link JsonRowDeserializationSchema}.
@@ -179,6 +181,41 @@ public void testMissingNode() throws Exception {
 		}
 	}
 
+	/**
+	 * Test deserialization with setting nullErrorLine true.
+	 */
+	@Test
+	public void testNullErrorLine() throws IOException {
+		final JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
+			Types.ROW_NAMED(
+				new String[] { "name" },
+				Types.STRING)
+		);
+		deserializationSchema.setNullErrorLine(true);
+
+		final Row row = deserializationSchema.deserialize("zxvdd".getBytes());
+		assertNull(row.getField(0));
+	}
+
+	/**
+	 * Test deserialization with setting additionalErrorField true.
+	 */
+	@Test
+	public void testAdditionalErrorField() throws IOException {
+		final JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
+			Types.ROW_NAMED(
+				new String[] { "name", "age" },
+				Types.STRING, Types.INT)
+		);
+		deserializationSchema.setAdditionalErrorField(true);
+
+		final Row row = deserializationSchema.deserialize("zxvdd".getBytes());
+		assertEquals(3, row.getArity());
+		assertNull(row.getField(0));
+		assertNull(row.getField(1));
+		assertNotNull(row.getField(2));
+	}
+
 	/**
 	 * Tests that number of field names and types has to match.
 	 */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services