Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/21 20:55:35 UTC

[pulsar] branch master updated: [issues 3232] integrate flink-json to pulsar (#3234)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d9691b7  [issues 3232] integrate flink-json to pulsar (#3234)
d9691b7 is described below

commit d9691b71b58159e49300de58d9e8efd87a81673b
Author: wpl <12...@qq.com>
AuthorDate: Sat Dec 22 04:55:30 2018 +0800

    [issues 3232] integrate flink-json to pulsar (#3234)
    
    * Implement a batch program that writes a Flink DataSet to a Pulsar topic as Avro.
    
    * fix up
    
    * remove avro generated
    
    * fix up time-out
    
    * fix up time-out
    
    * fix up
    
    * address review comments
    
    * fix test failures
    
    * run Tests
    
    * run Tests
    
    * fix up pulsar-flink and flink-consumer-source
    
    * fix up pulsar-flink and flink-consumer-source
    
    * add a Flink streaming table backed by a Pulsar stream, with data serialized in Avro format.
    
    * integrate flink-json to pulsar
---
 pulsar-flink/pom.xml                               |   6 +
 .../connectors/pulsar/PulsarJsonTableSink.java     |   2 +-
 .../pulsar/serde/JsonRowDeserializationSchema.java | 160 ---------------------
 .../pulsar/serde/JsonRowSerializationSchema.java   |  92 ------------
 4 files changed, 7 insertions(+), 253 deletions(-)
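
For context, here is a minimal sketch (not part of this commit) of how the JSON table sink could be wired into a Flink Table API job after this change. The PulsarJsonTableSink constructor arguments shown below (service URL, topic, ProducerConfiguration, routing key field) and the "words" table are assumptions for illustration, not taken from this patch.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.pulsar.PulsarJsonTableSink;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.pulsar.client.api.ProducerConfiguration;

    public class PulsarJsonTableSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            // Hypothetical broker URL, topic and routing key field; the actual
            // constructor signature may differ.
            PulsarJsonTableSink sink = new PulsarJsonTableSink(
                    "pulsar://localhost:6650",
                    "persistent://public/default/flink-json-output",
                    new ProducerConfiguration(),
                    "word");

            // "words" stands for a table registered elsewhere; its rows are now
            // encoded by the flink-json JsonRowSerializationSchema before being
            // written to Pulsar.
            tableEnv.sqlQuery("SELECT word, cnt FROM words").writeToSink(sink);

            env.execute("pulsar-json-table-sink-sketch");
        }
    }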

diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index c891005..92eb045 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -51,6 +51,12 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
+      <artifactId>flink-json</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
       <artifactId>flink-avro</artifactId>
       <version>${flink.version}</version>
     </dependency>
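
The new flink-json dependency brings in org.apache.flink.formats.json.JsonRowSerializationSchema and JsonRowDeserializationSchema, which replace the connector-local copies deleted below. As a rough sketch of the swap at the call site in the table sink (changed in the next file; the factory method body itself is not part of this patch, and the Flink 1.x constructor taking a TypeInformation<Row> is assumed):

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.formats.json.JsonRowSerializationSchema;
    import org.apache.flink.types.Row;

    class JsonSerializationSchemaFactorySketch {
        SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
            // Same call shape as before; only the class now comes from flink-json
            // instead of the removed connector-local serde package.
            return new JsonRowSerializationSchema(rowSchema);
        }
    }
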
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
index 1a8d5e3..d6818fa 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.pulsar;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.connectors.pulsar.serde.JsonRowSerializationSchema;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
deleted file mode 100644
index 0235e61..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.serde;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * Deserialization schema from JSON to {@link Row}.
- *
- * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
- * the specified fields.
- *
- * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
- */
-public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
-
-    /*
-        What to do when a JSON line cannot be deserialized:
-        (1) false : throw an IOException and terminate the application.
-        (2) true  : ignore the bad line and emit a row of nulls.
-     */
-    private boolean ignoreJsonFormatError = false;
-
-
-    /**
-     *
-     * @return whether malformed JSON lines are ignored instead of failing
-     */
-    public boolean getIgnoreJsonFormatError() {
-        return ignoreJsonFormatError;
-    }
-
-    /**
-     * set ignoreJsonFormatError
-     * @param ignoreJsonFormatError
-     */
-    public void setIgnoreJsonFormatError(boolean ignoreJsonFormatError) {
-        this.ignoreJsonFormatError = ignoreJsonFormatError;
-    }
-
-    /**
-     * Type information describing the result type.
-     */
-    private final TypeInformation<Row> typeInfo;
-
-    /**
-     * Field names to parse. Indices match fieldTypes indices.
-     */
-    private final String[] fieldNames;
-
-    /**
-     * Types to parse fields as. Indices match fieldNames indices.
-     */
-    private final TypeInformation<?>[] fieldTypes;
-
-    /**
-     * Object mapper for parsing the JSON.
-     */
-    private final ObjectMapper objectMapper = new ObjectMapper();
-
-    /**
-     * Flag indicating whether to fail on a missing field.
-     */
-    private boolean failOnMissingField;
-
-    /**
-     * Creates a JSON deserialization schema for the given fields and types.
-     *
-     * @param typeInfo Type information describing the result type. The field names are used
-     *                 to parse the JSON file and so are the types.
-     */
-    public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
-        Preconditions.checkNotNull(typeInfo, "Type information");
-        this.typeInfo = typeInfo;
-
-        this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
-        this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
-    }
-
-    @Override
-    public Row deserialize(byte[] message) throws IOException {
-        try {
-            JsonNode root = objectMapper.readTree(message);
-
-            Row row = new Row(fieldNames.length);
-            for (int i = 0; i < fieldNames.length; i++) {
-                JsonNode node = root.get(fieldNames[i]);
-
-                if (node == null) {
-                    if (failOnMissingField) {
-                        throw new IllegalStateException("Failed to find field with name '"
-                                + fieldNames[i] + "'.");
-                    } else {
-                        row.setField(i, null);
-                    }
-                } else {
-                    // Read the value as specified type
-                    Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
-                    row.setField(i, value);
-                }
-            }
-
-            return row;
-        } catch (Throwable t) {
-            if (ignoreJsonFormatError) {
-                final int arity = typeInfo.getArity();
-                final Object[] nullsArray = new Object[arity];
-                return Row.of(nullsArray);
-            } else {
-                throw new IOException("Failed to deserialize JSON object.", t);
-            }
-        }
-    }
-
-    @Override
-    public boolean isEndOfStream(Row nextElement) {
-        return false;
-    }
-
-    @Override
-    public TypeInformation<Row> getProducedType() {
-        return typeInfo;
-    }
-
-    /**
-     * Configures the failure behaviour if a JSON field is missing.
-     *
-     * <p>By default, a missing field is ignored and the field is set to null.
-     *
-     * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
-     */
-    public void setFailOnMissingField(boolean failOnMissingField) {
-        this.failOnMissingField = failOnMissingField;
-    }
-
-}
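
For anyone who used the removed class directly, here is a hedged sketch of the flink-json counterpart, assuming the Flink 1.x API (a TypeInformation<Row> constructor plus a setFailOnMissingField setter). The ignoreJsonFormatError switch of the removed class does not appear to have a direct equivalent in the flink-json schema of that era; a malformed record surfaces as an IOException instead.

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.formats.json.JsonRowDeserializationSchema;
    import org.apache.flink.types.Row;

    public class JsonRowDeserializationSketch {
        public static void main(String[] args) throws Exception {
            TypeInformation<Row> rowType = Types.ROW_NAMED(
                    new String[]{"word", "cnt"}, Types.STRING, Types.LONG);

            JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(rowType);
            schema.setFailOnMissingField(false);   // missing fields become null, as before

            byte[] json = "{\"word\":\"pulsar\",\"cnt\":42}".getBytes(StandardCharsets.UTF_8);
            Row row = schema.deserialize(json);
            System.out.println(row);               // prints: pulsar,42
        }
    }
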
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java
deleted file mode 100644
index 503f01e..0000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.serde;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Serialization schema that serializes an object into JSON bytes.
- *
- * <p>Serializes the input {@link Row} object into a JSON string and
- * converts it into <code>byte[]</code>.
- *
- * <p>Result <code>byte[]</code> messages can be deserialized using
- * {@link JsonRowDeserializationSchema}.
- */
-public class JsonRowSerializationSchema implements SerializationSchema<Row> {
-    /**
-     * Fields names in the input Row object.
-     */
-    private final String[] fieldNames;
-    /**
-     * Object mapper that is used to create output JSON objects.
-     */
-    private static ObjectMapper mapper = new ObjectMapper();
-
-    /**
-     * Creates a JSON serialization schema for the given fields and types.
-     *
-     * @param rowSchema The schema of the rows to encode.
-     */
-    public JsonRowSerializationSchema(RowTypeInfo rowSchema) {
-
-        Preconditions.checkNotNull(rowSchema);
-        String[] fieldNames = rowSchema.getFieldNames();
-        TypeInformation[] fieldTypes = rowSchema.getFieldTypes();
-
-        // check that no field is composite
-        for (int i = 0; i < fieldTypes.length; i++) {
-            if (fieldTypes[i] instanceof CompositeType) {
-                throw new IllegalArgumentException("JsonRowSerializationSchema cannot encode rows with nested schema, " +
-                        "but field '" + fieldNames[i] + "' is nested: " + fieldTypes[i].toString());
-            }
-        }
-
-        this.fieldNames = fieldNames;
-    }
-
-    @Override
-    public byte[] serialize(Row row) {
-        if (row.getArity() != fieldNames.length) {
-            throw new IllegalStateException(String.format(
-                    "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
-        }
-
-        ObjectNode objectNode = mapper.createObjectNode();
-
-        for (int i = 0; i < row.getArity(); i++) {
-            JsonNode node = mapper.valueToTree(row.getField(i));
-            objectNode.set(fieldNames[i], node);
-        }
-
-        try {
-            return mapper.writeValueAsBytes(objectNode);
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to serialize row", e);
-        }
-    }
-}
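
And the matching write path, again as a hedged sketch against the assumed Flink 1.x constructor JsonRowSerializationSchema(TypeInformation<Row>); it produces the same kind of flat JSON object the removed class emitted.

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.formats.json.JsonRowSerializationSchema;
    import org.apache.flink.types.Row;

    public class JsonRowSerializationSketch {
        public static void main(String[] args) {
            TypeInformation<Row> rowType = Types.ROW_NAMED(
                    new String[]{"word", "cnt"}, Types.STRING, Types.LONG);

            JsonRowSerializationSchema schema = new JsonRowSerializationSchema(rowType);

            byte[] json = schema.serialize(Row.of("pulsar", 42L));
            System.out.println(new String(json, StandardCharsets.UTF_8));
            // Expected output, roughly: {"word":"pulsar","cnt":42}
        }
    }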