You are viewing a plain text version of this content. The canonical link for it is on the original archive page (the hyperlink is not preserved in this plain-text version).
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/24 12:33:18 UTC

[flink] branch release-1.9 updated: [FLINK-13257][table-planner-blink] Avoid stream operator implementing BoundedOneInput in blink runner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new a1f566d  [FLINK-13257][table-planner-blink] Avoid stream operator implementing BoundedOneInput in blink runner
a1f566d is described below

commit a1f566d7541ed34d34f091b45165222cd983d999
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Jul 15 14:50:13 2019 +0800

    [FLINK-13257][table-planner-blink] Avoid stream operator implementing BoundedOneInput in blink runner
    
    This closes #9109
---
 .../table/planner/codegen/CalcCodeGenerator.scala  |   2 -
 .../planner/codegen/CodeGeneratorContext.scala     |  18 ----
 .../planner/codegen/CorrelateCodeGenerator.scala   |   7 +-
 .../planner/codegen/ExpandCodeGenerator.scala      |   2 -
 .../planner/codegen/LongHashJoinGenerator.scala    |  43 ++++----
 .../codegen/NestedLoopJoinCodeGenerator.scala      |  21 ++--
 .../planner/codegen/OperatorCodeGenerator.scala    | 109 ++++++++++++---------
 .../table/planner/codegen/SinkCodeGenerator.scala  |   5 +-
 .../codegen/agg/batch/AggCodeGenHelper.scala       |   3 +-
 .../flink/table/planner/plan/utils/ScanUtil.scala  |   2 -
 .../runtime/operators/sort/StreamSortOperator.java |  41 ++++----
 .../operators/sort/StreamSortOperatorTest.java     |   2 -
 12 files changed, 118 insertions(+), 137 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
index 7286cbe..2ab0223 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
@@ -63,9 +63,7 @@ object CalcCodeGenerator {
         ctx,
         opName,
         processCode,
-        "",
         inputType,
-        config,
         inputTerm = inputTerm,
         lazyInputUnboxingCode = true)
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 4d9057d..7e67a12 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -68,11 +68,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   private val reusableCloseStatements: mutable.LinkedHashSet[String] =
     mutable.LinkedHashSet[String]()
 
-  // set of endInput statements for StreamOperator that will be added only once
-  // we use a LinkedHashSet to keep the insertion order
-  private val reusableEndInputStatements: mutable.LinkedHashSet[String] =
-    mutable.LinkedHashSet[String]()
-
   // set of statements for cleanup dataview that will be added only once
   // we use a LinkedHashSet to keep the insertion order
   private val reusableCleanupStatements = mutable.LinkedHashSet[String]()
@@ -249,14 +244,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   }
 
   /**
-    * @return code block of statements that need to be placed in the endInput() method
-    *         (StreamOperator)
-    */
-  def reuseEndInputCode(): String = {
-    reusableEndInputStatements.mkString("\n")
-  }
-
-  /**
     * @return code block of statements that need to be placed in the cleanup() method of
     *         [AggregationsFunction]
     */
@@ -345,11 +332,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   def addReusableCloseStatement(s: String): Unit = reusableCloseStatements.add(s)
 
   /**
-    * Adds a reusable endInput statement
-    */
-  def addReusableEndInputStatement(s: String): Unit = reusableEndInputStatements.add(s)
-
-  /**
     * Adds a reusable cleanup statement
     */
   def addReusableCleanupStatement(s: String): Unit = reusableCleanupStatements.add(s)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index 7f8cc58..cf861bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -273,12 +273,7 @@ object CorrelateCodeGenerator {
     }
 
     val genOperator = OperatorCodeGenerator.generateOneInputStreamOperator[BaseRow, BaseRow](
-      ctx,
-      ruleDescription,
-      body,
-      "",
-      inputType,
-      config)
+      ctx, ruleDescription, body, inputType)
     new CodeGenOperatorFactory(genOperator)
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpandCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpandCodeGenerator.scala
index ef6ac80..2e4981c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpandCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpandCodeGenerator.scala
@@ -65,9 +65,7 @@ object ExpandCodeGenerator {
       ctx,
       opName,
       processCode,
-      "",
       inputType,
-      config,
       inputTerm = inputTerm,
       lazyInputUnboxingCode = false)
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
index 7a57dad..d8f3089 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
@@ -323,11 +323,6 @@ object LongHashJoinGenerator {
          |}
        """.stripMargin,
       s"""
-         |LOG.info("Finish build phase.");
-         |table.endBuild();
-         |$buildEnd = true;
-       """.stripMargin,
-      s"""
          |$BASE_ROW row = ($BASE_ROW) element.getValue();
          |$nullCheckProbeCode
          |if (!$nullCheckProbeTerm) {
@@ -337,22 +332,30 @@ object LongHashJoinGenerator {
          |}
          |$nullOuterJoin
        """.stripMargin,
-      s"""
-         |LOG.info("Finish probe phase.");
-         |while (this.table.nextMatching()) {
-         |  joinWithNextKey();
-         |}
-         |LOG.info("Finish rebuild phase.");
-       """.stripMargin,
-      s"""
-         |if ($buildEnd) {
-         |  return $INPUT_SELECTION.SECOND;
-         |} else {
-         |  return $INPUT_SELECTION.FIRST;
-         |}
-       """.stripMargin,
       buildType,
-      probeType)
+      probeType,
+      nextSelectionCode = Some(
+        s"""
+           |if ($buildEnd) {
+           |  return $INPUT_SELECTION.SECOND;
+           |} else {
+           |  return $INPUT_SELECTION.FIRST;
+           |}
+         """.stripMargin),
+      endInputCode1 = Some(
+        s"""
+           |LOG.info("Finish build phase.");
+           |table.endBuild();
+           |$buildEnd = true;
+       """.stripMargin),
+      endInputCode2 = Some(
+        s"""
+           |LOG.info("Finish probe phase.");
+           |while (this.table.nextMatching()) {
+           |  joinWithNextKey();
+           |}
+           |LOG.info("Finish rebuild phase.");
+         """.stripMargin))
 
     new CodeGenOperatorFactory[BaseRow](genOp)
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/NestedLoopJoinCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/NestedLoopJoinCodeGenerator.scala
index b307134..933ab6c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/NestedLoopJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/NestedLoopJoinCodeGenerator.scala
@@ -133,18 +133,19 @@ class NestedLoopJoinCodeGenerator(
       ctx,
       "BatchNestedLoopJoin",
       processCode1,
-      endInputCode1,
       processCode2,
-      endInputCode2,
-      s"""
-         |if ($buildEnd) {
-         |  return $INPUT_SELECTION.${if (leftIsBuild) "SECOND" else "FIRST"};
-         |} else {
-         |  return $INPUT_SELECTION.${if (leftIsBuild) "FIRST" else "SECOND"};
-         |}
-       """.stripMargin,
       leftType,
-      rightType)
+      rightType,
+      nextSelectionCode = Some(
+        s"""
+           |if ($buildEnd) {
+           |  return $INPUT_SELECTION.${if (leftIsBuild) "SECOND" else "FIRST"};
+           |} else {
+           |  return $INPUT_SELECTION.${if (leftIsBuild) "FIRST" else "SECOND"};
+           |}
+         """.stripMargin),
+      endInputCode1 = Some(endInputCode1),
+      endInputCode2 = Some(endInputCode2))
     new CodeGenOperatorFactory[BaseRow](genOp)
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
index df1b136..142c2fc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
@@ -21,7 +21,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig
 import org.apache.flink.streaming.api.operators.{BoundedMultiInput, BoundedOneInput, InputSelectable, InputSelection, OneInputStreamOperator, Output, StreamOperator, TwoInputStreamOperator}
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.runtime.tasks.StreamTask
-import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import org.apache.flink.table.planner.codegen.Indenter.toISC
 import org.apache.flink.table.planner.utils.Logging
@@ -47,10 +46,9 @@ object OperatorCodeGenerator extends Logging {
       ctx: CodeGeneratorContext,
       name: String,
       processCode: String,
-      endInputCode: String,
       inputType: LogicalType,
-      config: TableConfig,
       inputTerm: String = CodeGenUtils.DEFAULT_INPUT1_TERM,
+      endInputCode: Option[String] = None,
       lazyInputUnboxingCode: Boolean = false,
       converter: String => String = a => a): GeneratedOperator[OneInputStreamOperator[IN, OUT]] = {
     addReuseOutElement(ctx)
@@ -58,10 +56,23 @@ object OperatorCodeGenerator extends Logging {
     val abstractBaseClass = ctx.getOperatorBaseClass
     val baseClass = classOf[OneInputStreamOperator[IN, OUT]]
     val inputTypeTerm = boxedTypeTermForType(inputType)
+
+    val (endInput, endInputImpl) = endInputCode match {
+      case None => ("", "")
+      case Some(code) =>
+        (s"""
+           |@Override
+           |public void endInput() throws Exception {
+           |  ${ctx.reuseLocalVariableCode()}
+           |  $code
+           |}
+         """.stripMargin, s", ${className[BoundedOneInput]}")
+    }
+
     val operatorCode =
       j"""
       public class $operatorName extends ${abstractBaseClass.getCanonicalName}
-          implements ${baseClass.getCanonicalName}, ${className[BoundedOneInput]} {
+          implements ${baseClass.getCanonicalName}$endInputImpl {
 
         private final Object[] references;
         ${ctx.reuseMemberCode()}
@@ -91,20 +102,7 @@ object OperatorCodeGenerator extends Logging {
           $processCode
         }
 
-        @Override
-        public void endInput() throws Exception {
-          ${
-            if (endInputCode.nonEmpty) {
-              s"""
-                 |${ctx.reuseLocalVariableCode()}
-                 |$endInputCode
-                 """.stripMargin
-            } else {
-              ""
-            }
-          }
-          ${ctx.reuseEndInputCode()}
-        }
+        $endInput
 
         @Override
         public void close() throws Exception {
@@ -124,14 +122,14 @@ object OperatorCodeGenerator extends Logging {
       ctx: CodeGeneratorContext,
       name: String,
       processCode1: String,
-      endInputCode1: String,
       processCode2: String,
-      endInputCode2: String,
-      nextSelection: String,
       input1Type: LogicalType,
       input2Type: LogicalType,
       input1Term: String = CodeGenUtils.DEFAULT_INPUT1_TERM,
       input2Term: String = CodeGenUtils.DEFAULT_INPUT2_TERM,
+      nextSelectionCode: Option[String] = None,
+      endInputCode1: Option[String] = None,
+      endInputCode2: Option[String] = None,
       useTimeCollect: Boolean = false)
     : GeneratedOperator[TwoInputStreamOperator[IN1, IN2, OUT]] = {
     addReuseOutElement(ctx)
@@ -141,11 +139,51 @@ object OperatorCodeGenerator extends Logging {
     val inputTypeTerm1 = boxedTypeTermForType(input1Type)
     val inputTypeTerm2 = boxedTypeTermForType(input2Type)
 
+    val (nextSel, nextSelImpl) = nextSelectionCode match {
+      case None => ("", "")
+      case Some(code) =>
+        val end1 = endInputCode1.getOrElse("")
+        val end2 = endInputCode2.getOrElse("")
+        (s"""
+            |@Override
+            |public $INPUT_SELECTION nextSelection() {
+            |  $code
+            |}
+         """.stripMargin, s", ${className[InputSelectable]}")
+    }
+
+    val (endInput, endInputImpl) = (endInputCode1, endInputCode2) match {
+      case (None, None) => ("", "")
+      case (_, _) =>
+        val end1 = endInputCode1.getOrElse("")
+        val end2 = endInputCode2.getOrElse("")
+        (s"""
+           |private void endInput1() throws Exception {
+           |  $end1
+           |}
+           |
+           |private void endInput2() throws Exception {
+           |  $end2
+           |}
+           |
+           |@Override
+           |public void endInput(int inputId) throws Exception {
+           |  switch (inputId) {
+           |    case 1:
+           |      endInput1();
+           |      break;
+           |    case 2:
+           |      endInput2();
+           |      break;
+           |  }
+           |}
+         """.stripMargin, s", ${className[BoundedMultiInput]}")
+    }
+
     val operatorCode =
       j"""
       public class $operatorName extends ${abstractBaseClass.getCanonicalName}
-          implements ${baseClass.getCanonicalName},
-           ${className[BoundedMultiInput]}, ${className[InputSelectable]} {
+          implements ${baseClass.getCanonicalName}$nextSelImpl$endInputImpl {
 
         public static org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger("$operatorName");
 
@@ -176,10 +214,6 @@ object OperatorCodeGenerator extends Logging {
           $processCode1
         }
 
-        private void endInput1() throws Exception {
-          $endInputCode1
-        }
-
         @Override
         public void processElement2($STREAM_RECORD $ELEMENT)
          throws Exception {
@@ -188,26 +222,9 @@ object OperatorCodeGenerator extends Logging {
           $processCode2
         }
 
-        private void endInput2() throws Exception {
-          $endInputCode2
-        }
-
-        @Override
-        public void endInput(int inputId) throws Exception {
-          switch (inputId) {
-            case 1:
-              endInput1();
-              break;
-            case 2:
-              endInput2();
-              break;
-          }
-        }
+        $nextSel
 
-        @Override
-        public $INPUT_SELECTION nextSelection() {
-          $nextSelection
-        }
+        $endInput
 
         @Override
         public void close() throws Exception {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index b7890c3..51d3483 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -175,7 +175,6 @@ object SinkCodeGenerator {
           """.stripMargin
     }
 
-    val endInputCode = ""
     val generated = OperatorCodeGenerator.generateOneInputStreamOperator[BaseRow, OUT](
       ctx,
       operatorName,
@@ -183,9 +182,7 @@ object SinkCodeGenerator {
          |$fieldIndexProcessCode
          |$retractProcessCode
          |""".stripMargin,
-      endInputCode,
-      fromTypeInfoToLogicalType(inputTypeInfo),
-      config)
+      fromTypeInfoToLogicalType(inputTypeInfo))
     (new CodeGenOperatorFactory[OUT](generated), outputTypeInfo.asInstanceOf[TypeInformation[OUT]])
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
index 8b27c58..91a0ba0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
@@ -709,9 +709,8 @@ object AggCodeGenHelper {
       ctx,
       name,
       processCode,
-      endInputCode,
       inputType,
-      ctx.tableConfig,
+      endInputCode = Some(endInputCode),
       lazyInputUnboxingCode = true)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
index 4ec8130..2beee54 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
@@ -109,9 +109,7 @@ object ScanUtil {
       ctx,
       convertName,
       processCode,
-      "",
       outputRowType,
-      config,
       converter = inputTermConverter)
 
     val substituteStreamOperator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
index b044022..2da7e6c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.dataformat.BaseRow;
@@ -45,13 +44,12 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Operator for stream sort.
  */
 public class StreamSortOperator extends TableStreamOperator<BaseRow> implements
-		OneInputStreamOperator<BaseRow, BaseRow>, BoundedOneInput {
+		OneInputStreamOperator<BaseRow, BaseRow> {
 
 	private static final long serialVersionUID = 9042068324817807379L;
 
@@ -113,24 +111,6 @@ public class StreamSortOperator extends TableStreamOperator<BaseRow> implements
 	}
 
 	@Override
-	public void endInput() throws Exception {
-		if (!inputBuffer.isEmpty()) {
-			List<BaseRow> rowsSet = new ArrayList<>();
-			inputBuffer.keySet().forEach(rowsSet::add);
-			// sort the rows
-			rowsSet.sort(comparator);
-
-			// Emit the rows in order
-			rowsSet.forEach((BaseRow row) -> {
-				long count = inputBuffer.get(row);
-				for (int i = 1; i <= count; i++) {
-					collector.collect(row);
-				}
-			});
-		}
-	}
-
-	@Override
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
 		TupleTypeInfo<Tuple2<BaseRow, Long>> tupleType = new TupleTypeInfo<>(inputRowType, Types.LONG);
@@ -145,8 +125,7 @@ public class StreamSortOperator extends TableStreamOperator<BaseRow> implements
 		bufferState.clear();
 
 		List<Tuple2<BaseRow, Long>> dataToFlush = new ArrayList<>(inputBuffer.size());
-		inputBuffer.entrySet().forEach(
-				(Map.Entry<BaseRow, Long> entry) -> dataToFlush.add(Tuple2.of(entry.getKey(), entry.getValue())));
+		inputBuffer.forEach((key, value) -> dataToFlush.add(Tuple2.of(key, value)));
 
 		// batch put
 		bufferState.addAll(dataToFlush);
@@ -155,6 +134,22 @@ public class StreamSortOperator extends TableStreamOperator<BaseRow> implements
 	@Override
 	public void close() throws Exception {
 		LOG.info("Closing StreamSortOperator");
+
+		// BoundedOneInput cannot coexist with checkpointing, so we emit the buffered output in close().
+		if (!inputBuffer.isEmpty()) {
+			List<BaseRow> rowsSet = new ArrayList<>();
+			rowsSet.addAll(inputBuffer.keySet());
+			// sort the rows
+			rowsSet.sort(comparator);
+
+			// Emit the rows in order
+			rowsSet.forEach((BaseRow row) -> {
+				long count = inputBuffer.get(row);
+				for (int i = 1; i <= count; i++) {
+					collector.collect(row);
+				}
+			});
+		}
 		super.close();
 	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
index c75af14..2900dc6 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
@@ -76,7 +76,6 @@ public class StreamSortOperatorTest {
 
 		// do a snapshot, data could be recovered from state
 		OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
-		operator.endInput();
 		testHarness.close();
 		assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
 
@@ -88,7 +87,6 @@ public class StreamSortOperatorTest {
 		testHarness.open();
 		testHarness.processElement(record("abc", 1));
 		testHarness.processElement(record("aa", 1));
-		operator.endInput();
 		testHarness.close();
 
 		expectedOutput.add(record("aa", 1));