You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/07 06:14:31 UTC

[incubator-inlong] branch master updated: [INLONG-2953][Sort] Support orderly output (#2954)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fd23cef  [INLONG-2953][Sort] Support orderly output (#2954)
fd23cef is described below

commit fd23ceff554d09f6aa5fbac6792bebd40c794ad1
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Mon Mar 7 14:14:25 2022 +0800

    [INLONG-2953][Sort] Support orderly output (#2954)
    
    Co-authored-by: tianqiwan <ti...@tencent.com>
---
 .../inlong/sort/configuration/Constants.java       |  5 +++
 .../inlong/sort/singletenant/flink/Entrance.java   | 37 ++++++++++++++++++----
 2 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index cdc416c..c88b16d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -331,6 +331,11 @@ public class Constants {
                     .noDefaultValue()
                     .withDescription("The file which contains dataflow info for a single tenant job");
 
+    public static final ConfigOption<Boolean> JOB_ORDERLY_OUTPUT =
+            key("job.orderly.output")
+                    .defaultValue(false)
+                    .withDescription("Whether to ensure orderly output or not");
+
     // ------------------------------------------------------------------------
     //  File format and compression related
     // ------------------------------------------------------------------------
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index 85b0820..9f680a8 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -109,16 +109,24 @@ public class Entrance {
             Map<String, Object> properties) {
         final String sourceType = checkNotNull(config.getString(Constants.SOURCE_TYPE));
         final int sourceParallelism = config.getInteger(Constants.SOURCE_PARALLELISM);
+        final boolean orderlyOutput = config.getBoolean(Constants.JOB_ORDERLY_OUTPUT);
 
         if (sourceType.equals(Constants.SOURCE_TYPE_PULSAR)) {
             checkState(sourceInfo instanceof PulsarSourceInfo);
             PulsarSourceInfo pulsarSourceInfo = (PulsarSourceInfo) sourceInfo;
 
-            return env.addSource(buildPulsarSource(pulsarSourceInfo, config, properties))
-                    .uid(Constants.SOURCE_UID)
-                    .name("Pulsar source")
-                    .setParallelism(sourceParallelism)
-                    .rebalance();
+            DataStream<SerializedRecord> sourceStream =
+                    env.addSource(buildPulsarSource(pulsarSourceInfo, config, properties))
+                            .uid(Constants.SOURCE_UID)
+                            .name("Pulsar source")
+                            .setParallelism(sourceParallelism);
+
+            if (orderlyOutput) {
+                return sourceStream.forward();
+            } else {
+                return sourceStream.rebalance();
+            }
+
         } else {
             throw new IllegalArgumentException("Unsupported source type " + sourceType);
         }
@@ -129,6 +137,7 @@ public class Entrance {
             SourceInfo sourceInfo,
             Configuration config
     ) throws IOException, ClassNotFoundException {
+        final boolean orderlyOutput = config.getBoolean(Constants.JOB_ORDERLY_OUTPUT);
         FieldInfo[] sourceFields = sourceInfo.getFields();
         DeserializationSchema<Row> schema = DeserializationSchemaFactory.build(
                 sourceFields, sourceInfo.getDeserializationInfo());
@@ -138,22 +147,30 @@ public class Entrance {
                 schema,
                 fieldMappingTransformer,
                 !(sourceInfo.getDeserializationInfo() instanceof DebeziumDeserializationInfo));
-        return sourceStream.process(function)
+
+        DataStream<Row> deserializedStream = sourceStream.process(function)
                 .uid(Constants.DESERIALIZATION_SCHEMA_UID)
                 .name("Deserialization")
                 .setParallelism(config.getInteger(Constants.DESERIALIZATION_PARALLELISM));
+
+        if (orderlyOutput) {
+            return deserializedStream.forward();
+        }
+
+        return deserializedStream;
     }
 
     private static DataStream<Row> buildTransformationStream(
             DataStream<Row> deserializationStream,
             DataFlowInfo dataFlowInfo,
             Configuration config) {
+        final boolean orderlyOutput = config.getBoolean(Constants.JOB_ORDERLY_OUTPUT);
         TransformationInfo transformationInfo = dataFlowInfo.getTransformationInfo();
         if (transformationInfo == null) {
             return deserializationStream;
         }
 
-        return deserializationStream
+        DataStream<Row> transformationStream = deserializationStream
                 .process(new Transformer(
                         transformationInfo,
                         dataFlowInfo.getSourceInfo().getFields(),
@@ -161,6 +178,12 @@ public class Entrance {
                        .uid(Constants.TRANSFORMATION_UID)
                        .name("Transformation")
                        .setParallelism(config.getInteger(Constants.TRANSFORMATION_PARALLELISM));
+
+        if (orderlyOutput) {
+            return transformationStream.forward();
+        }
+
+        return transformationStream;
     }
 
     private static void buildSinkStream(