You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/03 03:49:17 UTC

[GitHub] [flink] docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner

docete commented on a change in pull request #11276: [FLINK-16029][table-planner-blink] Remove register source and sink in test cases of blink planner
URL: https://github.com/apache/flink/pull/11276#discussion_r402721904
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
 ##########
 @@ -73,121 +79,72 @@ class InMemoryLookupableTableSource(
     map.toMap
   }
 
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+    null
+  }
+
   override def isAsyncEnabled: Boolean = asyncEnabled
 
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+  override def getProducedDataType: DataType = schema.toRowDataType
+
+  override def getTableSchema: TableSchema = schema
 
-  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
+
+  override def isBounded: Boolean = bounded
 
   @VisibleForTesting
   def getResourceCounter: Int = resourceCounter.get()
-
 }
 
-object InMemoryLookupableTableSource {
-
-  /**
-    * Return a new builder that builds a [[InMemoryLookupableTableSource]].
-    *
-    * For example:
-    *
-    * {{{
-    *     val data = (
-    *       (11, 1L, "Julian"),
-    *       (22, 2L, "Jark"),
-    *       (33, 3L, "Fabian"))
-    *
-    *     val source = InMemoryLookupableTableSource.builder()
-    *       .data(data)
-    *       .field("age", Types.INT)
-    *       .field("id", Types.LONG)
-    *       .field("name", Types.STRING)
-    *       .enableAsync()
-    *       .build()
-    * }}}
-    *
-    * @return a new builder to build a [[InMemoryLookupableTableSource]]
-    */
-  def builder(): Builder = new Builder
+class InMemoryLookupableTableFactory extends TableSourceFactory[Row] {
+  override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
+    val dp = new DescriptorProperties
+    dp.putProperties(properties)
+    val tableSchema = dp.getTableSchema(SCHEMA)
 
+    val serializedData = dp.getString("data")
+    val data = EncodingUtils.decodeStringToObject(serializedData, classOf[List[Product]])
 
-  /**
-    * A builder for creating [[InMemoryLookupableTableSource]] instances.
-    *
-    * For example:
-    *
-    * {{{
-    *     val data = (
-    *       (11, 1L, "Julian"),
-    *       (22, 2L, "Jark"),
-    *       (33, 3L, "Fabian"))
-    *
-    *     val source = InMemoryLookupableTableSource.builder()
-    *       .data(data)
-    *       .field("age", Types.INT)
-    *       .field("id", Types.LONG)
-    *       .field("name", Types.STRING)
-    *       .enableAsync()
-    *       .build()
-    * }}}
-    */
-  class Builder {
-    private val schema = new mutable.LinkedHashMap[String, TypeInformation[_]]()
-    private var data: List[Product] = _
-    private var asyncEnabled: Boolean = false
-
-    /**
-      * Sets table data for the table source.
-      */
-    def data(data: List[Product]): Builder = {
-      this.data = data
-      this
+    val rowData = data.map { entry =>
+      Row.of((0 until entry.productArity).map(entry.productElement(_).asInstanceOf[Object]): _*)
     }
 
-    /**
-      * Adds a field with the field name and the type information. Required.
-      * This method can be called multiple times. The call order of this method defines
-      * also the order of the fields in a row.
-      *
-      * @param fieldName the field name
-      * @param fieldType the type information of the field
-      */
-    def field(fieldName: String, fieldType: TypeInformation[_]): Builder = {
-      if (schema.contains(fieldName)) {
-        throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
-      }
-      schema += (fieldName -> fieldType)
-      this
-    }
+    val asyncEnabled = dp.getOptionalBoolean("is-async").orElse(false)
 
-    /**
-      * Enables async lookup for the table source
-      */
-    def enableAsync(): Builder = {
-      asyncEnabled = true
-      this
-    }
+    val bounded = dp.getOptionalBoolean("is-bounded").orElse(false)
 
-    /**
-      * Apply the current values and constructs a newly-created [[InMemoryLookupableTableSource]].
-      *
-      * @return a newly-created [[InMemoryLookupableTableSource]].
-      */
-    def build(): InMemoryLookupableTableSource = {
-      val fieldNames = schema.keys.toArray
-      val fieldTypes = schema.values.toArray
-      Preconditions.checkNotNull(data)
-      // convert
-      val rowData = data.map { entry =>
-        Row.of((0 until entry.productArity).map(entry.productElement(_).asInstanceOf[Object]): _*)
-      }
-      new InMemoryLookupableTableSource(
-        fieldNames,
-        fieldTypes,
-        rowData,
-        asyncEnabled
-      )
-    }
+    new InMemoryLookupableTableSource(tableSchema, rowData, asyncEnabled, bounded)
+  }
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR_TYPE, "InMemoryLookupableTable")
+    context
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    val supported = new util.ArrayList[String]()
+    supported.add("*")
+    supported
+  }
+}
+
+object InMemoryLookupableTableSource {
+
+  def createTemporaryTable(
 
 Review comment:
   The name `createTemporaryTable` doesn't imply that the method returns something. See `TableEnvironment.createTemporaryView` or `ConnectorTableDescriptor.createTemporaryTable`. I think we should use a unified naming scheme.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services