You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:38 UTC

[flink] 05/11: [hotfix][table, tests] Reduce mockito usage in TableTestUtil

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81c96cac656d1dfc61d06aa85e18d4edf4eab59e
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Aug 27 16:49:55 2018 +0200

    [hotfix][table,tests] Reduce mockito usage in TableTestUtil
---
 .../apache/flink/table/utils/TableTestBase.scala   | 50 +++++++++++-----------
 1 file changed, 25 insertions(+), 25 deletions(-)

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 5e1aabe..b987e34 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -21,12 +21,13 @@ package org.apache.flink.table.utils
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JExecutionEnvironment}
+import org.apache.flink.api.java.{LocalEnvironment, DataSet => JDataSet, ExecutionEnvironment => JExecutionEnvironment}
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment => JStreamExecutionEnvironment}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{Table, TableEnvironment, TableSchema}
 import org.apache.flink.table.expressions.Expression
@@ -152,9 +153,9 @@ object TableTestUtil {
 }
 
 case class BatchTableTestUtil() extends TableTestUtil {
-  val javaEnv = mock(classOf[JExecutionEnvironment])
+  val javaEnv = new LocalEnvironment()
   val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
-  val env = mock(classOf[ExecutionEnvironment])
+  val env = new ExecutionEnvironment(javaEnv)
   val tableEnv = TableEnvironment.getTableEnvironment(env)
 
   def addTable[T: TypeInformation](
@@ -232,11 +233,11 @@ case class BatchTableTestUtil() extends TableTestUtil {
 }
 
 case class StreamTableTestUtil() extends TableTestUtil {
-  val javaEnv = mock(classOf[JStreamExecutionEnvironment])
-  when(javaEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
+  val javaEnv = new LocalStreamEnvironment()
+  javaEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
   val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
-  val env = mock(classOf[StreamExecutionEnvironment])
-  when(env.getWrappedStreamExecutionEnvironment).thenReturn(javaEnv)
+  val env = new StreamExecutionEnvironment(javaEnv)
   val tableEnv = TableEnvironment.getTableEnvironment(env)
 
   def addTable[T: TypeInformation](
@@ -244,25 +245,16 @@ case class StreamTableTestUtil() extends TableTestUtil {
       fields: Expression*)
     : Table = {
 
-    val ds = mock(classOf[DataStream[T]])
-    val jDs = mock(classOf[JDataStream[T]])
-    when(ds.javaStream).thenReturn(jDs)
-    val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
-    when(jDs.getType).thenReturn(typeInfo)
-
-    val t = ds.toTable(tableEnv, fields: _*)
-    tableEnv.registerTable(name, t)
-    t
+    val table = env.fromElements().toTable(tableEnv, fields: _*)
+    tableEnv.registerTable(name, table)
+    table
   }
 
   def addJavaTable[T](typeInfo: TypeInformation[T], name: String, fields: String): Table = {
-
-    val jDs = mock(classOf[JDataStream[T]])
-    when(jDs.getType).thenReturn(typeInfo)
-
-    val t = javaTableEnv.fromDataStream(jDs, fields)
-    javaTableEnv.registerTable(name, t)
-    t
+    val stream = javaEnv.addSource(new EmptySource[T], typeInfo)
+    val table = javaTableEnv.fromDataStream(stream, fields)
+    javaTableEnv.registerTable(name, table)
+    table
   }
 
   def addFunction[T: TypeInformation](
@@ -327,3 +319,11 @@ case class StreamTableTestUtil() extends TableTestUtil {
     printTable(tableEnv.sqlQuery(query))
   }
 }
+
+class EmptySource[T]() extends SourceFunction[T] {
+  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
+  }
+
+  override def cancel(): Unit = {
+  }
+}