You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/21 20:55:35 UTC
[pulsar] branch master updated: [issues 3232] integrate flink-json to pulsar (#3234)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d9691b7 [issues 3232] integrate flink-json to pulsar (#3234)
d9691b7 is described below
commit d9691b71b58159e49300de58d9e8efd87a81673b
Author: wpl <12...@qq.com>
AuthorDate: Sat Dec 22 04:55:30 2018 +0800
[issues 3232] integrate flink-json to pulsar (#3234)
* Implement a batch program that writes a Flink DataSet to a Pulsar topic in Avro format.
* fix up
* remove avro generated
* fix up time-out
* fix up time-out
* fix up
* address review comments
* fix test failures
* run Tests
* run Tests
* fix up pulsar-flink and flink-consumer-source
* fix up pulsar-flink and flink-consumer-source
* add a Flink streaming table backed by a Pulsar stream that serializes data in Avro format.
* integrate flink-json to pulsar
---
pulsar-flink/pom.xml | 6 +
.../connectors/pulsar/PulsarJsonTableSink.java | 2 +-
.../pulsar/serde/JsonRowDeserializationSchema.java | 160 ---------------------
.../pulsar/serde/JsonRowSerializationSchema.java | 92 ------------
4 files changed, 7 insertions(+), 253 deletions(-)
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index c891005..92eb045 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -51,6 +51,12 @@
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
index 1a8d5e3..d6818fa 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.pulsar;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.connectors.pulsar.serde.JsonRowSerializationSchema;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.pulsar.client.api.ProducerConfiguration;
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
deleted file mode 100644
index 0235e61..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.serde;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * Deserialization schema from JSON to {@link Row}.
- *
- * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
- * the specified fields.
- *
- * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
- */
-public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
-
- /*
- What to do when detecting that a json line cannot be deserialized :
- (1).false : Throw A IOException and Terminate application。
- (2).true : Ignore the error line and add a null line。
- */
- private boolean ignoreJsonFormatError = false;
-
-
- /**
- *
- * @return true or false
- */
- public boolean getIgnoreJsonFormatError() {
- return ignoreJsonFormatError;
- }
-
- /**
- * set ignoreJsonFormatError
- * @param ignoreJsonFormatError
- */
- public void setIgnoreJsonFormatError(boolean ignoreJsonFormatError) {
- this.ignoreJsonFormatError = ignoreJsonFormatError;
- }
-
- /**
- * Type information describing the result type.
- */
- private final TypeInformation<Row> typeInfo;
-
- /**
- * Field names to parse. Indices match fieldTypes indices.
- */
- private final String[] fieldNames;
-
- /**
- * Types to parse fields as. Indices match fieldNames indices.
- */
- private final TypeInformation<?>[] fieldTypes;
-
- /**
- * Object mapper for parsing the JSON.
- */
- private final ObjectMapper objectMapper = new ObjectMapper();
-
- /**
- * Flag indicating whether to fail on a missing field.
- */
- private boolean failOnMissingField;
-
- /**
- * Creates a JSON deserialization schema for the given fields and types.
- *
- * @param typeInfo Type information describing the result type. The field names are used
- * to parse the JSON file and so are the types.
- */
- public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
- Preconditions.checkNotNull(typeInfo, "Type information");
- this.typeInfo = typeInfo;
-
- this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
- this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
- }
-
- @Override
- public Row deserialize(byte[] message) throws IOException {
- try {
- JsonNode root = objectMapper.readTree(message);
-
- Row row = new Row(fieldNames.length);
- for (int i = 0; i < fieldNames.length; i++) {
- JsonNode node = root.get(fieldNames[i]);
-
- if (node == null) {
- if (failOnMissingField) {
- throw new IllegalStateException("Failed to find field with name '"
- + fieldNames[i] + "'.");
- } else {
- row.setField(i, null);
- }
- } else {
- // Read the value as specified type
- Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
- row.setField(i, value);
- }
- }
-
- return row;
- } catch (Throwable t) {
- if (ignoreJsonFormatError) {
- final int arity = typeInfo.getArity();
- final Object[] nullsArray = new Object[arity];
- return Row.of(nullsArray);
- } else {
- throw new IOException("Failed to deserialize JSON object.", t);
- }
- }
- }
-
- @Override
- public boolean isEndOfStream(Row nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Row> getProducedType() {
- return typeInfo;
- }
-
- /**
- * Configures the failure behaviour if a JSON field is missing.
- *
- * <p>By default, a missing field is ignored and the field is set to null.
- *
- * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
- */
- public void setFailOnMissingField(boolean failOnMissingField) {
- this.failOnMissingField = failOnMissingField;
- }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java
deleted file mode 100644
index 503f01e..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.serde;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Serialization schema that serializes an object into a JSON bytes.
- *
- * <p>Serializes the input {@link Row} object into a JSON string and
- * converts it into <code>byte[]</code>.
- *
- * <p>Result <code>byte[]</code> messages can be deserialized using
- * {@link JsonRowDeserializationSchema}.
- */
-public class JsonRowSerializationSchema implements SerializationSchema<Row> {
- /**
- * Fields names in the input Row object.
- */
- private final String[] fieldNames;
- /**
- * Object mapper that is used to create output JSON objects.
- */
- private static ObjectMapper mapper = new ObjectMapper();
-
- /**
- * Creates a JSON serialization schema for the given fields and types.
- *
- * @param rowSchema The schema of the rows to encode.
- */
- public JsonRowSerializationSchema(RowTypeInfo rowSchema) {
-
- Preconditions.checkNotNull(rowSchema);
- String[] fieldNames = rowSchema.getFieldNames();
- TypeInformation[] fieldTypes = rowSchema.getFieldTypes();
-
- // check that no field is composite
- for (int i = 0; i < fieldTypes.length; i++) {
- if (fieldTypes[i] instanceof CompositeType) {
- throw new IllegalArgumentException("JsonRowSerializationSchema cannot encode rows with nested schema, " +
- "but field '" + fieldNames[i] + "' is nested: " + fieldTypes[i].toString());
- }
- }
-
- this.fieldNames = fieldNames;
- }
-
- @Override
- public byte[] serialize(Row row) {
- if (row.getArity() != fieldNames.length) {
- throw new IllegalStateException(String.format(
- "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
- }
-
- ObjectNode objectNode = mapper.createObjectNode();
-
- for (int i = 0; i < row.getArity(); i++) {
- JsonNode node = mapper.valueToTree(row.getField(i));
- objectNode.set(fieldNames[i], node);
- }
-
- try {
- return mapper.writeValueAsBytes(objectNode);
- } catch (Exception e) {
- throw new RuntimeException("Failed to serialize row", e);
- }
- }
-}