You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/05/11 09:02:11 UTC
[incubator-inlong] branch master updated: [InLong-4066][Sort] Import inlong format to parse InLongMsg (#4148)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 158cf5548 [InLong-4066][Sort] Import inlong format to parse InLongMsg (#4148)
158cf5548 is described below
commit 158cf554847948baa020ddbd0d4ee61ba830db83
Author: thexia <37...@users.noreply.github.com>
AuthorDate: Wed May 11 17:02:07 2022 +0800
[InLong-4066][Sort] Import inlong format to parse InLongMsg (#4148)
---
.../sort-formats/format-inlongmsg-base/pom.xml | 12 ++
.../formats/inlongmsg/InLongMsgDecodingFormat.java | 178 ++++++++++++++++++++
.../inlongmsg/InLongMsgDeserializationSchema.java | 163 +++++++++++++++++++
.../formats/inlongmsg/InLongMsgFormatFactory.java | 92 +++++++++++
.../sort/formats/inlongmsg/InLongMsgOptions.java | 51 ++++++
.../sort/formats/inlongmsg/InLongMsgUtils.java | 33 ++++
.../org.apache.flink.table.factories.Factory | 16 ++
.../inlongmsg/InLongMsgFormatFactoryTest.java | 124 ++++++++++++++
.../inlongmsg/InLongMsgRowDataSerDeTest.java | 179 +++++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 16 ++
.../src/test/resources/log4j-test.properties | 25 +++
.../InLongMsgCsvFormatDeserializer.java | 2 +-
.../InLongMsgCsvMixedFormatDeserializer.java | 2 +-
.../formats/inlongmsgcsv/InLongMsgCsvUtils.java | 42 -----
inlong-sort/sort-single-tenant/pom.xml | 42 +++--
15 files changed, 917 insertions(+), 60 deletions(-)
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
index a36d6bf14..49edb5358 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
@@ -87,6 +87,18 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-csv</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
new file mode 100644
index 000000000..eca346cca
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource.Context;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class InLongMsgDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+ private final String innerFormatMetaPrefix;
+
+ private final DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat;
+
+ private List<String> metadataKeys;
+
+ private final boolean ignoreErrors;
+
+ public InLongMsgDecodingFormat(
+ DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat,
+ String innerFormatMetaPrefix,
+ boolean ignoreErrors) {
+ this.innerDecodingFormat = innerDecodingFormat;
+ this.innerFormatMetaPrefix = innerFormatMetaPrefix;
+ this.metadataKeys = Collections.emptyList();
+ this.ignoreErrors = ignoreErrors;
+ }
+
+ @Override
+ public DeserializationSchema<RowData> createRuntimeDecoder(Context context, DataType physicalDataType) {
+ final MetadataConverter[] metadataConverters = Arrays.stream(ReadableMetadata.values())
+ .filter(metadata -> metadataKeys.contains(metadata.key))
+ .map(metadata -> metadata.converter)
+ .toArray(MetadataConverter[]::new);
+ final List<ReadableMetadata> readableMetadata =
+ metadataKeys.stream()
+ .map(
+ k ->
+ Stream.of(ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .collect(Collectors.toList());
+ final List<DataTypes.Field> metadataFields =
+ readableMetadata.stream()
+ .map(m -> DataTypes.FIELD(m.key, m.dataType))
+ .collect(Collectors.toList());
+ final DataType producedDataType =
+ DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+ final TypeInformation<RowData> producedTypeInfo =
+ context.createTypeInformation(producedDataType);
+
+ return new InLongMsgDeserializationSchema(
+ innerDecodingFormat.createRuntimeDecoder(context, physicalDataType),
+ metadataConverters,
+ producedTypeInfo,
+ ignoreErrors);
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+ // add inner format metadata with prefix
+ innerDecodingFormat
+ .listReadableMetadata()
+ .forEach((key, value) -> metadataMap.putIfAbsent(innerFormatMetaPrefix + key, value));
+
+ // add format metadata
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+
+ return metadataMap;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys) {
+ // separate inner format and format metadata
+ final List<String> innerFormatMetadataKeys =
+ metadataKeys.stream()
+ .filter(k -> k.startsWith(innerFormatMetaPrefix))
+ .collect(Collectors.toList());
+ final List<String> formatMetadataKeys = new ArrayList<>(metadataKeys);
+ formatMetadataKeys.removeAll(innerFormatMetadataKeys);
+ this.metadataKeys = formatMetadataKeys;
+
+ // push down inner format metadata
+ final Map<String, DataType> formatMetadata = innerDecodingFormat.listReadableMetadata();
+ if (formatMetadata.size() > 0) {
+ final List<String> requestedFormatMetadataKeys =
+ innerFormatMetadataKeys.stream()
+ .map(k -> k.substring(innerFormatMetaPrefix.length()))
+ .collect(Collectors.toList());
+ innerDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ }
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return innerDecodingFormat.getChangelogMode();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
+ enum ReadableMetadata {
+ CREATE_TIME(
+ "create-time",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(InLongMsgHead head) {
+ return TimestampData.fromTimestamp(head.getTime());
+ }
+ }),
+
+ STREAM_ID(
+ "stream-id",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(InLongMsgHead head) {
+ return StringData.fromString(head.getTid());
+ }
+ });
+
+ final String key;
+
+ final DataType dataType;
+
+ final MetadataConverter converter;
+
+ ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+ }
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
new file mode 100644
index 000000000..3001bd8ec
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import com.google.common.base.Objects;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.common.msg.InLongMsg;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * {@link DeserializationSchema} that parses an {@link InLongMsg}, decodes every body with an inner
+ * {@link DeserializationSchema} and appends the requested metadata columns extracted from the head of
+ * the attribute group the body belongs to.
+ */
+public class InLongMsgDeserializationSchema implements DeserializationSchema<RowData> {
+
+    /** Inner {@link DeserializationSchema} to deserialize {@link InLongMsg} inner packaged
+     * data buffer message */
+    private final DeserializationSchema<RowData> deserializationSchema;
+
+    /** {@link MetadataConverter} of how to produce metadata from {@link InLongMsg}. */
+    private final MetadataConverter[] metadataConverters;
+
+    /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    /** Whether attribute groups or bodies that cannot be parsed are skipped instead of failing. */
+    private final boolean ignoreErrors;
+
+    public InLongMsgDeserializationSchema(
+            DeserializationSchema<RowData> schema,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean ignoreErrors) {
+        this.deserializationSchema = schema;
+        this.metadataConverters = metadataConverters;
+        this.producedTypeInfo = producedTypeInfo;
+        this.ignoreErrors = ignoreErrors;
+    }
+
+    /** Forwards initialization to the inner schema, so inner schemas that rely on open() work. */
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        deserializationSchema.open(context);
+    }
+
+    @Override
+    public RowData deserialize(byte[] bytes) throws IOException {
+        throw new RuntimeException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+    }
+
+    /**
+     * Parses the message and emits one row per row produced by the inner schema for each non-empty body.
+     *
+     * @throws IOException if a head or body cannot be parsed and errors are not ignored
+     */
+    @Override
+    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
+        InLongMsg inLongMsg = InLongMsg.parseFrom(message);
+
+        for (String attr : inLongMsg.getAttrs()) {
+            InLongMsgHead head;
+            try {
+                head = InLongMsgUtils.parseHead(attr);
+            } catch (Exception e) {
+                if (ignoreErrors) {
+                    continue;
+                }
+                // Report the attribute string rather than decoding the packed message bytes as text.
+                throw new IOException("Failed to parse InLongMsg head '" + attr + "'.", e);
+            }
+
+            Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
+            if (iterator == null) {
+                continue;
+            }
+
+            while (iterator.hasNext()) {
+                byte[] bodyBytes = iterator.next();
+                if (bodyBytes == null || bodyBytes.length == 0) {
+                    continue;
+                }
+
+                List<RowData> rows = new ArrayList<>();
+                try {
+                    deserializationSchema.deserialize(bodyBytes, new ListCollector<>(rows));
+                } catch (Exception e) {
+                    // Honor ignore-parse-errors for bodies too; otherwise rethrow unchanged.
+                    if (ignoreErrors) {
+                        continue;
+                    }
+                    throw e;
+                }
+                for (RowData row : rows) {
+                    emitRow(head, (GenericRowData) row, out);
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData rowData) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof InLongMsgDeserializationSchema)) {
+            return false;
+        }
+        InLongMsgDeserializationSchema that = (InLongMsgDeserializationSchema) o;
+        return ignoreErrors == that.ignoreErrors
+                && Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
+                        Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
+                && Objects.equal(deserializationSchema, that.deserializationSchema)
+                && Objects.equal(producedTypeInfo, that.producedTypeInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        // equals() compares the converters element-wise, so hash their contents as a list rather than
+        // the array reference (which would yield an identity hash and break the equals/hashCode contract).
+        return Objects.hashCode(
+                deserializationSchema, Arrays.asList(metadataConverters), producedTypeInfo, ignoreErrors);
+    }
+
+    /** Extracts the value of one metadata column from the head of an attribute group. */
+    interface MetadataConverter extends Serializable {
+        Object read(InLongMsgHead head);
+    }
+
+    /** Emits the physical row, followed by one column per metadata converter when any are configured. */
+    private void emitRow(InLongMsgHead head, GenericRowData physicalRow, Collector<RowData> out) {
+        if (metadataConverters.length == 0) {
+            out.collect(physicalRow);
+            return;
+        }
+        final int physicalArity = physicalRow.getArity();
+        final int metadataArity = metadataConverters.length;
+        final GenericRowData producedRow =
+                new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+        }
+        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+            producedRow.setField(
+                    physicalArity + metadataPos, metadataConverters[metadataPos].read(head));
+        }
+        out.collect(producedRow);
+    }
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
new file mode 100644
index 000000000..bdc3d3059
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.INNER_FORMAT;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.validateDecodingFormatOptions;
+
+/**
+ * Format factory for the {@code inlong} format.
+ *
+ * <p>The format wraps an inner format, selected by {@code inner.format}, which decodes the message
+ * bodies. Options of the inner format are read from the table options prefixed with
+ * {@code inlong.<inner-format>.}. Only deserialization is supported.
+ */
+public final class InLongMsgFormatFactory
+        implements DeserializationFormatFactory, SerializationFormatFactory {
+
+    public static final String IDENTIFIER = "inlong";
+
+    public static final String INLONG_PREFIX = IDENTIFIER + ".";
+
+    @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(Context context,
+            ReadableConfig formatOptions) {
+        validateDecodingFormatOptions(formatOptions);
+
+        final String innerFormat = formatOptions.get(INNER_FORMAT);
+        final DeserializationFormatFactory innerFactory = FactoryUtil.discoverFactory(
+                context.getClassLoader(),
+                DeserializationFormatFactory.class,
+                innerFormat);
+
+        // Inner metadata keys are exposed as "<inner-format>.<key>"; inner options are looked up under
+        // "inlong.<inner-format>.".
+        // NOTE(review): the option prefix assumes 'format' = 'inlong'; a connector-specific prefix such
+        // as 'value.format' would not be taken into account. Confirm before using it that way.
+        final String innerFormatMetaPrefix = innerFormat + ".";
+        final String innerFormatPrefix = INLONG_PREFIX + innerFormatMetaPrefix;
+        final Configuration allOptions = Configuration.fromMap(context.getCatalogTable().getOptions());
+        final DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat =
+                innerFactory.createDecodingFormat(context, new DelegatingConfiguration(allOptions, innerFormatPrefix));
+        final boolean ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+
+        return new InLongMsgDecodingFormat(innerDecodingFormat, innerFormatMetaPrefix, ignoreErrors);
+    }
+
+    /**
+     * Serialization is not supported by this format.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(Context context,
+            ReadableConfig formatOptions) {
+        throw new UnsupportedOperationException("Do not support inlong format serialize.");
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(INNER_FORMAT);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(IGNORE_PARSE_ERRORS);
+        return options;
+    }
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
new file mode 100644
index 000000000..61320fbe3
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+/** Configuration options of the {@code inlong} format. */
+public final class InLongMsgOptions {
+    private InLongMsgOptions() {
+    }
+
+    public static final ConfigOption<String> INNER_FORMAT =
+            ConfigOptions.key("inner.format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Defines the format identifier for encoding attr data. \n"
+                            + "The identifier is used to discover a suitable format factory.");
+
+    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
+            ConfigOptions.key("ignore-parse-errors")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                            + "fields are set to null in case of errors");
+
+    /**
+     * Validates the options of the decoding format.
+     *
+     * @param config the format options
+     * @throws ValidationException if {@code inner.format} is missing or blank
+     */
+    public static void validateDecodingFormatOptions(ReadableConfig config) {
+        String innerFormat = config.get(INNER_FORMAT);
+        // A blank identifier would otherwise only fail later, during factory discovery.
+        if (innerFormat == null || innerFormat.trim().isEmpty()) {
+            throw new ValidationException(
+                    INNER_FORMAT.key() + " shouldn't be null or empty.");
+        }
+    }
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index e3569a4e9..4236d5dde 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -108,6 +108,39 @@ public class InLongMsgUtils {
}
}
+    /**
+     * Builds an {@link InLongMsgHead} from the raw attribute string of an InLongMsg.
+     *
+     * @param attr the attribute string, split into key/value pairs by {@code parseAttr}
+     * @return the head carrying the attributes, stream id, time and predefined fields
+     * @throws IllegalArgumentException if the stream id or both time attributes are missing
+     */
+    public static InLongMsgHead parseHead(String attr) {
+        final Map<String, String> attrMap = parseAttr(attr);
+
+        // The stream id attribute is mandatory.
+        if (!attrMap.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
+            throw new IllegalArgumentException("Could not find " + INLONGMSG_ATTR_STREAM_ID + " in attributes!");
+        }
+        final String streamId = attrMap.get(INLONGMSG_ATTR_STREAM_ID);
+
+        // The time is taken from INLONGMSG_ATTR_TIME_T (via parseDateTime) when present,
+        // otherwise from INLONGMSG_ATTR_TIME_DT (via parseEpochTime).
+        final Timestamp time;
+        if (attrMap.containsKey(INLONGMSG_ATTR_TIME_T)) {
+            time = parseDateTime(attrMap.get(INLONGMSG_ATTR_TIME_T).trim());
+        } else if (attrMap.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+            time = parseEpochTime(attrMap.get(INLONGMSG_ATTR_TIME_DT).trim());
+        } else {
+            throw new IllegalArgumentException(
+                    "Could not find " + INLONGMSG_ATTR_TIME_T
+                            + " or " + INLONGMSG_ATTR_TIME_DT + " in attributes!");
+        }
+
+        return new InLongMsgHead(attrMap, streamId, time, getPredefinedFields(attrMap));
+    }
+
public static Map<String, String> parseAttr(String attr) {
return StringUtils.splitKv(
attr,
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..d2c119926
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.inlongmsg.InLongMsgFormatFactory
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
new file mode 100644
index 000000000..e48cf774d
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema;
+import org.apache.flink.formats.csv.CsvRowSchemaConverter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;
+import org.junit.Test;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_TYPE;
+import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link InLongMsgFormatFactory}. */
+public class InLongMsgFormatFactoryTest {
+
+    @Test
+    public void testUserDefinedOptions()
+            throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
+        final Map<String, String> tableOptions =
+                getModifiedOptions(opts -> {
+                    opts.put("inlong.inner.format", "csv");
+                    opts.put("inlong.ignore-parse-errors", "true");
+                });
+
+        // Build the expected inner CSV schema reflectively; setAccessible is used because the
+        // constructor is presumably not public.
+        final Constructor<CsvRowDataDeserializationSchema> constructor =
+                CsvRowDataDeserializationSchema.class.getDeclaredConstructor(
+                        RowType.class, TypeInformation.class, CsvSchema.class, boolean.class);
+        constructor.setAccessible(true);
+        final DeserializationSchema<RowData> deserializationSchema = constructor.newInstance(
+                PHYSICAL_TYPE, InternalTypeInfo.of(PHYSICAL_TYPE),
+                CsvRowSchemaConverter.convert(PHYSICAL_TYPE), false);
+
+        // No metadata columns are requested, so the expected wrapper has no converters.
+        final InLongMsgDeserializationSchema expectedDeser = new InLongMsgDeserializationSchema(
+                deserializationSchema, new MetadataConverter[0], InternalTypeInfo.of(PHYSICAL_TYPE), true);
+        final DeserializationSchema<RowData> actualDeser = createDeserializationSchema(tableOptions, SCHEMA);
+        assertEquals(expectedDeser, actualDeser);
+    }
+
+    // ------------------------------------------------------------------------
+    // Public Tools
+    // ------------------------------------------------------------------------
+
+    /** Creates the value deserialization schema of a mocked table source built from {@code options}. */
+    public static DeserializationSchema<RowData> createDeserializationSchema(
+            Map<String, String> options, ResolvedSchema schema) {
+        DynamicTableSource source = createTableSource(schema, options);
+
+        assert source instanceof TestDynamicTableFactory.DynamicTableSourceMock;
+        TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+                (TestDynamicTableFactory.DynamicTableSourceMock) source;
+
+        return scanSourceMock.valueFormat.createRuntimeDecoder(
+                ScanRuntimeProviderContext.INSTANCE, schema.toPhysicalRowDataType());
+    }
+
+    /**
+     * Creates the value serialization schema of a mocked table sink built from {@code options}.
+     * Note that the inlong format itself throws when asked for an encoding format.
+     */
+    public static SerializationSchema<RowData> createSerializationSchema(
+            Map<String, String> options, ResolvedSchema schema) {
+        DynamicTableSink sink = createTableSink(schema, options);
+
+        assert sink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
+        TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                (TestDynamicTableFactory.DynamicTableSinkMock) sink;
+
+        return sinkMock.valueFormat.createRuntimeEncoder(
+                new SinkRuntimeProviderContext(false), schema.toPhysicalRowDataType());
+    }
+
+    /**
+     * Returns the full options modified by the given consumer {@code optionModifier}.
+     *
+     * @param optionModifier Consumer to modify the options
+     */
+    public static Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {
+        Map<String, String> options = getAllOptions();
+        optionModifier.accept(options);
+        return options;
+    }
+
+    /** Base options of the mocked test connector using the inlong format. */
+    private static Map<String, String> getAllOptions() {
+        final Map<String, String> options = new HashMap<>();
+        options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+        options.put("target", "MyTarget");
+        options.put("buffer-size", "1000");
+        options.put("format", "inlong");
+        return options;
+    }
+
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
new file mode 100644
index 000000000..2d4c89737
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsg;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.factories.utils.FactoryMocks;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.inlong.common.msg.InLongMsg;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests decoding {@link InLongMsg} payloads into {@link RowData} with the {@code inlong} format. */
+public class InLongMsgRowDataSerDeTest {
+
+    @Test
+    public void testDeserializeInLongMsg() throws Exception {
+        // mock data: two messages with two records each, decoded back to back
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg();
+        inLongMsg1.addMsg("streamId=HAHA&t=202201011112",
+                "1,asdqw".getBytes(StandardCharsets.UTF_8));
+        inLongMsg1.addMsg("streamId=xixi&t=202201011112",
+                "2,testData".getBytes(StandardCharsets.UTF_8));
+        InLongMsg inLongMsg2 = InLongMsg.newInLongMsg();
+        inLongMsg2.addMsg("streamId=oooo&t=202201011112",
+                "3,dwqdqw".getBytes(StandardCharsets.UTF_8));
+        inLongMsg2.addMsg("streamId=bubub&t=202201011112",
+                "4,asdqdqwe".getBytes(StandardCharsets.UTF_8));
+        List<byte[]> input = toBytes(inLongMsg1, inLongMsg2);
+        final List<RowData> expectedOutput = Stream.of(
+                GenericRowData.of(1L, BinaryStringData.fromString("asdqw")),
+                GenericRowData.of(2L, BinaryStringData.fromString("testData")),
+                GenericRowData.of(3L, BinaryStringData.fromString("dwqdqw")),
+                GenericRowData.of(4L, BinaryStringData.fromString("asdqdqwe"))
+        ).collect(Collectors.toList());
+
+        // deserialize
+        final Map<String, String> tableOptions =
+                InLongMsgFormatFactoryTest.getModifiedOptions(opts -> {
+                    opts.put("inlong.inner.format", "csv");
+                });
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("name", DataTypes.STRING())
+        );
+        DeserializationSchema<RowData> inLongMsgDeserializationSchema =
+                InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);
+
+        assertEquals(expectedOutput, deserializeAll(inLongMsgDeserializationSchema, input));
+    }
+
+    @Test
+    public void testDeserializeInLongMsgWithError() throws Exception {
+        // mock data: "asdq" carries no streamId/time attribute, so its head should not parse
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg();
+        inLongMsg1.addMsg("asdq",
+                "1, asd".getBytes(StandardCharsets.UTF_8));
+        List<byte[]> input = toBytes(inLongMsg1);
+
+        // deserialize with parse errors ignored: the bad record is dropped instead of failing
+        final Map<String, String> tableOptions =
+                InLongMsgFormatFactoryTest.getModifiedOptions(opts -> {
+                    opts.put("inlong.inner.format", "csv");
+                    opts.put("inlong.ignore-parse-errors", "true");
+                });
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("name", DataTypes.STRING())
+        );
+        DeserializationSchema<RowData> inLongMsgDeserializationSchema =
+                InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);
+
+        assertEquals(Collections.emptyList(), deserializeAll(inLongMsgDeserializationSchema, input));
+    }
+
+    @Test
+    public void testDeserializeInLongMsgWithMetadata() throws Exception {
+        // mock data: heads carry an epoch-millis "dt" time instead of a formatted "t" time
+        InLongMsg inLongMsg1 = InLongMsg.newInLongMsg();
+        inLongMsg1.addMsg("streamId=HAHA&dt=1652153467000",
+                "1,asdqw".getBytes(StandardCharsets.UTF_8));
+        inLongMsg1.addMsg("streamId=xixi&dt=1652153467000",
+                "2,testData".getBytes(StandardCharsets.UTF_8));
+        InLongMsg inLongMsg2 = InLongMsg.newInLongMsg();
+        inLongMsg2.addMsg("streamId=oooo&dt=1652153468000",
+                "3,dwqdqw".getBytes(StandardCharsets.UTF_8));
+        inLongMsg2.addMsg("streamId=bubub&dt=1652153469000",
+                "4,asdqdqwe".getBytes(StandardCharsets.UTF_8));
+        List<byte[]> input = toBytes(inLongMsg1, inLongMsg2);
+        // expected rows: physical columns followed by the "create-time" metadata value
+        final List<RowData> expectedOutput = Stream.of(
+                GenericRowData.of(1L, BinaryStringData.fromString("asdqw"),
+                        TimestampData.fromTimestamp(new Timestamp(1652153467000L))),
+                GenericRowData.of(2L, BinaryStringData.fromString("testData"),
+                        TimestampData.fromTimestamp(new Timestamp(1652153467000L))),
+                GenericRowData.of(3L, BinaryStringData.fromString("dwqdqw"),
+                        TimestampData.fromTimestamp(new Timestamp(1652153468000L))),
+                GenericRowData.of(4L, BinaryStringData.fromString("asdqdqwe"),
+                        TimestampData.fromTimestamp(new Timestamp(1652153469000L)))
+        ).collect(Collectors.toList());
+
+        // deserialize
+        final Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("inner.format", "csv");
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("name", DataTypes.STRING()),
+                Column.metadata("time", DataTypes.TIMESTAMP(3), "create-time", false)
+        );
+
+        // apply metadata
+        InLongMsgFormatFactory factory = new InLongMsgFormatFactory();
+        DecodingFormat<DeserializationSchema<RowData>> decodingFormat = factory.createDecodingFormat(FactoryMocks
+                .createTableContext(schema, tableOptions), Configuration.fromMap(tableOptions));
+        decodingFormat.applyReadableMetadata(Stream.of("create-time").collect(Collectors.toList()));
+        DeserializationSchema<RowData> inLongMsgDeserializationSchema = decodingFormat
+                .createRuntimeDecoder(ScanRuntimeProviderContext.INSTANCE, schema.toPhysicalRowDataType());
+
+        assertEquals(expectedOutput, deserializeAll(inLongMsgDeserializationSchema, input));
+    }
+
+    /** Encodes each message with {@link InLongMsg#buildArray()}, keeping the given order. */
+    private static List<byte[]> toBytes(InLongMsg... messages) {
+        return Stream.of(messages).map(InLongMsg::buildArray).collect(Collectors.toList());
+    }
+
+    /**
+     * Feeds every payload to {@code deserializer} in order and collects all emitted rows.
+     *
+     * @param deserializer the runtime decoder under test
+     * @param input serialized InLongMsg payloads
+     * @return the rows emitted across all payloads, in emission order
+     */
+    private static List<RowData> deserializeAll(
+            DeserializationSchema<RowData> deserializer, List<byte[]> input) throws Exception {
+        List<RowData> rows = new ArrayList<>();
+        ListCollector<RowData> out = new ListCollector<>(rows);
+        for (byte[] bytes : input) {
+            deserializer.deserialize(bytes, out);
+        }
+        return rows;
+    }
+}
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..dd4af0fb6
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.formats.csv.CsvFormatFactory
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/log4j-test.properties b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/log4j-test.properties
new file mode 100644
index 000000000..bf052fa68
--- /dev/null
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/resources/log4j-test.properties
@@ -0,0 +1,25 @@
+#
+# /*
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+#
+log4j.rootLogger=OFF, A1
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index ca2bab06a..9256b018b 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -142,7 +142,7 @@ public final class InLongMsgCsvFormatDeserializer extends AbstractInLongMsgForma
@Override
protected InLongMsgHead parseHead(String attr) {
- return InLongMsgCsvUtils.parseHead(attr);
+ return InLongMsgUtils.parseHead(attr);
}
@Override
diff --git a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
index 3d2042264..45a4be324 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
@@ -100,7 +100,7 @@ public final class InLongMsgCsvMixedFormatDeserializer extends AbstractInLongMsg
@Override
protected InLongMsgHead parseHead(String attr) {
- return InLongMsgCsvUtils.parseHead(attr);
+ return InLongMsgUtils.parseHead(attr);
}
@Override
diff --git a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index 84c32bb9a..befc1c702 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -18,14 +18,6 @@
package org.apache.inlong.sort.formats.inlongmsgcsv;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
-import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
-
import java.nio.charset.Charset;
import java.sql.Timestamp;
import java.util.Arrays;
@@ -37,7 +29,6 @@ import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
import org.apache.inlong.sort.formats.util.StringUtils;
import org.slf4j.Logger;
@@ -53,39 +44,6 @@ public class InLongMsgCsvUtils {
public static final String FORMAT_DELETE_HEAD_DELIMITER = "format.delete-head-delimiter";
public static final boolean DEFAULT_DELETE_HEAD_DELIMITER = true;
- public static InLongMsgHead parseHead(String attr) {
- Map<String, String> attributes = parseAttr(attr);
-
- // Extracts interface from the attributes.
- String streamId;
-
- if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
- streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
- } else {
- throw new IllegalArgumentException("Could not find " + INLONGMSG_ATTR_STREAM_ID + " in attributes!");
- }
-
- // Extracts time from the attributes
- Timestamp time;
-
- if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
- String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
- time = parseDateTime(date);
- } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
- String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
- time = parseEpochTime(epoch);
- } else {
- throw new IllegalArgumentException(
- "Could not find " + INLONGMSG_ATTR_TIME_T
- + " or " + INLONGMSG_ATTR_TIME_DT + " in attributes!");
- }
-
- // Extracts predefined fields from the attributes
- List<String> predefinedFields = getPredefinedFields(attributes);
-
- return new InLongMsgHead(attributes, streamId, time, predefinedFields);
- }
-
public static InLongMsgBody parseBody(
byte[] bytes,
String charset,
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 1318a6990..d9a7a4af7 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -64,16 +64,6 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>sort-format-base</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>sort-format-json</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>audit-sdk</artifactId>
@@ -122,12 +112,8 @@
<version>1.4.01</version>
</dependency>
- <!--flink-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
- </dependency>
+ <!-- flink format -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
@@ -143,6 +129,30 @@
<artifactId>flink-avro</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-inlongmsg-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- flink connector -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
@@ -158,7 +168,7 @@
</exclusions>
</dependency>
-
+ <!-- flink core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>