Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/27 09:37:52 UTC
[inlong] branch master updated: [INLONG-6296][Sort] Split one record to multiple records when the physical data has more records (#6298)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8fddb1d56 [INLONG-6296][Sort] Split one record to multiple records when the physical data has more records (#6298)
8fddb1d56 is described below
commit 8fddb1d56d6d29cb59920b4d4657770cdcec8d85
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Thu Oct 27 17:37:47 2022 +0800
[INLONG-6296][Sort] Split one record to multiple records when the physical data has more records (#6298)
---
.../sort/base/format/JsonDynamicSchemaFormat.java | 2 +-
.../kafka/DynamicKafkaSerializationSchema.java | 124 ++++++++++++++++++++-
.../inlong/sort/kafka/FlinkKafkaProducer.java | 25 +++--
3 files changed, 136 insertions(+), 15 deletions(-)
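For context before the diff: in multiple-sink mode the raw payload is a canal-json style document whose physical data node may carry several rows, and this commit splits such a payload into one Kafka record per row. Below is a minimal, self-contained sketch of that idea, assuming vanilla Jackson and a canal-json style "data" field; the connector itself uses Flink's shaded Jackson and resolves the node via JsonDynamicSchemaFormat, and the class and helper names here are illustrative only:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import java.util.ArrayList;
    import java.util.List;

    public class SplitSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Hypothetical helper: split a multi-row payload into one payload per row.
        static List<byte[]> split(byte[] raw) throws Exception {
            JsonNode root = MAPPER.readTree(raw);
            JsonNode data = root.get("data");
            List<byte[]> out = new ArrayList<>();
            if (data == null || !data.isArray() || data.size() <= 1) {
                out.add(raw); // nothing to split, pass through unchanged
                return out;
            }
            for (JsonNode row : data) {
                ObjectNode copy = ((ObjectNode) root).deepCopy();
                ArrayNode single = MAPPER.createArrayNode();
                single.add(row);
                copy.set("data", single); // exactly one physical row per record
                out.add(MAPPER.writeValueAsBytes(copy));
            }
            return out;
        }

        public static void main(String[] args) throws Exception {
            byte[] raw = "{\"type\":\"INSERT\",\"data\":[{\"id\":1},{\"id\":2}]}".getBytes();
            for (byte[] b : split(raw)) {
                // Prints two records, each with a one-element "data" array.
                System.out.println(new String(b));
            }
        }
    }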
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index 2c2da2f52..f49f01c0b 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -94,8 +94,8 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
.put(java.sql.Types.BOOLEAN, new BooleanType())
.put(java.sql.Types.OTHER, new VarCharType())
.build();
+ public final ObjectMapper objectMapper = new ObjectMapper();
protected final JsonToRowDataConverters rowDataConverters;
- private final ObjectMapper objectMapper = new ObjectMapper();
protected JsonDynamicSchemaFormat() {
this.rowDataConverters =
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
index 4a0d5dc8b..68f965f6b 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.kafka;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -28,6 +29,7 @@ import org.apache.flink.table.formats.raw.RawFormatSerializationSchema;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.apache.inlong.sort.kafka.KafkaDynamicSink.WritableMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
@@ -36,6 +38,13 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
/**
* A specific {@link KafkaSerializationSchema} for {@link KafkaDynamicSink}.
@@ -71,6 +80,8 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
*/
private final int[] metadataPositions;
private final String sinkMultipleFormat;
+ private boolean multipleSink;
+ private JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
private int[] partitions;
private int parallelInstanceId;
@@ -126,6 +137,13 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
if (partitioner != null) {
partitioner.open(parallelInstanceId, numParallelInstances);
}
+ // Only support dynamic topic when the topicPattern is specified
+ // and the valueSerialization is RawFormatSerializationSchema
+ if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+ multipleSink = true;
+ jsonDynamicSchemaFormat =
+ (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+ }
}
@Override
@@ -162,7 +180,6 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
} else {
valueSerialized = valueSerialization.serialize(valueRow);
}
-
return new ProducerRecord<>(
getTargetTopic(consumedRow),
extractPartition(consumedRow, keySerialized, valueSerialized),
@@ -172,6 +189,104 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.HEADERS));
}
+ /**
+ * Serialize to a list of records. It is used in multiple-sink scenarios when one record contains multiple physical records.
+ *
+ * @param consumedRow The consumed row
+ * @param timestamp The timestamp
+ * @return List of ProducerRecord
+ */
+ public List<ProducerRecord<byte[], byte[]>> serializeForList(RowData consumedRow, @Nullable Long timestamp) {
+ if (!multipleSink) {
+ return Collections.singletonList(serialize(consumedRow, timestamp));
+ }
+ List<ProducerRecord<byte[], byte[]>> values = new ArrayList<>();
+ try {
+ JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(consumedRow.getBinary(0));
+ boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
+ if (isDDL) {
+ values.add(new ProducerRecord<>(
+ jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+ extractPartition(consumedRow, null, consumedRow.getBinary(0)),
+ null,
+ consumedRow.getBinary(0)));
+ return values;
+ }
+ JsonNode updateBeforeNode = jsonDynamicSchemaFormat.getUpdateBefore(rootNode);
+ JsonNode updateAfterNode = jsonDynamicSchemaFormat.getUpdateAfter(rootNode);
+ if (!splitRequired(updateBeforeNode, updateAfterNode)) {
+ values.add(new ProducerRecord<>(
+ jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+ extractPartition(consumedRow, null, consumedRow.getBinary(0)),
+ null, consumedRow.getBinary(0)));
+ } else {
+ split2JsonArray(rootNode, updateBeforeNode, updateAfterNode, values);
+ }
+ } catch (IOException e) {
+ LOG.warn("deserialize error", e);
+ values.add(new ProducerRecord<>(topic, null, null, consumedRow.getBinary(0)));
+ }
+ return values;
+ }
+
+ private boolean splitRequired(JsonNode updateBeforeNode, JsonNode updateAfterNode) {
+ return (updateAfterNode != null && updateAfterNode.isArray() && updateAfterNode.size() > 1)
+ || (updateBeforeNode != null && updateBeforeNode.isArray() && updateBeforeNode.size() > 1);
+ }
+
+ private void split2JsonArray(JsonNode rootNode,
+ JsonNode updateBeforeNode, JsonNode updateAfterNode, List<ProducerRecord<byte[], byte[]>> values) {
+ Iterator<Entry<String, JsonNode>> iterator = rootNode.fields();
+ Map<String, Object> baseMap = new LinkedHashMap<>();
+ String updateBeforeKey = null;
+ String updateAfterKey = null;
+ while (iterator.hasNext()) {
+ Entry<String, JsonNode> kv = iterator.next();
+ if (kv.getValue() == null || (!kv.getValue().equals(updateBeforeNode) && !kv.getValue()
+ .equals(updateAfterNode))) {
+ baseMap.put(kv.getKey(), kv.getValue());
+ continue;
+ }
+ if (kv.getValue().equals(updateAfterNode)) {
+ updateAfterKey = kv.getKey();
+ } else if (kv.getValue().equals(updateBeforeNode)) {
+ updateBeforeKey = kv.getKey();
+ }
+ }
+ if (updateAfterNode != null) {
+ for (int i = 0; i < updateAfterNode.size(); i++) {
+ baseMap.put(updateAfterKey, Collections.singletonList(updateAfterNode.get(i)));
+ if (updateBeforeNode != null && updateBeforeNode.size() > i) {
+ baseMap.put(updateBeforeKey, Collections.singletonList(updateBeforeNode.get(i)));
+ } else if (updateBeforeKey != null) {
+ baseMap.remove(updateBeforeKey);
+ }
+ try {
+ byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
+ values.add(new ProducerRecord<>(
+ jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+ extractPartition(null, null, data), null, data));
+ } catch (Exception e) {
+ throw new RuntimeException("serialize for list error", e);
+ }
+ }
+ } else {
+ // In general, execution will not reach this branch
+ for (int i = 0; i < updateBeforeNode.size(); i++) {
+ baseMap.put(updateBeforeKey, Collections.singletonList(updateBeforeNode.get(i)));
+ try {
+ byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
+ values.add(new ProducerRecord<>(
+ jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+ extractPartition(null, null, data), null, data));
+ } catch (Exception e) {
+ throw new RuntimeException("serialize for list error", e);
+ }
+ }
+ }
+ }
+
@Override
public void setParallelInstanceId(int parallelInstanceId) {
this.parallelInstanceId = parallelInstanceId;
@@ -189,14 +304,11 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
@Override
public String getTargetTopic(RowData element) {
- // Only support dynamic topic when the topicPattern is specified
- // and the valueSerialization is RawFormatSerializationSchema
- if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+ if (multipleSink) {
try {
// Extract the index '0' as the raw data is determined by the Raw format:
// The Raw format allows reading and writing raw (byte-based) values as a single column
- return DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat)
- .parse(element.getBinary(0), topicPattern);
+ return jsonDynamicSchemaFormat.parse(element.getBinary(0), topicPattern);
} catch (IOException e) {
// Ignore the parse error; the default topic will be returned in the end.
LOG.warn("parse dynamic topic error", e);
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index d3ca629cb..f59ff93da 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -54,6 +54,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetric
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;
@@ -94,7 +95,6 @@ import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
@@ -934,7 +934,8 @@ public class FlinkKafkaProducer<IN>
throws FlinkKafkaException {
checkErroneous();
- ProducerRecord<byte[], byte[]> record;
+ ProducerRecord<byte[], byte[]> record = null;
+ List<ProducerRecord<byte[], byte[]>> records = null;
if (keyedSchema != null) {
byte[] serializedKey = keyedSchema.serializeKey(next);
byte[] serializedValue = keyedSchema.serializeValue(next);
@@ -967,9 +968,7 @@ public class FlinkKafkaProducer<IN>
serializedKey,
serializedValue);
} else {
- record =
- new ProducerRecord<>(
- targetTopic, null, timestamp, serializedKey, serializedValue);
+ record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
}
} else if (kafkaSchema != null) {
if (kafkaSchema instanceof KafkaContextAware) {
@@ -986,18 +985,28 @@ public class FlinkKafkaProducer<IN>
partitions = getPartitionsByTopic(targetTopic, transaction.producer);
topicPartitionsMap.put(targetTopic, partitions);
}
-
contextAwareSchema.setPartitions(partitions);
}
- record = kafkaSchema.serialize(next, context.timestamp());
+ if (kafkaSchema instanceof DynamicKafkaSerializationSchema) {
+ records = ((DynamicKafkaSerializationSchema) kafkaSchema)
+ .serializeForList((RowData) next, context.timestamp());
+ } else {
+ record = kafkaSchema.serialize(next, context.timestamp());
+ }
} else {
throw new RuntimeException(
"We have neither KafkaSerializationSchema nor KeyedSerializationSchema, this"
+ "is a bug.");
}
+ if (record != null) {
+ send(record, transaction);
+ } else if (records != null) {
+ records.forEach(r -> send(r, transaction));
+ }
+ }
+ private void send(ProducerRecord<byte[], byte[]> record, FlinkKafkaProducer.KafkaTransactionState transaction) {
sendOutMetrics(1L, (long) record.value().length);
-
pendingRecords.incrementAndGet();
transaction.producer.send(record, callback);
}