Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/30 15:10:22 UTC

[GitHub] dawidwys closed pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

URL: https://github.com/apache/flink/pull/7189

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/dev/table/streaming/match_recognize.md b/docs/dev/table/streaming/match_recognize.md
index 3cd1ed09cc9..e61d384075e 100644
--- a/docs/dev/table/streaming/match_recognize.md
+++ b/docs/dev/table/streaming/match_recognize.md
@@ -838,5 +838,4 @@ Unsupported features include:
 * Physical offsets - `PREV/NEXT`, which indexes all events seen rather than only those that were mapped to a pattern variable (as in the [logical offsets](#logical-offsets) case).
 * Extracting time attributes - it is currently not possible to get a time attribute for subsequent time-based operations.
 * Aggregates - aggregates cannot be used in the `MEASURES` or `DEFINE` clauses.
-* User defined functions cannot be used within `MATCH_RECOGNIZE`.
 * `MATCH_RECOGNIZE` is supported only for SQL. There is no equivalent in the Table API.
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
index ff6e610845c..ed93f8e52cd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
@@ -103,6 +103,11 @@ public ClassLoader getUserCodeClassLoader() {
 		return runtimeContext.getUserCodeClassLoader();
 	}
 
+	@Override
+	public DistributedCache getDistributedCache() {
+		return runtimeContext.getDistributedCache();
+	}
+
 	// -----------------------------------------------------------------------------------
 	// Unsupported operations
 	// -----------------------------------------------------------------------------------
@@ -159,11 +164,6 @@ public boolean hasBroadcastVariable(String name) {
 		throw new UnsupportedOperationException("Broadcast variables are not supported.");
 	}
 
-	@Override
-	public DistributedCache getDistributedCache() {
-		throw new UnsupportedOperationException("Distributed cache is not supported.");
-	}
-
 	@Override
 	public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
index 6bc4081da54..ef7ee89881e 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -134,6 +135,7 @@ private void verifyRuntimeContext(final RichFunction function) {
 		final String taskNameWithSubtask = "barfoo";
 		final ExecutionConfig executionConfig = mock(ExecutionConfig.class);
 		final ClassLoader userCodeClassLoader = mock(ClassLoader.class);
+		final DistributedCache distributedCache = mock(DistributedCache.class);
 
 		RuntimeContext mockedRuntimeContext = mock(RuntimeContext.class);
 
@@ -145,6 +147,7 @@ private void verifyRuntimeContext(final RichFunction function) {
 		when(mockedRuntimeContext.getTaskNameWithSubtasks()).thenReturn(taskNameWithSubtask);
 		when(mockedRuntimeContext.getExecutionConfig()).thenReturn(executionConfig);
 		when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
+		when(mockedRuntimeContext.getDistributedCache()).thenReturn(distributedCache);
 
 		function.setRuntimeContext(mockedRuntimeContext);
 
@@ -159,13 +162,7 @@ private void verifyRuntimeContext(final RichFunction function) {
 		assertEquals(taskNameWithSubtask, runtimeContext.getTaskNameWithSubtasks());
 		assertEquals(executionConfig, runtimeContext.getExecutionConfig());
 		assertEquals(userCodeClassLoader, runtimeContext.getUserCodeClassLoader());
-
-		try {
-			runtimeContext.getDistributedCache();
-			fail("Expected getDistributedCached to fail with unsupported operation exception.");
-		} catch (UnsupportedOperationException e) {
-			// expected
-		}
+		assertEquals(distributedCache, runtimeContext.getDistributedCache());
 
 		try {
 			runtimeContext.getState(new ValueStateDescriptor<>("foobar", Integer.class, 42));
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
index 791d3883f80..3305a510753 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
@@ -25,8 +25,9 @@ import org.apache.calcite.rex._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.cep.pattern.conditions.IterativeCondition
-import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
+import org.apache.flink.cep.pattern.conditions.{IterativeCondition, RichIterativeCondition}
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction, RichPatternFlatSelectFunction, RichPatternSelectFunction}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
 import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
@@ -110,9 +111,6 @@ class MatchCodeGenerator(
    * Generates a [[org.apache.flink.api.common.functions.Function]] that can be passed to the
    * Java compiler.
     *
-    * This is a separate method from [[FunctionCodeGenerator.generateFunction()]] because as of
-    * now functions in CEP library do not support rich interfaces
-    *
    * @param name Class name of the Function. Does not have to be unique but must be a valid Java
     *             identifier.
     * @param clazz Flink Function to be generated.
@@ -131,47 +129,37 @@ class MatchCodeGenerator(
     : GeneratedFunction[F, T] = {
     val funcName = newName(name)
     val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName
-    val (functionClass, signature, inputStatements, isInterface) =
-      if (clazz == classOf[IterativeCondition[_]]) {
-        val baseClass = classOf[IterativeCondition[_]]
+    val (functionClass, signature, inputStatements) =
+      if (clazz == classOf[RichIterativeCondition[_]]) {
+        val baseClass = classOf[RichIterativeCondition[_]]
         val inputTypeTerm = boxedTypeTermForTypeInfo(input)
         val contextType = classOf[IterativeCondition.Context[_]].getCanonicalName
 
         (baseClass,
           s"boolean filter(Object _in1, $contextType $contextTerm)",
-          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"),
-          false)
-      } else if (clazz == classOf[PatternSelectFunction[_, _]]) {
-        val baseClass = classOf[PatternSelectFunction[_, _]]
+          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
+      } else if (clazz == classOf[RichPatternSelectFunction[_, _]]) {
+        val baseClass = classOf[RichPatternSelectFunction[_, _]]
         val inputTypeTerm =
           s"java.util.Map<String, java.util.List<${boxedTypeTermForTypeInfo(input)}>>"
 
         (baseClass,
           s"Object select($inputTypeTerm $input1Term)",
-          List(),
-          true)
-      } else if (clazz == classOf[PatternFlatSelectFunction[_, _]]) {
-        val baseClass = classOf[PatternFlatSelectFunction[_, _]]
+          List())
+      } else if (clazz == classOf[RichPatternFlatSelectFunction[_, _]]) {
+        val baseClass = classOf[RichPatternFlatSelectFunction[_, _]]
         val inputTypeTerm =
           s"java.util.Map<String, java.util.List<${boxedTypeTermForTypeInfo(input)}>>"
 
         (baseClass,
           s"void flatSelect($inputTypeTerm $input1Term, $collectorTypeTerm $collectorTerm)",
-          List(),
-          true)
+          List())
       } else {
         throw new CodeGenException("Unsupported Function.")
       }
 
-    if (!reuseOpenCode().trim.isEmpty || !reuseCloseCode().trim.isEmpty) {
-      throw new TableException(
-        "Match recognize does not support UDFs, nor other functions that require " +
-          "open/close methods yet.")
-    }
-
-    val extendsKeyword = if (isInterface) "implements" else "extends"
     val funcCode = j"""
-      |public class $funcName $extendsKeyword ${functionClass.getCanonicalName} {
+      |public class $funcName extends ${functionClass.getCanonicalName} {
       |
       |  ${reuseMemberCode()}
       |
@@ -180,6 +168,11 @@ class MatchCodeGenerator(
       |  }
       |
       |  @Override
+      |  public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception {
+      |    ${reuseOpenCode()}
+      |  }
+      |
+      |  @Override
       |  public $signature throws Exception {
       |    ${inputStatements.mkString("\n")}
       |    ${reusePatternLists()}
@@ -187,6 +180,11 @@ class MatchCodeGenerator(
       |    ${reusePerRecordCode()}
       |    $bodyCode
       |  }
+      |
+      |  @Override
+      |  public void close() throws Exception {
+      |    ${reuseCloseCode()}
+      |  }
       |}
     """.stripMargin
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
index 15b9d374889..23907c35d18 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.table.runtime.`match`
 
-import java.io.{IOException, ObjectInputStream}
-
-import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.cep.pattern.conditions.{IterativeCondition, RichIterativeCondition}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
@@ -31,30 +31,26 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
     name: String,
     code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-    // We cannot get user's classloader currently, see FLINK-6938 for details
-    val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating IterativeCondition.")
     function = clazz.newInstance()
-    // TODO add logic for opening and closing the function once it can be a RichFunction
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
   }
 
   override def filter(value: Row, ctx: IterativeCondition.Context[Row]): Boolean = {
     function.filter(value, ctx)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject()
-    if (function == null) {
-      init()
-    }
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
   }
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/MatchUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/MatchUtil.scala
index be3ace34d2c..06bab956bc4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/MatchUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/MatchUtil.scala
@@ -23,8 +23,8 @@ import java.util
 import org.apache.calcite.rel.RelFieldCollation
 import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.cep.pattern.conditions.IterativeCondition
-import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
+import org.apache.flink.cep.pattern.conditions.RichIterativeCondition
+import org.apache.flink.cep.{PatternFlatSelectFunction, RichPatternSelectFunction}
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.MatchCodeGenerator
 import org.apache.flink.table.plan.schema.RowSchema
@@ -53,7 +53,7 @@ object MatchUtil {
 
     val genCondition = generator
       .generateMatchFunction("MatchRecognizeCondition",
-        classOf[IterativeCondition[Row]],
+        classOf[RichIterativeCondition[Row]],
         body,
         condition.resultType)
     new IterativeConditionRunner(genCondition.name, genCondition.code)
@@ -82,7 +82,7 @@ object MatchUtil {
 
     val genFunction = generator.generateMatchFunction(
       "MatchRecognizePatternSelectFunction",
-      classOf[PatternSelectFunction[Row, Row]],
+      classOf[RichPatternSelectFunction[Row, Row]],
       body,
       resultExpression.resultType)
     new PatternSelectFunctionRunner(genFunction.name, genFunction.code)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
index c92cd384012..0c040f27408 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
@@ -18,10 +18,11 @@
 
 package org.apache.flink.table.runtime.`match`
 
-import java.io.{IOException, ObjectInputStream}
 import java.util
 
-import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.cep.{RichPatternFlatSelectFunction, RichPatternSelectFunction}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
@@ -35,20 +36,25 @@ import org.apache.flink.util.Collector
 class PatternSelectFunctionRunner(
     name: String,
     code: String)
-  extends PatternFlatSelectFunction[Row, CRow]
-  with Compiler[PatternSelectFunction[Row, Row]]
+  extends RichPatternFlatSelectFunction[Row, CRow]
+  with Compiler[RichPatternSelectFunction[Row, Row]]
   with Logging {
 
   @transient private var outCRow: CRow = _
 
-  @transient private var function: PatternSelectFunction[Row, Row] = _
+  @transient private var function: RichPatternSelectFunction[Row, Row] = _
+
+  override def open(parameters: Configuration): Unit = {
+    if (outCRow == null) {
+      outCRow = new CRow(null, true)
+    }
 
-  def init(): Unit = {
     LOG.debug(s"Compiling PatternSelectFunction: $name \n\n Code:\n$code")
-    val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating PatternSelectFunction.")
     function = clazz.newInstance()
-    // TODO add logic for opening and closing the function once it can be a RichFunction
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
   }
 
   override def flatSelect(
@@ -60,17 +66,8 @@ class PatternSelectFunctionRunner(
     out.collect(outCRow)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject()
-
-    if (outCRow == null) {
-      outCRow = new CRow(null, true)
-    }
-
-    if (function == null) {
-      init()
-    }
+  override def close(): Unit = {
+    FunctionUtils.closeFunction(function)
   }
 }
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
index 3917bdf8af9..e10a568b2eb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
@@ -202,54 +202,6 @@ class MatchOperatorValidationTest extends TableTestBase {
     streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
   }
 
-  @Test
-  def testUDFsAreNotSupportedInMeasures(): Unit = {
-    thrown.expectMessage(
-      "Match recognize does not support UDFs, nor other functions that require " +
-        "open/close methods yet.")
-    thrown.expect(classOf[TableException])
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  ORDER BY proctime
-         |  MEASURES
-         |    ToMillis(A.proctime) AS aProctime
-         |  PATTERN (A B)
-         |  DEFINE
-         |    A AS A.symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-
-  @Test
-  def testUDFsAreNotSupportedInDefine(): Unit = {
-    thrown.expectMessage(
-      "Match recognize does not support UDFs, nor other functions that require " +
-        "open/close methods yet.")
-    thrown.expect(classOf[TableException])
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  ORDER BY proctime
-         |  MEASURES
-         |    A.symbol AS aSymbol
-         |  PATTERN (A B)
-         |  DEFINE
-         |    A AS ToMillis(A.proctime) = 2
-         |) AS T
-         |""".stripMargin
-
-    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-
   @Test
   def testAggregatesAreNotSupportedInMeasures(): Unit = {
     thrown.expectMessage(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
index 2245146e9df..8f5a8f335f4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
@@ -27,9 +27,9 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableConfig, TableEnvironment}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}
 import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
-import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase, UserDefinedFunctionTestUtils}
 import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
 import org.junit.Test
@@ -543,6 +543,58 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
     // We do not assert the proctime in the result, because it is currently
     // accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime
   }
+
+  @Test
+  def testUserDefinedFunctions(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setMaxGeneratedCodeLength(1)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String, Long)]
+    data.+=((1, "a", 1))
+    data.+=((2, "a", 1))
+    data.+=((3, "a", 1))
+    data.+=((4, "a", 1))
+    data.+=((5, "a", 1))
+    data.+=((6, "b", 1))
+    data.+=((7, "a", 1))
+    data.+=((8, "a", 1))
+    data.+=((9, "f", 1))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+    tEnv.registerFunction("prefix", new PrefixingScalarFunc)
+    val prefix = "PREF"
+    UserDefinedFunctionTestUtils
+      .setJobParameters(env, Map("prefix" -> prefix))
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    FIRST(id) as firstId,
+         |    prefix(A.name) as prefixedNameA,
+         |    LAST(id) as lastId
+         |  AFTER MATCH SKIP PAST LAST ROW
+         |  PATTERN (A+ C)
+         |  DEFINE
+         |    A AS prefix(A.name) = '$prefix:a'
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 class ToMillis extends ScalarFunction {
@@ -550,3 +602,16 @@ class ToMillis extends ScalarFunction {
     t.toInstant.toEpochMilli + TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
   }
 }
+
+private class PrefixingScalarFunc extends ScalarFunction {
+
+  private var prefix = "ERROR_VALUE"
+
+  override def open(context: FunctionContext): Unit = {
+    prefix = context.getJobParameter("prefix", "")
+  }
+
+  def eval(value: String): String = {
+    s"$prefix:$value"
+  }
+}

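For context, below is a condensed, self-contained sketch of what this change enables, adapted from the testUserDefinedFunctions ITCase in the diff above. The table name, sample data, and main wrapper are illustrative additions, not part of the patch:

    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}
    import org.apache.flink.types.Row

    import scala.collection.JavaConverters._

    // A scalar UDF with an open() lifecycle method; before this patch,
    // MATCH_RECOGNIZE code generation rejected such functions.
    class PrefixingScalarFunc extends ScalarFunction {
      private var prefix = "ERROR_VALUE"

      override def open(context: FunctionContext): Unit = {
        // Read the prefix from the global job parameters in open().
        prefix = context.getJobParameter("prefix", "")
      }

      def eval(value: String): String = s"$prefix:$value"
    }

    object UdfInMatchRecognize {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // The UDF's open() method reads this parameter, as in the ITCase.
        env.getConfig.setGlobalJobParameters(
          ParameterTool.fromMap(Map("prefix" -> "PREF").asJava))

        val t = env.fromElements((1, "a"), (2, "a"), (3, "b"))
          .toTable(tEnv, 'id, 'name, 'proctime.proctime)
        tEnv.registerTable("MyTable", t)
        tEnv.registerFunction("prefix", new PrefixingScalarFunc)

        // The UDF may now be called in both MEASURES and DEFINE.
        val sqlQuery =
          """
            |SELECT *
            |FROM MyTable
            |MATCH_RECOGNIZE (
            |  ORDER BY proctime
            |  MEASURES prefix(A.name) AS prefixedName
            |  AFTER MATCH SKIP PAST LAST ROW
            |  PATTERN (A)
            |  DEFINE
            |    A AS prefix(A.name) = 'PREF:a'
            |) AS T
            |""".stripMargin

        tEnv.sqlQuery(sqlQuery).toAppendStream[Row].print()
        env.execute()
      }
    }

The key mechanism, visible in the MatchCodeGenerator and runner changes above, is that the generated classes now extend the rich CEP interfaces (RichIterativeCondition, RichPatternSelectFunction, RichPatternFlatSelectFunction), so the UDF's open()/close() lifecycle methods are wired up through FunctionUtils instead of being rejected at code-generation time.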

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services