You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this text version.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/07 06:14:31 UTC
[incubator-inlong] branch master updated: [INLONG-2953][Sort] Support orderly output (#2954)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fd23cef [INLONG-2953][Sort] Support orderly output (#2954)
fd23cef is described below
commit fd23ceff554d09f6aa5fbac6792bebd40c794ad1
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Mon Mar 7 14:14:25 2022 +0800
[INLONG-2953][Sort] Support orderly output (#2954)
Co-authored-by: tianqiwan <ti...@tencent.com>
---
.../inlong/sort/configuration/Constants.java | 5 +++
.../inlong/sort/singletenant/flink/Entrance.java | 37 ++++++++++++++++++----
2 files changed, 35 insertions(+), 7 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index cdc416c..c88b16d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -331,6 +331,11 @@ public class Constants {
.noDefaultValue()
.withDescription("The file which contains dataflow info for a single tenant job");
+ public static final ConfigOption<Boolean> JOB_ORDERLY_OUTPUT =
+ key("job.orderly.output")
+ .defaultValue(false)
+ .withDescription("Whether to ensure orderly output or not");
+
// ------------------------------------------------------------------------
// File format and compression related
// ------------------------------------------------------------------------
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index 85b0820..9f680a8 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -109,16 +109,24 @@ public class Entrance {
Map<String, Object> properties) {
final String sourceType = checkNotNull(config.getString(Constants.SOURCE_TYPE));
final int sourceParallelism = config.getInteger(Constants.SOURCE_PARALLELISM);
+ final boolean orderlyOutput = config.getBoolean(Constants.JOB_ORDERLY_OUTPUT);
if (sourceType.equals(Constants.SOURCE_TYPE_PULSAR)) {
checkState(sourceInfo instanceof PulsarSourceInfo);
PulsarSourceInfo pulsarSourceInfo = (PulsarSourceInfo) sourceInfo;
- return env.addSource(buildPulsarSource(pulsarSourceInfo, config, properties))
- .uid(Constants.SOURCE_UID)
- .name("Pulsar source")
- .setParallelism(sourceParallelism)
- .rebalance();
+ DataStream<SerializedRecord> sourceStream =
+ env.addSource(buildPulsarSource(pulsarSourceInfo, config, properties))
+ .uid(Constants.SOURCE_UID)
+ .name("Pulsar source")
+ .setParallelism(sourceParallelism);
+
+ if (orderlyOutput) {
+ return sourceStream.forward();
+ } else {
+ return sourceStream.rebalance();
+ }
+
} else {
throw new IllegalArgumentException("Unsupported source type " + sourceType);
}
@@ -129,6 +137,7 @@ public class Entrance {
SourceInfo sourceInfo,
Configuration config
) throws IOException, ClassNotFoundException {
+ final boolean orderlyOutput = config.getBoolean(Constants.JOB_ORDERLY_OUTPUT);
FieldInfo[] sourceFields = sourceInfo.getFields();
DeserializationSchema<Row> schema = DeserializationSchemaFactory.build(
sourceFields, sourceInfo.getDeserializationInfo());
@@ -138,22 +147,30 @@ public class Entrance {
schema,
fieldMappingTransformer,
!(sourceInfo.getDeserializationInfo() instanceof DebeziumDeserializationInfo));
- return sourceStream.process(function)
+
+ DataStream<Row> deserializedStream = sourceStream.process(function)
.uid(Constants.DESERIALIZATION_SCHEMA_UID)
.name("Deserialization")
.setParallelism(config.getInteger(Constants.DESERIALIZATION_PARALLELISM));
+
+ if (orderlyOutput) {
+ return deserializedStream.forward();
+ }
+
+ return deserializedStream;
}
private static DataStream<Row> buildTransformationStream(
DataStream<Row> deserializationStream,
DataFlowInfo dataFlowInfo,
Configuration config) {
+ final boolean orderlyOutput = config.getBoolean(Constants.JOB_ORDERLY_OUTPUT);
TransformationInfo transformationInfo = dataFlowInfo.getTransformationInfo();
if (transformationInfo == null) {
return deserializationStream;
}
- return deserializationStream
+ DataStream<Row> transformationStream = deserializationStream
.process(new Transformer(
transformationInfo,
dataFlowInfo.getSourceInfo().getFields(),
@@ -161,6 +178,12 @@ public class Entrance {
.uid(Constants.TRANSFORMATION_UID)
.name("Transformation")
.setParallelism(config.getInteger(Constants.TRANSFORMATION_PARALLELISM));
+
+ if (orderlyOutput) {
+ return transformationStream.forward();
+ }
+
+ return transformationStream;
}
private static void buildSinkStream(