Posted to issues@flink.apache.org by suez1224 <gi...@git.apache.org> on 2018/03/23 20:33:22 UTC

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

GitHub user suez1224 opened a pull request:

    https://github.com/apache/flink/pull/5758

    [FLINK-9059][Table API & SQL] add table type attribute; replace "sources" with "tables" in environm…

    ## What is the purpose of the change
    
    Add support for unified table source and sink declaration in environment file definition.
    This change prepares for FLINK-9049 (Create unified interfaces to configure and instantiate TableSink). We want to get this change in before 1.5 so it won't break the API in the next Flink release.
    
    
    ## Brief change log
    
      - Refactor the SQL Client environment file definition to replace "sources" with "tables" (see the sketch below).
      - Add a "type" property to distinguish between table sources and sinks.
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as *(please describe tests)*.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink FLINK-9059

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5758
    
----
commit 6da60ae23d83783137d44e282e06e16c947f0eb7
Author: Shuyi Chen <sh...@...>
Date:   2018-03-23T06:00:00Z

    add table type attribute; replace "sources" with "tables" in environment file

----


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r179178156
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Common class for all descriptors describing table sources and sinks.
    +  */
    +abstract class TableDescriptor extends Descriptor {
    +
    +  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
    +  protected var formatDescriptor: Option[FormatDescriptor] = None
    +  protected var schemaDescriptor: Option[Schema] = None
    +  protected var statisticsDescriptor: Option[Statistics] = None
    --- End diff --
    
    I was wondering if a sink could have its own "schema configuration" for alignment with the output table schema?
    For example, a CassandraTableSink / JDBCTableSink would definitely throw an exception when trying to execute an insert with a mismatched schema.


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r179149794
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
    @@ -29,38 +30,47 @@
     
     /**
      * Environment configuration that represents the content of an environment file. Environment files
    - * define sources, execution, and deployment behavior. An environment might be defined by default or
    + * define tables, execution, and deployment behavior. An environment might be defined by default or
      * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
      *
      * <p>In future versions, we might restrict the merging or enrichment of deployment properties to not
      * allow overwriting of a deployment by a session.
      */
     public class Environment {
     
    -	private Map<String, Source> sources;
    +	private Map<String, TableDescriptor> tables;
    --- End diff --
    
    Why not maintain two separate maps for sources and sinks? Then we wouldn't need instanceof checks. If a table is both, we can simply add it to both maps (see the sketch below).
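
    A minimal sketch of that layout (the actual Environment class is Java; this Scala sketch only shows the shape, and the field names are hypothetical):

        import org.apache.flink.table.descriptors.TableDescriptor

        class Environment {
          // one map per role: a table of type "both" is registered in both maps,
          // so looking up a source or a sink never needs an instanceof check
          var sources: Map[String, TableDescriptor] = Map.empty
          var sinks: Map[String, TableDescriptor] = Map.empty
        }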


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5758


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r178244473
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
    @@ -29,38 +30,47 @@
     
     /**
      * Environment configuration that represents the content of an environment file. Environment files
    - * define sources, execution, and deployment behavior. An environment might be defined by default or
    + * define tables, execution, and deployment behavior. An environment might be defined by default or
      * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
      *
      * <p>In future versions, we might restrict the merging or enrichment of deployment properties to not
      * allow overwriting of a deployment by a session.
      */
     public class Environment {
     
    -	private Map<String, Source> sources;
    +	private Map<String, TableDescriptor> tables;
     
     	private Execution execution;
     
     	private Deployment deployment;
     
     	public Environment() {
    -		this.sources = Collections.emptyMap();
    +		this.tables = Collections.emptyMap();
     		this.execution = new Execution();
     		this.deployment = new Deployment();
     	}
     
    -	public Map<String, Source> getSources() {
    -		return sources;
    +	public Map<String, TableDescriptor> getTables() {
    +		return tables;
     	}
     
    -	public void setSources(List<Map<String, Object>> sources) {
    -		this.sources = new HashMap<>(sources.size());
    -		sources.forEach(config -> {
    -			final Source s = Source.create(config);
    -			if (this.sources.containsKey(s.getName())) {
    -				throw new SqlClientException("Duplicate source name '" + s + "'.");
    +	public void setTables(List<Map<String, Object>> tables) {
    +		this.tables = new HashMap<>(tables.size());
    +		tables.forEach(config -> {
    +			if (!config.containsKey(TableDescriptor.TABLE_TYPE())) {
    +				throw new SqlClientException("The 'type' attribute of a table is missing.");
    --- End diff --
    
    Yes, the valid values can be `source`, `sink`, and `both`; please see https://issues.apache.org/jira/browse/FLINK-8866.
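
    The type constants would then look roughly like this (`TABLE_TYPE` and `TABLE_TYPE_SOURCE` exist in this PR; the other two are hypothetical until FLINK-8866 lands):

        object TableDescriptor {
          /** Key for describing the type of a table. */
          val TABLE_TYPE = "type"

          val TABLE_TYPE_SOURCE = "source"
          val TABLE_TYPE_SINK = "sink"  // planned, FLINK-8866
          val TABLE_TYPE_BOTH = "both"  // planned, FLINK-8866
        }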


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r179182994
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Common class for all descriptors describing table sources and sinks.
    +  */
    +abstract class TableDescriptor extends Descriptor {
    +
    +  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
    +  protected var formatDescriptor: Option[FormatDescriptor] = None
    +  protected var schemaDescriptor: Option[Schema] = None
    +  protected var statisticsDescriptor: Option[Statistics] = None
    --- End diff --
    
    Thanks for pointing this out. You are right: sinks can have a schema but no statistics. I was just wondering if we really need most of the refactorings in this PR. We need to rework the `TableSourceDescriptor` class in the near future because a Java user can access all of its `protected` fields, which is not very nice API design.


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r179686471
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Common class for all descriptors describing table sources and sinks.
    +  */
    +abstract class TableDescriptor extends Descriptor {
    +
    +  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
    +  protected var formatDescriptor: Option[FormatDescriptor] = None
    +  protected var schemaDescriptor: Option[Schema] = None
    +  protected var statisticsDescriptor: Option[Statistics] = None
    --- End diff --
    
    Thanks a lot, Timo. We can consider refactoring the code later. How about simply moving `statisticsDescriptor` to `SourceTableDescriptor` for now, as sketched below?
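
    A rough sketch of that split (descriptor types as in this PR; `SourceTableDescriptor` shows only the intended shape, not the final class):

        package org.apache.flink.table.descriptors

        abstract class TableDescriptor extends Descriptor {
          // shared by sources and sinks; as discussed above, sinks can carry a schema too
          protected var connectorDescriptor: Option[ConnectorDescriptor] = None
          protected var formatDescriptor: Option[FormatDescriptor] = None
          protected var schemaDescriptor: Option[Schema] = None
          protected var metaDescriptor: Option[Metadata] = None
        }

        abstract class SourceTableDescriptor extends TableDescriptor {
          // statistics only make sense on the read path, so they move down here
          protected var statisticsDescriptor: Option[Statistics] = None
        }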


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r177828794
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
    @@ -29,38 +30,47 @@
     
     /**
      * Environment configuration that represents the content of an environment file. Environment files
    - * define sources, execution, and deployment behavior. An environment might be defined by default or
    + * define tables, execution, and deployment behavior. An environment might be defined by default or
      * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
      *
      * <p>In future versions, we might restrict the merging or enrichment of deployment properties to not
      * allow overwriting of a deployment by a session.
      */
     public class Environment {
     
    -	private Map<String, Source> sources;
    +	private Map<String, TableDescriptor> tables;
     
     	private Execution execution;
     
     	private Deployment deployment;
     
     	public Environment() {
    -		this.sources = Collections.emptyMap();
    +		this.tables = Collections.emptyMap();
     		this.execution = new Execution();
     		this.deployment = new Deployment();
     	}
     
    -	public Map<String, Source> getSources() {
    -		return sources;
    +	public Map<String, TableDescriptor> getTables() {
    +		return tables;
     	}
     
    -	public void setSources(List<Map<String, Object>> sources) {
    -		this.sources = new HashMap<>(sources.size());
    -		sources.forEach(config -> {
    -			final Source s = Source.create(config);
    -			if (this.sources.containsKey(s.getName())) {
    -				throw new SqlClientException("Duplicate source name '" + s + "'.");
    +	public void setTables(List<Map<String, Object>> tables) {
    +		this.tables = new HashMap<>(tables.size());
    +		tables.forEach(config -> {
    +			if (!config.containsKey(TableDescriptor.TABLE_TYPE())) {
    +				throw new SqlClientException("The 'type' attribute of a table is missing.");
    --- End diff --
    
    Just curious: is there any chance we need a table to be of both `source` and `sink` type? (E.g., similar to queryable state, if a user wants to explicitly maintain a table for that purpose.)


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r179150731
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Common class for all descriptors describing table sources and sinks.
    +  */
    +abstract class TableDescriptor extends Descriptor {
    +
    +  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
    +  protected var formatDescriptor: Option[FormatDescriptor] = None
    +  protected var schemaDescriptor: Option[Schema] = None
    +  protected var statisticsDescriptor: Option[Statistics] = None
    --- End diff --
    
    I'm wondering if we really need these changes. A sink will never have a schema or statistics.


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r179715295
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
    @@ -29,38 +30,47 @@
     
     /**
      * Environment configuration that represents the content of an environment file. Environment files
    - * define sources, execution, and deployment behavior. An environment might be defined by default or
    + * define tables, execution, and deployment behavior. An environment might be defined by default or
      * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
      *
      * <p>In future versions, we might restrict the merging or enrichment of deployment properties to not
      * allow overwriting of a deployment by a session.
      */
     public class Environment {
     
    -	private Map<String, Source> sources;
    +	private Map<String, TableDescriptor> tables;
    --- End diff --
    
    Also true. I will merge this now to have it in the next release. We can still refactor if necessary, as it is internal API.


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r178330131
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Common class for all descriptors describing table sources and sinks.
    +  */
    +abstract class TableDescriptor extends Descriptor {
    +
    +  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
    +  protected var formatDescriptor: Option[FormatDescriptor] = None
    +  protected var schemaDescriptor: Option[Schema] = None
    +  protected var statisticsDescriptor: Option[Statistics] = None
    +  protected var metaDescriptor: Option[Metadata] = None
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    +    connectorDescriptor.foreach(_.addProperties(properties))
    +    formatDescriptor.foreach(_.addProperties(properties))
    +    schemaDescriptor.foreach(_.addProperties(properties))
    +    metaDescriptor.foreach(_.addProperties(properties))
    +  }
    +
    +  /**
    +    * Reads table statistics from the descriptors properties.
    +    */
    +  protected def getTableStats: Option[TableStats] = {
    +    val normalizedProps = new DescriptorProperties()
    +    addProperties(normalizedProps)
    +    val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
    +    rowCount match {
    +      case Some(cnt) =>
    +        val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
    +        Some(TableStats(cnt, columnStats.asJava))
    +      case None =>
    +        None
    +    }
    +  }
    +}
    +
    +object TableDescriptor {
    +  /**
    +    * Key for describing the type of this table, valid values are ('source').
    +    */
    +  val TABLE_TYPE = "type"
    +
    +  /**
    +    * Valid TABLE_TYPE value.
    +    */
    +  val TABLE_TYPE_SOURCE = "source"
    --- End diff --
    
    The current convention in the code is to use plain constants; please see `KafkaValidator` or `RowtimeValidator`. Also, an enum seems like premature optimization to me, given such simple use of the constant.


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r179686465
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
    @@ -29,38 +30,47 @@
     
     /**
      * Environment configuration that represents the content of an environment file. Environment files
    - * define sources, execution, and deployment behavior. An environment might be defined by default or
    + * define tables, execution, and deployment behavior. An environment might be defined by default or
      * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
      *
      * <p>In future versions, we might restrict the merging or enrichment of deployment properties to not
      * allow overwriting of a deployment by a session.
      */
     public class Environment {
     
    -	private Map<String, Source> sources;
    +	private Map<String, TableDescriptor> tables;
    --- End diff --
    
    That is another option. But the purpose here is that, if a table is both a source and a sink, we don't need to duplicate its config in both the sources and sinks sections. Duplicating it would be error-prone: you might modify the table config in the sources section but forget to update the same table in the sinks section, causing an inconsistency. What do you think?
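
    In environment-file terms, a single "tables" section lets one entry serve both roles (a sketch; `both` is not yet a valid value in this PR, see FLINK-8866):

        tables:
          - name: MyTable
            type: both  # one definition, usable as source and as sink
            # connector/format/schema properties defined exactly once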


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r177830071
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
    @@ -102,10 +112,10 @@ public static Environment parse(String content) throws IOException {
     	public static Environment merge(Environment env1, Environment env2) {
     		final Environment mergedEnv = new Environment();
     
    -		// merge sources
    -		final Map<String, Source> sources = new HashMap<>(env1.getSources());
    -		mergedEnv.getSources().putAll(env2.getSources());
    -		mergedEnv.sources = sources;
    +		// merge tables
    +		final Map<String, TableDescriptor> sources = new HashMap<>(env1.getTables());
    --- End diff --
    
    `final Map<String, TableDescriptor> tables`


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r177836030
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Common class for all descriptors describing table sources and sinks.
    +  */
    +abstract class TableDescriptor extends Descriptor {
    +
    +  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
    +  protected var formatDescriptor: Option[FormatDescriptor] = None
    +  protected var schemaDescriptor: Option[Schema] = None
    +  protected var statisticsDescriptor: Option[Statistics] = None
    +  protected var metaDescriptor: Option[Metadata] = None
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    +    connectorDescriptor.foreach(_.addProperties(properties))
    +    formatDescriptor.foreach(_.addProperties(properties))
    +    schemaDescriptor.foreach(_.addProperties(properties))
    +    metaDescriptor.foreach(_.addProperties(properties))
    +  }
    +
    +  /**
    +    * Reads table statistics from the descriptors properties.
    +    */
    +  protected def getTableStats: Option[TableStats] = {
    +    val normalizedProps = new DescriptorProperties()
    +    addProperties(normalizedProps)
    +    val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
    +    rowCount match {
    +      case Some(cnt) =>
    +        val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
    +        Some(TableStats(cnt, columnStats.asJava))
    +      case None =>
    +        None
    +    }
    +  }
    +}
    +
    +object TableDescriptor {
    +  /**
    +    * Key for describing the type of this table, valid values are ('source').
    +    */
    +  val TABLE_TYPE = "type"
    +
    +  /**
    +    * Valid TABLE_TYPE value.
    +    */
    +  val TABLE_TYPE_SOURCE = "source"
    --- End diff --
    
    Maybe we can use either an enum or a hierarchy to represent the valid types? Does this seem like a good idea, e.g. something like `TABLE_TYPE.SOURCE_TYPE`? (One possible shape is sketched below.)
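
    One possible shape for that (a hypothetical sketch, not code from this PR):

        // a sealed hierarchy makes the set of valid types explicit and lets the
        // compiler check pattern matches over TableType for exhaustiveness
        sealed trait TableType { def value: String }
        object TableType {
          case object Source extends TableType { val value = "source" }
          case object Sink extends TableType { val value = "sink" }
          case object Both extends TableType { val value = "both" }
        }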


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 closed the pull request at:

    https://github.com/apache/flink/pull/5758


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r178334028
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
    @@ -29,38 +30,47 @@
     
     /**
      * Environment configuration that represents the content of an environment file. Environment files
    - * define sources, execution, and deployment behavior. An environment might be defined by default or
    + * define tables, execution, and deployment behavior. An environment might be defined by default or
      * as part of a session. Environments can be merged or enriched with properties (e.g. from CLI command).
      *
      * <p>In future versions, we might restrict the merging or enrichment of deployment properties to not
      * allow overwriting of a deployment by a session.
      */
     public class Environment {
     
    -	private Map<String, Source> sources;
    +	private Map<String, TableDescriptor> tables;
     
     	private Execution execution;
     
     	private Deployment deployment;
     
     	public Environment() {
    -		this.sources = Collections.emptyMap();
    +		this.tables = Collections.emptyMap();
     		this.execution = new Execution();
     		this.deployment = new Deployment();
     	}
     
    -	public Map<String, Source> getSources() {
    -		return sources;
    +	public Map<String, TableDescriptor> getTables() {
    +		return tables;
     	}
     
    -	public void setSources(List<Map<String, Object>> sources) {
    -		this.sources = new HashMap<>(sources.size());
    -		sources.forEach(config -> {
    -			final Source s = Source.create(config);
    -			if (this.sources.containsKey(s.getName())) {
    -				throw new SqlClientException("Duplicate source name '" + s + "'.");
    +	public void setTables(List<Map<String, Object>> tables) {
    +		this.tables = new HashMap<>(tables.size());
    +		tables.forEach(config -> {
    +			if (!config.containsKey(TableDescriptor.TABLE_TYPE())) {
    +				throw new SqlClientException("The 'type' attribute of a table is missing.");
    --- End diff --
    
    Got it, so `both` should probably be added here once `sink` type is supported in FLINK-8866. Thanks for the clarification.


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by suez1224 <gi...@git.apache.org>.
GitHub user suez1224 reopened a pull request:

    https://github.com/apache/flink/pull/5758

    [FLINK-9059][Table API & SQL] add table type attribute; replace "sources" with "tables" in environm…

    ## What is the purpose of the change
    
    Add support for unified table source and sink declaration in environment file definition.
    This change prepares for FLINK-9049 (Create unified interfaces to configure and instantiate TableSink). We want to get this change in before 1.5 so it won't break the API in the next Flink release.
    
    
    ## Brief change log
    
      - Refactor the SQL Client environment file definition to replace "sources" with "tables".
      - Add a "type" property to distinguish between table sources and sinks.
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as *(please describe tests)*.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink FLINK-9059

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5758
    
----
commit 7886287edd464c4a9fc84a47be214ff39b8e5ad0
Author: Shuyi Chen <sh...@...>
Date:   2018-03-23T06:00:00Z

    add table type attribute; replace "sources" with "tables" in environment file

commit 63a3faa115254564cb5fb8e85f0e0e50e33062fe
Author: Shuyi Chen <sh...@...>
Date:   2018-03-30T17:12:10Z

    fix comments

----


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r178334057
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Common class for all descriptors describing table sources and sinks.
    +  */
    +abstract class TableDescriptor extends Descriptor {
    +
    +  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
    +  protected var formatDescriptor: Option[FormatDescriptor] = None
    +  protected var schemaDescriptor: Option[Schema] = None
    +  protected var statisticsDescriptor: Option[Statistics] = None
    +  protected var metaDescriptor: Option[Metadata] = None
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    +    connectorDescriptor.foreach(_.addProperties(properties))
    +    formatDescriptor.foreach(_.addProperties(properties))
    +    schemaDescriptor.foreach(_.addProperties(properties))
    +    metaDescriptor.foreach(_.addProperties(properties))
    +  }
    +
    +  /**
    +    * Reads table statistics from the descriptors properties.
    +    */
    +  protected def getTableStats: Option[TableStats] = {
    +    val normalizedProps = new DescriptorProperties()
    +    addProperties(normalizedProps)
    +    val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
    +    rowCount match {
    +      case Some(cnt) =>
    +        val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
    +        Some(TableStats(cnt, columnStats.asJava))
    +      case None =>
    +        None
    +    }
    +  }
    +}
    +
    +object TableDescriptor {
    +  /**
    +    * Key for describing the type of this table, valid values are ('source').
    +    */
    +  val TABLE_TYPE = "type"
    +
    +  /**
    +    * Valid TABLE_TYPE value.
    +    */
    +  val TABLE_TYPE_SOURCE = "source"
    --- End diff --
    
    Makes sense 👍


---

[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5758#discussion_r178244750
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
    @@ -102,10 +112,10 @@ public static Environment parse(String content) throws IOException {
     	public static Environment merge(Environment env1, Environment env2) {
     		final Environment mergedEnv = new Environment();
     
    -		// merge sources
    -		final Map<String, Source> sources = new HashMap<>(env1.getSources());
    -		mergedEnv.getSources().putAll(env2.getSources());
    -		mergedEnv.sources = sources;
    +		// merge tables
    +		final Map<String, TableDescriptor> sources = new HashMap<>(env1.getTables());
    --- End diff --
    
    Good catch on the naming.


---