You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/24 12:33:18 UTC
[flink] branch release-1.9 updated:
[FLINK-13257][table-planner-blink] Avoid stream operator implementing
BoundedOneInput in blink runner
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new a1f566d [FLINK-13257][table-planner-blink] Avoid stream operator implementing BoundedOneInput in blink runner
a1f566d is described below
commit a1f566d7541ed34d34f091b45165222cd983d999
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Jul 15 14:50:13 2019 +0800
[FLINK-13257][table-planner-blink] Avoid stream operator implementing BoundedOneInput in blink runner
This closes #9109
---
.../table/planner/codegen/CalcCodeGenerator.scala | 2 -
.../planner/codegen/CodeGeneratorContext.scala | 18 ----
.../planner/codegen/CorrelateCodeGenerator.scala | 7 +-
.../planner/codegen/ExpandCodeGenerator.scala | 2 -
.../planner/codegen/LongHashJoinGenerator.scala | 43 ++++----
.../codegen/NestedLoopJoinCodeGenerator.scala | 21 ++--
.../planner/codegen/OperatorCodeGenerator.scala | 109 ++++++++++++---------
.../table/planner/codegen/SinkCodeGenerator.scala | 5 +-
.../codegen/agg/batch/AggCodeGenHelper.scala | 3 +-
.../flink/table/planner/plan/utils/ScanUtil.scala | 2 -
.../runtime/operators/sort/StreamSortOperator.java | 41 ++++----
.../operators/sort/StreamSortOperatorTest.java | 2 -
12 files changed, 118 insertions(+), 137 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
index 7286cbe..2ab0223 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
@@ -63,9 +63,7 @@ object CalcCodeGenerator {
ctx,
opName,
processCode,
- "",
inputType,
- config,
inputTerm = inputTerm,
lazyInputUnboxingCode = true)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 4d9057d..7e67a12 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -68,11 +68,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
private val reusableCloseStatements: mutable.LinkedHashSet[String] =
mutable.LinkedHashSet[String]()
- // set of endInput statements for StreamOperator that will be added only once
- // we use a LinkedHashSet to keep the insertion order
- private val reusableEndInputStatements: mutable.LinkedHashSet[String] =
- mutable.LinkedHashSet[String]()
-
// set of statements for cleanup dataview that will be added only once
// we use a LinkedHashSet to keep the insertion order
private val reusableCleanupStatements = mutable.LinkedHashSet[String]()
@@ -249,14 +244,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}
/**
- * @return code block of statements that need to be placed in the endInput() method
- * (StreamOperator)
- */
- def reuseEndInputCode(): String = {
- reusableEndInputStatements.mkString("\n")
- }
-
- /**
* @return code block of statements that need to be placed in the cleanup() method of
* [AggregationsFunction]
*/
@@ -345,11 +332,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
def addReusableCloseStatement(s: String): Unit = reusableCloseStatements.add(s)
/**
- * Adds a reusable endInput statement
- */
- def addReusableEndInputStatement(s: String): Unit = reusableEndInputStatements.add(s)
-
- /**
* Adds a reusable cleanup statement
*/
def addReusableCleanupStatement(s: String): Unit = reusableCleanupStatements.add(s)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index 7f8cc58..cf861bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -273,12 +273,7 @@ object CorrelateCodeGenerator {
}
val genOperator = OperatorCodeGenerator.generateOneInputStreamOperator[BaseRow, BaseRow](
- ctx,
- ruleDescription,
- body,
- "",
- inputType,
- config)
+ ctx, ruleDescription, body, inputType)
new CodeGenOperatorFactory(genOperator)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpandCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpandCodeGenerator.scala
index ef6ac80..2e4981c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpandCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpandCodeGenerator.scala
@@ -65,9 +65,7 @@ object ExpandCodeGenerator {
ctx,
opName,
processCode,
- "",
inputType,
- config,
inputTerm = inputTerm,
lazyInputUnboxingCode = false)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
index 7a57dad..d8f3089 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
@@ -323,11 +323,6 @@ object LongHashJoinGenerator {
|}
""".stripMargin,
s"""
- |LOG.info("Finish build phase.");
- |table.endBuild();
- |$buildEnd = true;
- """.stripMargin,
- s"""
|$BASE_ROW row = ($BASE_ROW) element.getValue();
|$nullCheckProbeCode
|if (!$nullCheckProbeTerm) {
@@ -337,22 +332,30 @@ object LongHashJoinGenerator {
|}
|$nullOuterJoin
""".stripMargin,
- s"""
- |LOG.info("Finish probe phase.");
- |while (this.table.nextMatching()) {
- | joinWithNextKey();
- |}
- |LOG.info("Finish rebuild phase.");
- """.stripMargin,
- s"""
- |if ($buildEnd) {
- | return $INPUT_SELECTION.SECOND;
- |} else {
- | return $INPUT_SELECTION.FIRST;
- |}
- """.stripMargin,
buildType,
- probeType)
+ probeType,
+ nextSelectionCode = Some(
+ s"""
+ |if ($buildEnd) {
+ | return $INPUT_SELECTION.SECOND;
+ |} else {
+ | return $INPUT_SELECTION.FIRST;
+ |}
+ """.stripMargin),
+ endInputCode1 = Some(
+ s"""
+ |LOG.info("Finish build phase.");
+ |table.endBuild();
+ |$buildEnd = true;
+ """.stripMargin),
+ endInputCode2 = Some(
+ s"""
+ |LOG.info("Finish probe phase.");
+ |while (this.table.nextMatching()) {
+ | joinWithNextKey();
+ |}
+ |LOG.info("Finish rebuild phase.");
+ """.stripMargin))
new CodeGenOperatorFactory[BaseRow](genOp)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/NestedLoopJoinCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/NestedLoopJoinCodeGenerator.scala
index b307134..933ab6c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/NestedLoopJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/NestedLoopJoinCodeGenerator.scala
@@ -133,18 +133,19 @@ class NestedLoopJoinCodeGenerator(
ctx,
"BatchNestedLoopJoin",
processCode1,
- endInputCode1,
processCode2,
- endInputCode2,
- s"""
- |if ($buildEnd) {
- | return $INPUT_SELECTION.${if (leftIsBuild) "SECOND" else "FIRST"};
- |} else {
- | return $INPUT_SELECTION.${if (leftIsBuild) "FIRST" else "SECOND"};
- |}
- """.stripMargin,
leftType,
- rightType)
+ rightType,
+ nextSelectionCode = Some(
+ s"""
+ |if ($buildEnd) {
+ | return $INPUT_SELECTION.${if (leftIsBuild) "SECOND" else "FIRST"};
+ |} else {
+ | return $INPUT_SELECTION.${if (leftIsBuild) "FIRST" else "SECOND"};
+ |}
+ """.stripMargin),
+ endInputCode1 = Some(endInputCode1),
+ endInputCode2 = Some(endInputCode2))
new CodeGenOperatorFactory[BaseRow](genOp)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
index df1b136..142c2fc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/OperatorCodeGenerator.scala
@@ -21,7 +21,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig
import org.apache.flink.streaming.api.operators.{BoundedMultiInput, BoundedOneInput, InputSelectable, InputSelection, OneInputStreamOperator, Output, StreamOperator, TwoInputStreamOperator}
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.runtime.tasks.StreamTask
-import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.planner.codegen.CodeGenUtils._
import org.apache.flink.table.planner.codegen.Indenter.toISC
import org.apache.flink.table.planner.utils.Logging
@@ -47,10 +46,9 @@ object OperatorCodeGenerator extends Logging {
ctx: CodeGeneratorContext,
name: String,
processCode: String,
- endInputCode: String,
inputType: LogicalType,
- config: TableConfig,
inputTerm: String = CodeGenUtils.DEFAULT_INPUT1_TERM,
+ endInputCode: Option[String] = None,
lazyInputUnboxingCode: Boolean = false,
converter: String => String = a => a): GeneratedOperator[OneInputStreamOperator[IN, OUT]] = {
addReuseOutElement(ctx)
@@ -58,10 +56,23 @@ object OperatorCodeGenerator extends Logging {
val abstractBaseClass = ctx.getOperatorBaseClass
val baseClass = classOf[OneInputStreamOperator[IN, OUT]]
val inputTypeTerm = boxedTypeTermForType(inputType)
+
+ val (endInput, endInputImpl) = endInputCode match {
+ case None => ("", "")
+ case Some(code) =>
+ (s"""
+ |@Override
+ |public void endInput() throws Exception {
+ | ${ctx.reuseLocalVariableCode()}
+ | $code
+ |}
+ """.stripMargin, s", ${className[BoundedOneInput]}")
+ }
+
val operatorCode =
j"""
public class $operatorName extends ${abstractBaseClass.getCanonicalName}
- implements ${baseClass.getCanonicalName}, ${className[BoundedOneInput]} {
+ implements ${baseClass.getCanonicalName}$endInputImpl {
private final Object[] references;
${ctx.reuseMemberCode()}
@@ -91,20 +102,7 @@ object OperatorCodeGenerator extends Logging {
$processCode
}
- @Override
- public void endInput() throws Exception {
- ${
- if (endInputCode.nonEmpty) {
- s"""
- |${ctx.reuseLocalVariableCode()}
- |$endInputCode
- """.stripMargin
- } else {
- ""
- }
- }
- ${ctx.reuseEndInputCode()}
- }
+ $endInput
@Override
public void close() throws Exception {
@@ -124,14 +122,14 @@ object OperatorCodeGenerator extends Logging {
ctx: CodeGeneratorContext,
name: String,
processCode1: String,
- endInputCode1: String,
processCode2: String,
- endInputCode2: String,
- nextSelection: String,
input1Type: LogicalType,
input2Type: LogicalType,
input1Term: String = CodeGenUtils.DEFAULT_INPUT1_TERM,
input2Term: String = CodeGenUtils.DEFAULT_INPUT2_TERM,
+ nextSelectionCode: Option[String] = None,
+ endInputCode1: Option[String] = None,
+ endInputCode2: Option[String] = None,
useTimeCollect: Boolean = false)
: GeneratedOperator[TwoInputStreamOperator[IN1, IN2, OUT]] = {
addReuseOutElement(ctx)
@@ -141,11 +139,51 @@ object OperatorCodeGenerator extends Logging {
val inputTypeTerm1 = boxedTypeTermForType(input1Type)
val inputTypeTerm2 = boxedTypeTermForType(input2Type)
+ val (nextSel, nextSelImpl) = nextSelectionCode match {
+ case None => ("", "")
+ case Some(code) =>
+ val end1 = endInputCode1.getOrElse("")
+ val end2 = endInputCode2.getOrElse("")
+ (s"""
+ |@Override
+ |public $INPUT_SELECTION nextSelection() {
+ | $code
+ |}
+ """.stripMargin, s", ${className[InputSelectable]}")
+ }
+
+ val (endInput, endInputImpl) = (endInputCode1, endInputCode2) match {
+ case (None, None) => ("", "")
+ case (_, _) =>
+ val end1 = endInputCode1.getOrElse("")
+ val end2 = endInputCode2.getOrElse("")
+ (s"""
+ |private void endInput1() throws Exception {
+ | $end1
+ |}
+ |
+ |private void endInput2() throws Exception {
+ | $end2
+ |}
+ |
+ |@Override
+ |public void endInput(int inputId) throws Exception {
+ | switch (inputId) {
+ | case 1:
+ | endInput1();
+ | break;
+ | case 2:
+ | endInput2();
+ | break;
+ | }
+ |}
+ """.stripMargin, s", ${className[BoundedMultiInput]}")
+ }
+
val operatorCode =
j"""
public class $operatorName extends ${abstractBaseClass.getCanonicalName}
- implements ${baseClass.getCanonicalName},
- ${className[BoundedMultiInput]}, ${className[InputSelectable]} {
+ implements ${baseClass.getCanonicalName}$nextSelImpl$endInputImpl {
public static org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger("$operatorName");
@@ -176,10 +214,6 @@ object OperatorCodeGenerator extends Logging {
$processCode1
}
- private void endInput1() throws Exception {
- $endInputCode1
- }
-
@Override
public void processElement2($STREAM_RECORD $ELEMENT)
throws Exception {
@@ -188,26 +222,9 @@ object OperatorCodeGenerator extends Logging {
$processCode2
}
- private void endInput2() throws Exception {
- $endInputCode2
- }
-
- @Override
- public void endInput(int inputId) throws Exception {
- switch (inputId) {
- case 1:
- endInput1();
- break;
- case 2:
- endInput2();
- break;
- }
- }
+ $nextSel
- @Override
- public $INPUT_SELECTION nextSelection() {
- $nextSelection
- }
+ $endInput
@Override
public void close() throws Exception {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index b7890c3..51d3483 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -175,7 +175,6 @@ object SinkCodeGenerator {
""".stripMargin
}
- val endInputCode = ""
val generated = OperatorCodeGenerator.generateOneInputStreamOperator[BaseRow, OUT](
ctx,
operatorName,
@@ -183,9 +182,7 @@ object SinkCodeGenerator {
|$fieldIndexProcessCode
|$retractProcessCode
|""".stripMargin,
- endInputCode,
- fromTypeInfoToLogicalType(inputTypeInfo),
- config)
+ fromTypeInfoToLogicalType(inputTypeInfo))
(new CodeGenOperatorFactory[OUT](generated), outputTypeInfo.asInstanceOf[TypeInformation[OUT]])
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
index 8b27c58..91a0ba0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
@@ -709,9 +709,8 @@ object AggCodeGenHelper {
ctx,
name,
processCode,
- endInputCode,
inputType,
- ctx.tableConfig,
+ endInputCode = Some(endInputCode),
lazyInputUnboxingCode = true)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
index 4ec8130..2beee54 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
@@ -109,9 +109,7 @@ object ScanUtil {
ctx,
convertName,
processCode,
- "",
outputRowType,
- config,
converter = inputTermConverter)
val substituteStreamOperator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
index b044022..2da7e6c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
@@ -45,13 +44,12 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
/**
* Operator for stream sort.
*/
public class StreamSortOperator extends TableStreamOperator<BaseRow> implements
- OneInputStreamOperator<BaseRow, BaseRow>, BoundedOneInput {
+ OneInputStreamOperator<BaseRow, BaseRow> {
private static final long serialVersionUID = 9042068324817807379L;
@@ -113,24 +111,6 @@ public class StreamSortOperator extends TableStreamOperator<BaseRow> implements
}
@Override
- public void endInput() throws Exception {
- if (!inputBuffer.isEmpty()) {
- List<BaseRow> rowsSet = new ArrayList<>();
- inputBuffer.keySet().forEach(rowsSet::add);
- // sort the rows
- rowsSet.sort(comparator);
-
- // Emit the rows in order
- rowsSet.forEach((BaseRow row) -> {
- long count = inputBuffer.get(row);
- for (int i = 1; i <= count; i++) {
- collector.collect(row);
- }
- });
- }
- }
-
- @Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
TupleTypeInfo<Tuple2<BaseRow, Long>> tupleType = new TupleTypeInfo<>(inputRowType, Types.LONG);
@@ -145,8 +125,7 @@ public class StreamSortOperator extends TableStreamOperator<BaseRow> implements
bufferState.clear();
List<Tuple2<BaseRow, Long>> dataToFlush = new ArrayList<>(inputBuffer.size());
- inputBuffer.entrySet().forEach(
- (Map.Entry<BaseRow, Long> entry) -> dataToFlush.add(Tuple2.of(entry.getKey(), entry.getValue())));
+ inputBuffer.forEach((key, value) -> dataToFlush.add(Tuple2.of(key, value)));
// batch put
bufferState.addAll(dataToFlush);
@@ -155,6 +134,22 @@ public class StreamSortOperator extends TableStreamOperator<BaseRow> implements
@Override
public void close() throws Exception {
LOG.info("Closing StreamSortOperator");
+
+ // BoundedOneInput cannot coexist with checkpointing, so we emit the output in close().
+ if (!inputBuffer.isEmpty()) {
+ List<BaseRow> rowsSet = new ArrayList<>();
+ rowsSet.addAll(inputBuffer.keySet());
+ // sort the rows
+ rowsSet.sort(comparator);
+
+ // Emit the rows in order
+ rowsSet.forEach((BaseRow row) -> {
+ long count = inputBuffer.get(row);
+ for (int i = 1; i <= count; i++) {
+ collector.collect(row);
+ }
+ });
+ }
super.close();
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
index c75af14..2900dc6 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java
@@ -76,7 +76,6 @@ public class StreamSortOperatorTest {
// do a snapshot, data could be recovered from state
OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
- operator.endInput();
testHarness.close();
assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
@@ -88,7 +87,6 @@ public class StreamSortOperatorTest {
testHarness.open();
testHarness.processElement(record("abc", 1));
testHarness.processElement(record("aa", 1));
- operator.endInput();
testHarness.close();
expectedOutput.add(record("aa", 1));