You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 08:47:47 UTC

[inlong] 02/02: [INLONG-5701][Manager][Sort] Support raw format (#5702)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit a7af443fe6a529dcac4d5caf5ddc1a6f13393491
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Fri Aug 26 16:43:09 2022 +0800

    [INLONG-5701][Manager][Sort] Support raw format (#5702)
---
 .../apache/inlong/common/enums/DataTypeEnum.java   |  3 +-
 .../manager/pojo/sort/util/ExtractNodeUtils.java   |  4 ++
 .../inlong/sort/protocol/node/format/Format.java   |  3 +-
 .../sort/protocol/node/format/RawFormat.java       | 82 ++++++++++++++++++++++
 .../sort/protocol/node/format/RawFormatTest.java   | 37 ++++------
 5 files changed, 104 insertions(+), 25 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index 8eeb6436b..e7cbbe9f5 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -25,7 +25,8 @@ public enum DataTypeEnum {
     AVRO("avro"),
     JSON("json"),
     CANAL("canal"),
-    DEBEZIUM_JSON("debezium_json");
+    DEBEZIUM_JSON("debezium_json"),
+    RAW("raw");
 
     @Getter
     private final String name;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index c94f84af4..d0e0fa4dc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -56,6 +56,7 @@ import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.format.RawFormat;
 
 import java.util.List;
 import java.util.Map;
@@ -248,6 +249,9 @@ public class ExtractNodeUtils {
             case DEBEZIUM_JSON:
                 format = new DebeziumJsonFormat();
                 break;
+            case RAW:
+                format = new RawFormat();
+                break;
             default:
                 throw new IllegalArgumentException(
                         String.format("Unsupported dataType=%s for pulsar source", dataType));
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
index 4d2b30c5f..e40e6ed79 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
@@ -38,7 +38,8 @@ import java.util.Map;
         @JsonSubTypes.Type(value = DebeziumJsonFormat.class, name = "debeziumJsonFormat"),
         @JsonSubTypes.Type(value = CanalJsonFormat.class, name = "canalJsonFormat"),
         @JsonSubTypes.Type(value = CsvFormat.class, name = "csvFormat"),
-        @JsonSubTypes.Type(value = InLongMsgFormat.class, name = "inLongMsgFormat")
+        @JsonSubTypes.Type(value = InLongMsgFormat.class, name = "inLongMsgFormat"),
+        @JsonSubTypes.Type(value = RawFormat.class, name = "rawFormat")
 })
 public interface Format extends Serializable {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/RawFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/RawFormat.java
new file mode 100644
index 000000000..e2281d229
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/RawFormat.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Data;
+import lombok.ToString;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * The Raw format
+ *
+ * @see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/raw/">
+ *         Raw Format</a>
+ */
+@Data
+@JsonTypeName("rawFormat")
+@ToString
+public class RawFormat implements Format {
+
+    private static final long serialVersionUID = 1L;
+
+    @JsonProperty(value = "rawCharset", defaultValue = "UTF-8")
+    private String rawCharset;
+    @JsonProperty(value = "rawEndianness", defaultValue = "big-endian")
+    private String rawEndianness;
+
+    @JsonCreator
+    public RawFormat(@JsonProperty(value = "rawCharset", defaultValue = "UTF-8") String rawCharset,
+            @JsonProperty(value = "rawEndianness", defaultValue = "big-endian") String rawEndianness) {
+        this.rawCharset = rawCharset; // stored as-is: no null check or fallback is applied here
+        this.rawEndianness = rawEndianness; // stored as-is, same as rawCharset
+    }
+
+    @JsonCreator
+    public RawFormat() {
+        this("UTF-8", "big-endian"); // same literals as the defaultValue attributes declared above
+    }
+
+    /**
+     * Returns the identifier of this format; marked {@code @JsonIgnore}.
+     *
+     * @return the constant string {@code "raw"}
+     */
+    @JsonIgnore
+    @Override
+    public String getFormat() {
+        return "raw";
+    }
+
+    /**
+     * Builds a new map with the keys "format", "raw.charset" and "raw.endianness".
+     *
+     * @return options; NOTE(review): raw.* values are the fields as-is and may be null - confirm consumers cope
+     */
+    public Map<String, String> generateOptions() {
+        Map<String, String> options = new HashMap<>(16);
+        options.put("format", getFormat());
+        options.put("raw.charset", this.rawCharset);
+        options.put("raw.endianness", this.rawEndianness);
+        return options;
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/RawFormatTest.java
similarity index 54%
copy from inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/RawFormatTest.java
index 8eeb6436b..6da8854c1 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/RawFormatTest.java
@@ -15,31 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.enums;
+package org.apache.inlong.sort.protocol.node.format;
 
-import java.util.Locale;
-import lombok.Getter;
+import org.apache.inlong.sort.SerializeBaseTest;
 
-public enum DataTypeEnum {
-    CSV("csv"),
-    AVRO("avro"),
-    JSON("json"),
-    CANAL("canal"),
-    DEBEZIUM_JSON("debezium_json");
-
-    @Getter
-    private final String name;
-
-    DataTypeEnum(String name) {
-        this.name = name;
-    }
+/**
+ * Test for {@link RawFormat}
+ */
+public class RawFormatTest extends SerializeBaseTest<RawFormat> {
 
-    public static DataTypeEnum forName(String name) {
-        for (DataTypeEnum dataType : values()) {
-            if (dataType.getName().equals(name.toLowerCase(Locale.ROOT))) {
-                return dataType;
-            }
-        }
-        throw new IllegalArgumentException(String.format("Unsupport dataType for Inlong:%s", name));
+    /**
+     * Get test object
+     *
+     * @return The test object
+     */
+    @Override
+    public RawFormat getTestObject() {
+        return new RawFormat();
     }
 }