Posted to issues@flink.apache.org by vasia <gi...@git.apache.org> on 2016/03/22 16:26:09 UTC

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

GitHub user vasia opened a pull request:

    https://github.com/apache/flink/pull/1827

    [FLINK-3639] add methods for registering datasets and tables in the TableEnvironment

    This PR adds methods for registering DataSets and Tables in the TableEnvironment, so that SQL queries can be executed on them in the (near) future. Registration fails if a table or dataset with the same name already exists; however, registering the same dataset and/or table under different names is allowed.
    It also adds a `scan` method to retrieve a registered Table from the registry.
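
    A minimal usage sketch of the new methods in the Scala API, assuming the current Scala `TableEnvironment`; the data, field names, and table name below are illustrative:

        import org.apache.flink.api.scala._
        import org.apache.flink.api.scala.table._

        val env = ExecutionEnvironment.getExecutionEnvironment
        val tableEnv = new TableEnvironment

        // a small DataSet to register (contents are illustrative)
        val ds = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"))

        // register the DataSet under a name; registering another dataset or table
        // under the same name throws a TableException
        tableEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)

        // retrieve the registered Table via scan() and continue with the Table API
        val result = tableEnv.scan("MyTable").select('a, 'b)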

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vasia/flink registerTables

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1827.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1827
    
----
commit e72d133a41c0988c4074052e97bbc87590e80ddb
Author: vasia <va...@apache.org>
Date:   2016-03-21T14:22:58Z

    [FLINK-3639] add methods for registering datasets and tables in the TableEnvironment

----


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57073483
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.schema
    +
    +import java.lang.Double
    +import java.util
    +import java.util.Collections
    +
    +import org.apache.calcite.plan.RelOptTable
    +import org.apache.calcite.plan.RelOptTable.ToRelContext
    +import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
    +import org.apache.calcite.schema.Schema.TableType
    +import org.apache.calcite.schema.impl.AbstractTable
    +import org.apache.calcite.schema.{TranslatableTable, Statistic}
    +import org.apache.calcite.util.ImmutableBitSet
    +
    +class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable {
    +
    +  override def getStatistic: Statistic = new DefaultTableStatistic
    +
    +  override def getJdbcTableType: TableType = ???
    +
    +  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType
    +
    +  override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = {
    +    relNode
    +  }
    +}
    +
    +class DefaultTableStatistic extends Statistic {
    --- End diff --
    
    Yes, I think you can but don't have to override it.


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57066925
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.schema
    +
    +import java.lang.Double
    +import java.util
    +import java.util.Collections
    +
    +import org.apache.calcite.plan.RelOptTable
    +import org.apache.calcite.plan.RelOptTable.ToRelContext
    +import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
    +import org.apache.calcite.schema.Schema.TableType
    +import org.apache.calcite.schema.impl.AbstractTable
    +import org.apache.calcite.schema.{TranslatableTable, Statistic}
    +import org.apache.calcite.util.ImmutableBitSet
    +
    +class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable {
    +
    +  override def getStatistic: Statistic = new DefaultTableStatistic
    +
    +  override def getJdbcTableType: TableType = ???
    +
    +  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType
    +
    +  override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = {
    +    relNode
    +  }
    +}
    +
    +class DefaultTableStatistic extends Statistic {
    --- End diff --
    
    Do not provide statistics here. It might override valid statistics that Calcite computed from the relational expression that the Table represents.
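
    A sketch of how the class could look without its own statistic, relying on `AbstractTable`'s default `getStatistic` (which returns `Statistics.UNKNOWN`) so Calcite keeps whatever it derives from the wrapped relational expression; the `getJdbcTableType` placeholder is also dropped here, assuming the `AbstractTable` default is acceptable:

        package org.apache.flink.api.table.plan.schema

        import org.apache.calcite.plan.RelOptTable
        import org.apache.calcite.plan.RelOptTable.ToRelContext
        import org.apache.calcite.rel.RelNode
        import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
        import org.apache.calcite.schema.TranslatableTable
        import org.apache.calcite.schema.impl.AbstractTable

        // No getStatistic override: AbstractTable returns Statistics.UNKNOWN, and the
        // planner falls back to the statistics of the underlying RelNode.
        class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable {

          override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType

          override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = relNode
        }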


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57064833
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---
    @@ -59,29 +64,55 @@ object TranslationContext {
           .traitDefs(ConventionTraitDef.INSTANCE)
           .build
     
    -    tabNames = Map[AbstractTable, String]()
    -
    +    tablesRegistry = Map[String, AbstractTable]()
         relBuilder = RelBuilder.create(frameworkConfig)
    -
         nameCntr.set(0)
     
       }
     
       def addDataSet(newTable: DataSetTable[_]): String = {
    +    val tabName = "DataSetTable_" + nameCntr.getAndIncrement()
    +    tables.add(tabName, newTable)
    +    tabName
    +  }
    +
    +  @throws[TableException]
    +  def addAndRegisterDataSet(table: DataSetTable[_], name: String): Unit = {
    --- End diff --
    
    I think `addAndRegisterDataSet(DataSetTable, String)` and `registerTable(TableTable, String)` can be combined into a single `registerTable(AbstractTable, String)`.
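
    A sketch of what the combined method could look like on `TranslationContext`, reusing the `tables` schema and the `tablesRegistry` map from this diff; the exception message and the exact registry update are illustrative:

        @throws[TableException]
        def registerTable(table: AbstractTable, name: String): Unit = {
          if (tablesRegistry.contains(name)) {
            // refuse to overwrite an existing registration
            throw new TableException(s"Table '$name' already exists in the registry.")
          }
          tables.add(name, table)
          tablesRegistry += (name -> table)
        }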


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57173359
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table
    +
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.expressions.Expression
    +import org.apache.flink.api.table.plan.TranslationContext
    +import org.apache.flink.api.table.plan.schema.{DataSetTable, TableTable}
    +
    +class AbstractTableEnvironment {
    +
    +  private[flink] val config = new TableConfig()
    +
    +  /**
    +   * Returns the table config to define the runtime behavior of the Table API.
    +   */
    +  def getConfig = config
    +
    +  /**
    +   * Registers a Table under a unique name, so that it can be used in SQL queries.
    +   * @param name the Table name
    +   * @param table the Table to register
    +   */
    +  def registerTable[T](name: String, table: Table): Unit = {
    +    val tableTable = new TableTable(table.getRelNode())
    +    TranslationContext.registerTable(tableTable, name)
    +  }
    +
    +  /**
    +   * Retrieve a registered Table.
    +   * @param tableName the name under which the Table has been registered
    +   * @return the Table object
    +   */
    +  @throws[TableException]
    +  def scan(tableName: String): Table = {
    +    if (TranslationContext.isRegistered(tableName)) {
    +      val relBuilder = TranslationContext.getRelBuilder
    +      relBuilder.scan(tableName)
    +      new Table(relBuilder.build(), relBuilder)
    +    }
    +    else {
    +      throw new TableException(s"Table \'$tableName\' was not found in the registry.")
    +    }
    +  }
    +
    +  def registerUniqueNameDataSet[T](name: String, dataset: DataSet[T]): Unit = {
    +
    +    val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType)
    +    val dataSetTable = new DataSetTable[T](
    +      dataset,
    +      fieldIndexes,
    +      fieldNames
    +    )
    +    TranslationContext.registerTable(dataSetTable, name)
    +  }
    +
    +  def registerDataSetWithFields[T](
    --- End diff --
    
    call this `registerDataSetInternal` and make it private?


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57074776
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala ---
    @@ -72,5 +74,68 @@ class TableEnvironment {
          new ScalaBatchTranslator(config).translate[T](table.relNode)
       }
     
    -}
    +  /**
    +   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
    +   * The fields of the DataSet type are used to name the Table fields.
    +   * @param name the Table name
    +   * @param dataset the DataSet to register
    +   */
    +  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
    --- End diff --
    
    probably :)


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1827#issuecomment-200769959
  
    Thanks. I've addressed your comments!


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57065041
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---
    @@ -59,29 +64,55 @@ object TranslationContext {
           .traitDefs(ConventionTraitDef.INSTANCE)
           .build
     
    -    tabNames = Map[AbstractTable, String]()
    -
    +    tablesRegistry = Map[String, AbstractTable]()
         relBuilder = RelBuilder.create(frameworkConfig)
    -
         nameCntr.set(0)
     
       }
     
       def addDataSet(newTable: DataSetTable[_]): String = {
    --- End diff --
    
    Refactor to `registerTable(AbstractTable)` as well? 


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57065751
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala ---
    @@ -87,5 +90,72 @@ class TableEnvironment {
         new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo)
       }
     
    -}
    +  /**
    +   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
    +   * The fields of the DataSet type are used to name the Table fields.
    +   * @param name the Table name
    +   * @param dataset the DataSet to register
    +   */
    +  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
    +
    +    val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType)
    +    val dataSetTable = new DataSetTable[T](
    +      dataset,
    +      fieldIndexes,
    +      fieldNames
    +    )
    +    TranslationContext.addAndRegisterDataSet(dataSetTable, name)
    +  }
    +
    +  /**
    +   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
    +   * The fields of the DataSet type are renamed to the given set of fields.
    +   *
    +   * @param name the Table name
    +   * @param dataset the DataSet to register
    +   * @param fields the Table field names
    +   */
    +  def registerDataSet[T](name: String, dataset: DataSet[T], fields: String): Unit = {
    +
    +    val exprs = ExpressionParser
    +      .parseExpressionList(fields)
    +      .toArray
    +
    +    val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType, exprs)
     
    +    val dataSetTable = new DataSetTable[T](
    +      dataset,
    +      fieldIndexes.toArray,
    +      fieldNames.toArray
    +    )
    +    TranslationContext.addAndRegisterDataSet(dataSetTable, name)
    +  }
    +
    +  /**
    +   * Registers a Table under a unique name, so that it can be used in SQL queries.
    +   * @param name the Table name
    +   * @param table the Table to register
    +   */
    +  def registerTable[T](name: String, table: Table): Unit = {
    +    val tableTable = new TableTable(table.getRelNode())
    +    TranslationContext.registerTable(tableTable, name)
    +  }
    +
    +  /**
    +   * Retrieve a registered Table.
    +   * @param tableName the name under which the Table has been registered
    +   * @return the Table object
    +   */
    +  @throws[TableException]
    +  def scan(tableName: String): Table = {
    +    if (TranslationContext.isRegistered(tableName)) {
    +      val relBuilder = TranslationContext.getRelBuilder
    +      relBuilder.scan(tableName)
    +      new Table(relBuilder.build(), relBuilder)
    +    }
    +    else {
    +      throw new TableException("Table \"" + tableName + "\" was not found in the registry.")
    --- End diff --
    
    Strings can be built with Scala like this: `s"Table \"$tableName\" was not found in the registry."`
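
    For comparison, the concatenated and interpolated forms side by side (the single-quoted message is the variant the PR ends up using):

        val tableName = "MyTable"

        // string concatenation
        val msg1 = "Table \"" + tableName + "\" was not found in the registry."

        // string interpolation
        val msg2 = s"Table '$tableName' was not found in the registry."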


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1827#issuecomment-200803230
  
    Changes look good! +1 to merge


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1827


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1827#issuecomment-200864343
  
    Merging!


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57065868
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala ---
    @@ -72,5 +74,68 @@ class TableEnvironment {
          new ScalaBatchTranslator(config).translate[T](table.relNode)
       }
     
    -}
    +  /**
    +   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
    +   * The fields of the DataSet type are used to name the Table fields.
    +   * @param name the Table name
    +   * @param dataset the DataSet to register
    +   */
    +  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
    +
    +    val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType)
    +    val dataSetTable = new DataSetTable[T](
    +      dataset.javaSet,
    +      fieldIndexes,
    +      fieldNames
    +    )
    +    TranslationContext.addAndRegisterDataSet(dataSetTable, name)
    +  }
    +
    +  /**
    +   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
    +   * The fields of the DataSet type are renamed to the given set of fields.
    +   *
    +   * @param name the Table name
    +   * @param dataset the DataSet to register
    +   * @param fields the field names expression
    +   */
    +  def registerDataSet[T](name: String, dataset: DataSet[T], fields: Expression*): Unit = {
    +
    +    val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](
    +      dataset.getType, fields.toArray)
     
    +    val dataSetTable = new DataSetTable[T](
    +      dataset.javaSet,
    +      fieldIndexes.toArray,
    +      fieldNames.toArray
    +    )
    +    TranslationContext.addAndRegisterDataSet(dataSetTable, name)
    +  }
    +
    +  /**
    +   * Registers a Table under a unique name, so that it can be used in SQL queries.
    +   * @param name the Table name
    +   * @param table the Table to register
    +   */
    +  def registerTable[T](name: String, table: Table): Unit = {
    +    val tableTable = new TableTable(table.getRelNode())
    +    TranslationContext.registerTable(tableTable, name)
    +  }
    +
    +  /**
    +   * Retrieve a registered Table.
    +   * @param tableName the name under which the Table has been registered
    +   * @return the Table object
    +   */
    +  @throws[TableException]
    +  def scan(tableName: String): Table = {
    +    if (TranslationContext.isRegistered(tableName)) {
    +      val relBuilder = TranslationContext.getRelBuilder
    +      relBuilder.scan(tableName)
    +      new Table(relBuilder.build(), relBuilder)
    +    }
    +    else {
    +      throw new TableException("Table \"" + tableName + "\" was not found in the registry.")
    --- End diff --
    
    Use Scala's `s""` string building here as well.


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57066078
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala ---
    @@ -72,5 +74,68 @@ class TableEnvironment {
          new ScalaBatchTranslator(config).translate[T](table.relNode)
       }
     
    -}
    +  /**
    +   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
    +   * The fields of the DataSet type are used to name the Table fields.
    +   * @param name the Table name
    +   * @param dataset the DataSet to register
    +   */
    +  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
    --- End diff --
    
    Does it make sense to have an `AbstractTableEnvironment` for the common parts of the Java and the Scala `TableEnvironment`s?


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57070386
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.schema
    +
    +import java.lang.Double
    +import java.util
    +import java.util.Collections
    +
    +import org.apache.calcite.plan.RelOptTable
    +import org.apache.calcite.plan.RelOptTable.ToRelContext
    +import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
    +import org.apache.calcite.schema.Schema.TableType
    +import org.apache.calcite.schema.impl.AbstractTable
    +import org.apache.calcite.schema.{TranslatableTable, Statistic}
    +import org.apache.calcite.util.ImmutableBitSet
    +
    +class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable {
    +
    +  override def getStatistic: Statistic = new DefaultTableStatistic
    +
    +  override def getJdbcTableType: TableType = ???
    +
    +  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType
    +
    +  override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = {
    +    relNode
    +  }
    +}
    +
    +class DefaultTableStatistic extends Statistic {
    --- End diff --
    
    Shall I just leave the method unimplemented then?


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57173349
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table
    +
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.expressions.Expression
    +import org.apache.flink.api.table.plan.TranslationContext
    +import org.apache.flink.api.table.plan.schema.{DataSetTable, TableTable}
    +
    +class AbstractTableEnvironment {
    +
    +  private[flink] val config = new TableConfig()
    +
    +  /**
    +   * Returns the table config to define the runtime behavior of the Table API.
    +   */
    +  def getConfig = config
    +
    +  /**
    +   * Registers a Table under a unique name, so that it can be used in SQL queries.
    +   * @param name the Table name
    +   * @param table the Table to register
    +   */
    +  def registerTable[T](name: String, table: Table): Unit = {
    +    val tableTable = new TableTable(table.getRelNode())
    +    TranslationContext.registerTable(tableTable, name)
    +  }
    +
    +  /**
    +   * Retrieve a registered Table.
    +   * @param tableName the name under which the Table has been registered
    +   * @return the Table object
    +   */
    +  @throws[TableException]
    +  def scan(tableName: String): Table = {
    +    if (TranslationContext.isRegistered(tableName)) {
    +      val relBuilder = TranslationContext.getRelBuilder
    +      relBuilder.scan(tableName)
    +      new Table(relBuilder.build(), relBuilder)
    +    }
    +    else {
    +      throw new TableException(s"Table \'$tableName\' was not found in the registry.")
    +    }
    +  }
    +
    +  def registerUniqueNameDataSet[T](name: String, dataset: DataSet[T]): Unit = {
    --- End diff --
    
    call this `registerDataSetInternal` and make it private?
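
    A sketch of the renamed method, based on the body of `registerUniqueNameDataSet` shown earlier in this review; `private[flink]` is assumed here so the Java and Scala `TableEnvironment`s in their own packages can still delegate to it:

        private[flink] def registerDataSetInternal[T](name: String, dataset: DataSet[T]): Unit = {
          // derive field names and indexes from the DataSet's type information
          val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType)
          val dataSetTable = new DataSetTable[T](dataset, fieldIndexes, fieldNames)
          TranslationContext.registerTable(dataSetTable, name)
        }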


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57178675
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---
    @@ -59,29 +64,48 @@ object TranslationContext {
           .traitDefs(ConventionTraitDef.INSTANCE)
           .build
     
    -    tabNames = Map[AbstractTable, String]()
    -
    +    tablesRegistry = Map[String, AbstractTable]()
         relBuilder = RelBuilder.create(frameworkConfig)
    -
         nameCntr.set(0)
     
       }
     
    +  /**
    +   * Adds a table to the Calcite schema so it can be used by the Table API
    +   */
       def addDataSet(newTable: DataSetTable[_]): String = {
    +    val tabName = "DataSetTable_" + nameCntr.getAndIncrement()
    +    tables.add(tabName, newTable)
    +    tabName
    +  }
    +
    +  /**
    +   * Adds a table to the Calcite schema and the tables registry,
    +   * so it can be used by both Table API and SQL statements.
    +   */
    +  @throws[TableException]
    +  def registerTable(table: AbstractTable, name: String): Unit = {
     
    -    // look up name
    -    val tabName = tabNames.get(newTable)
    +    val existingTable = tablesRegistry.get(name)
    --- End diff --
    
    Should we check that the name does not follow the pattern `DataSetTable_[0-9]+`?
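
    A sketch of such a guard at the top of `registerTable`; the error message is illustrative:

        @throws[TableException]
        def registerTable(table: AbstractTable, name: String): Unit = {
          // reject names that clash with internally generated DataSet table names
          if (name.matches("DataSetTable_[0-9]+")) {
            throw new TableException(
              s"Table name '$name' collides with internally generated table names.")
          }
          // ... existing duplicate-name check and registration
        }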


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57179250
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/RegisterDataSetITCase.java ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.table.test;
    +
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.table.TableEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple5;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.Table;
    +import org.apache.flink.api.table.TableException;
    +import org.apache.flink.api.table.plan.TranslationContext;
    +import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.List;
    +
    +@RunWith(Parameterized.class)
    +public class RegisterDataSetITCase extends TableProgramsTestBase {
    +
    +	public RegisterDataSetITCase(TestExecutionMode mode, TableConfigMode configMode) {
    +		super(mode, configMode);
    +	}
    +
    +	@Test
    +	public void testSimpleRegister() throws Exception {
    +		final String tableName = "MyTable";
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		TableEnvironment tableEnv = getJavaTableEnvironment();
    +		TranslationContext.reset();
    +
    +		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
    +		tableEnv.registerDataSet(tableName, ds);
    +		Table t = tableEnv.scan(tableName);
    +
    +		Table result = t.select("f0, f1");
    +
    +		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
    +		List<Row> results = resultSet.collect();
    +		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
    +				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
    +				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
    +		compareResultAsText(results, expected);
    +	}
    +
    +	@Test
    +	public void testRegisterWithFields() throws Exception {
    +		final String tableName = "MyTable";
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		TableEnvironment tableEnv = getJavaTableEnvironment();
    +		TranslationContext.reset();
    +
    +		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
    +		tableEnv.registerDataSet(tableName, ds, "a, b, c");
    +		Table t = tableEnv.scan(tableName);
    +
    +		Table result = t.select("a, b, c");
    +
    +		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
    +		List<Row> results = resultSet.collect();
    +		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
    +				"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
    +				"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
    +				"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
    +				"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
    +				"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
    +				"20,6,Comment#14\n" + "21,6,Comment#15\n";
    +		compareResultAsText(results, expected);
    +	}
    +
    +	@Test(expected = TableException.class)
    +	public void testRegisterExistingDatasetTable() throws Exception {
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		TableEnvironment tableEnv = getJavaTableEnvironment();
    +		TranslationContext.reset();
    +
    +		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
    +		tableEnv.registerDataSet("MyTable", ds);
    +		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 =
    +				CollectionDataSets.getSmall5TupleDataSet(env);
    +		tableEnv.registerDataSet("MyTable", ds2);
    +	}
    +
    +	@Test(expected = TableException.class)
    +	public void testScanUnregisteredTable() throws Exception {
    +		TableEnvironment tableEnv = getJavaTableEnvironment();
    +		TranslationContext.reset();
    +
    +		tableEnv.scan("nonRegisteredTable");
    +	}
    +
    +	@Test
    +	public void testTableRegister() throws Exception {
    +		final String tableName = "MyTable";
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		TableEnvironment tableEnv = getJavaTableEnvironment();
    +		TranslationContext.reset();
    +
    +		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
    +		Table t = tableEnv.fromDataSet(ds);
    +		tableEnv.registerTable(tableName, t);
    +		Table result = tableEnv.scan(tableName).select("f0, f1").filter("f0 > 7");
    +
    +		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
    +		List<Row> results = resultSet.collect();
    +		String expected = "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" +
    +				"13,5\n" + "14,5\n" + "15,5\n" +
    +				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
    +		compareResultAsText(results, expected);
    +	}
    +
    +	@Test(expected = TableException.class)
    +	public void testRegisterExistingTable() throws Exception {
    --- End diff --
    
    I think we can remove this test because it tests against the `AbstractTableEnvironment`, which is also covered by the Scala tests.


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57068281
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.table.test
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.scala.util.CollectionDataSets
    +import org.apache.flink.api.table.{TableException, Row}
    +import org.apache.flink.api.table.plan.TranslationContext
    +import org.apache.flink.api.table.test.utils.TableProgramsTestBase
    +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit._
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +
    +@RunWith(classOf[Parameterized])
    +class RegisterDataSetITCase(
    --- End diff --
    
    If we move the common methods of the Java and Scala `TableEnvironment`s to an `AbstractTableEnvironment`, it is sufficient to only have the Scala tests, IMO.


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57066656
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.schema
    +
    +import java.lang.Double
    +import java.util
    +import java.util.Collections
    +
    +import org.apache.calcite.plan.RelOptTable
    +import org.apache.calcite.plan.RelOptTable.ToRelContext
    +import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
    +import org.apache.calcite.schema.Schema.TableType
    +import org.apache.calcite.schema.impl.AbstractTable
    +import org.apache.calcite.schema.{TranslatableTable, Statistic}
    +import org.apache.calcite.util.ImmutableBitSet
    +
    +class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable {
    --- End diff --
    
    Add a brief comment explaining what this class is about. The name might be confusing.
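
    One possible wording for such a comment, describing what the wrapper does:

        /**
         * Wraps the [[RelNode]] of an existing Table API [[org.apache.flink.api.table.Table]]
         * as a Calcite table. Registering it in the Calcite schema makes the table visible
         * to SQL queries; [[toRel]] expands a scan of it back into the wrapped relational
         * expression.
         */
        class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable {
          // ... methods as above
        }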


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57069968
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---
    @@ -59,29 +64,55 @@ object TranslationContext {
           .traitDefs(ConventionTraitDef.INSTANCE)
           .build
     
    -    tabNames = Map[AbstractTable, String]()
    -
    +    tablesRegistry = Map[String, AbstractTable]()
         relBuilder = RelBuilder.create(frameworkConfig)
    -
         nameCntr.set(0)
     
       }
     
       def addDataSet(newTable: DataSetTable[_]): String = {
    +    val tabName = "DataSetTable_" + nameCntr.getAndIncrement()
    +    tables.add(tabName, newTable)
    +    tabName
    +  }
    +
    +  @throws[TableException]
    +  def addAndRegisterDataSet(table: DataSetTable[_], name: String): Unit = {
    --- End diff --
    
    sounds right :)


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57075247
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala ---
    @@ -87,5 +90,72 @@ class TableEnvironment {
         new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo)
       }
     
    -}
    +  /**
    +   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
    +   * The fields of the DataSet type are used to name the Table fields.
    +   * @param name the Table name
    +   * @param dataset the DataSet to register
    +   */
    +  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
    +
    +    val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType)
    +    val dataSetTable = new DataSetTable[T](
    +      dataset,
    +      fieldIndexes,
    +      fieldNames
    +    )
    +    TranslationContext.addAndRegisterDataSet(dataSetTable, name)
    +  }
    +
    +  /**
    +   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
    +   * The fields of the DataSet type are renamed to the given set of fields.
    +   *
    +   * @param name the Table name
    +   * @param dataset the DataSet to register
    +   * @param fields the Table field names
    +   */
    +  def registerDataSet[T](name: String, dataset: DataSet[T], fields: String): Unit = {
    +
    +    val exprs = ExpressionParser
    +      .parseExpressionList(fields)
    +      .toArray
    +
    +    val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType, exprs)
     
    +    val dataSetTable = new DataSetTable[T](
    +      dataset,
    +      fieldIndexes.toArray,
    +      fieldNames.toArray
    +    )
    +    TranslationContext.addAndRegisterDataSet(dataSetTable, name)
    +  }
    +
    +  /**
    +   * Registers a Table under a unique name, so that it can be used in SQL queries.
    +   * @param name the Table name
    +   * @param table the Table to register
    +   */
    +  def registerTable[T](name: String, table: Table): Unit = {
    +    val tableTable = new TableTable(table.getRelNode())
    +    TranslationContext.registerTable(tableTable, name)
    +  }
    +
    +  /**
    +   * Retrieve a registered Table.
    +   * @param tableName the name under which the Table has been registered
    +   * @return the Table object
    +   */
    +  @throws[TableException]
    +  def scan(tableName: String): Table = {
    +    if (TranslationContext.isRegistered(tableName)) {
    +      val relBuilder = TranslationContext.getRelBuilder
    +      relBuilder.scan(tableName)
    +      new Table(relBuilder.build(), relBuilder)
    +    }
    +    else {
    +      throw new TableException("Table \"" + tableName + "\" was not found in the registry.")
    --- End diff --
    
    That's much nicer than my Java-ish way :S


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1827#issuecomment-200302428
  
    Thanks for the review! I've updated the PR.


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1827#issuecomment-200025666
  
    Thanks for the PR! Looks good overall. 
    I think we can save a few LOCs by refactoring the common parts a bit. Also, I spotted more opportunities to use Scala's string interpolation (`s""`), which is nicer than string concatenation, IMO.


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57065169
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---
    @@ -93,7 +93,10 @@ object FlinkRuleSets {
         DataSetCalcRule.INSTANCE,
         DataSetJoinRule.INSTANCE,
         DataSetScanRule.INSTANCE,
    -    DataSetUnionRule.INSTANCE
    +    DataSetUnionRule.INSTANCE,
    +
    +    // convert a logical table scan to a relational expression
    +    TableScanRule.INSTANCE
    --- End diff --
    
    Can we move this rule to the top, next to the other default Calcite rules?


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1827#issuecomment-200394123
  
    Proposed a few minor refactorings. Looks good otherwise :-)


---

[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1827#discussion_r57178398
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---
    @@ -59,29 +64,48 @@ object TranslationContext {
           .traitDefs(ConventionTraitDef.INSTANCE)
           .build
     
    -    tabNames = Map[AbstractTable, String]()
    -
    +    tablesRegistry = Map[String, AbstractTable]()
         relBuilder = RelBuilder.create(frameworkConfig)
    -
         nameCntr.set(0)
     
       }
     
    +  /**
    +   * Adds a table to the Calcite schema so it can be used by the Table API
    +   */
       def addDataSet(newTable: DataSetTable[_]): String = {
    --- End diff --
    
    rename to `registerDataSetTable()`?


---