You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/02/22 16:34:15 UTC

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/5564

    [FLINK-8538] [table] Add a Kafka table source factory with JSON format

    ## What is the purpose of the change
    
    This PR is a continuation of #5505. Since this is the first connector with a format, time attributes, and a table source factory, I encountered many inconsistencies that I try to fix in this PR.
    
    
    ## Brief change log
    
    - Change property `connector.version` to `connector.property-version` in order to use `version` for things like Kafka
    - Add more utility functions for better Java<->Scala interoperability
    - Add full rowtime support
    - Derive JSON mapping from schema
    - Derive schema from source if required
    
    
    ## Verifying this change
    
    Various unit tests implemented.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): yes
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? JavaDocs/ScalaDocs


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-8538

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5564
    
----
commit bc6af3a27155651622149cf4a74e52118773471a
Author: Xingcan Cui <xi...@...>
Date:   2018-02-12T10:11:36Z

    [FLINK-8538][table]Add a Kafka table source factory with JSON format support

commit 233a8ff0b88359dd702cfd6e447e0585e411c8c2
Author: Timo Walther <tw...@...>
Date:   2018-02-19T12:35:45Z

    [FLINK-8538] [table] Improve unified table sources

----


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170257084
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---
    @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
     object RowtimeValidator {
     
       val ROWTIME = "rowtime"
    -
    -  // per rowtime properties
    -
    -  val ROWTIME_VERSION = "version"
    -  val TIMESTAMPS_TYPE = "timestamps.type"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    -  val TIMESTAMPS_FROM = "timestamps.from"
    -  val TIMESTAMPS_CLASS = "timestamps.class"
    -  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
    -
    -  val WATERMARKS_TYPE = "watermarks.type"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    -  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    -  val WATERMARKS_CLASS = "watermarks.class"
    -  val WATERMARKS_SERIALIZED = "watermarks.serialized"
    -  val WATERMARKS_DELAY = "watermarks.delay"
    +  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
    +  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
    +  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
    +
    +  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    --- End diff --
    
    BOUNDING -> BOUNDED


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170232160
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---
    @@ -246,13 +394,93 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
         Some(schemaBuilder.build())
       }
     
    +  /**
    +    * Returns a table schema under the given key if it exists.
    +    */
    +  def getOptionalTableSchema(key: String): Optional[TableSchema] = toJava(getTableSchema(key))
    +
    +  /**
    +    * Returns the type information under the given key if it exists.
    +    */
    +  def getType(key: String): Option[TypeInformation[_]] = {
    +    properties.get(key).map(TypeStringUtils.readTypeInfo)
    +  }
    +
    +  /**
    +    * Returns the type information under the given key if it exists.
    +    * This method is intended for Java code.
    +    */
    +  def getOptionalType(key: String): Optional[TypeInformation[_]] = {
    +    toJava(getType(key))
    +  }
    +
    +  /**
    +    * Returns a prefix subset of properties.
    +    */
    +  def getPrefix(prefixKey: String): Map[String, String] = {
    +    val prefix = prefixKey + '.'
    +    properties.filterKeys(_.startsWith(prefix)).toSeq.map{ case (k, v) =>
    +      k.substring(prefix.length) -> v // remove prefix
    +    }.toMap
    +  }
    +
    +  /**
    +    * Returns a prefix subset of properties.
    +    * This method is intended for Java code.
    +    */
    +  def getPrefixMap(prefixKey: String): JMap[String, String] = getPrefix(prefixKey).asJava
    --- End diff --
    
    I find the different names for methods that do the same thing confusing. I'd just remove the Scala methods.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170561781
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---
    @@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
         }
       }
     
    +  /**
    +    * Adds an indexed mapping of properties under a common key.
    +    *
    +    * For example:
    +    *
    +    * schema.fields.0.type = INT, schema.fields.0.name = test
    +    *                             schema.fields.1.name = test2
    +    *
    +    * The arity of the propertySets can differ.
    +    *
    +    * This method is intended for Java code.
    +    */
    +  def putIndexedVariableProperties(
    +      key: String,
    +      propertySets: JList[JMap[String, String]])
    +    : Unit = {
    +    checkNotNull(key)
    +    checkNotNull(propertySets)
    +    putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap))
    +  }
    +
       // ----------------------------------------------------------------------------------------------
     
    +  /**
    +    * Returns a string value under the given key if it exists.
    +    */
       def getString(key: String): Option[String] = {
         properties.get(key)
       }
     
    -  def getCharacter(key: String): Option[Character] = getString(key) match {
    -    case Some(c) =>
    -      if (c.length != 1) {
    -        throw new ValidationException(s"The value of $key must only contain one character.")
    -      }
    -      Some(c.charAt(0))
    +  /**
    +    * Returns a string value under the given key if it exists.
    +    * This method is intended for Java code.
    +    */
    +  def getOptionalString(key: String): Optional[String] = toJava(getString(key))
     
    -    case None => None
    +  /**
    +    * Returns a character value under the given key if it exists.
    +    */
    +  def getCharacter(key: String): Option[Character] = getString(key).map { c =>
    +    if (c.length != 1) {
    +      throw new ValidationException(s"The value of $key must only contain one character.")
    +    }
    +    c.charAt(0)
       }
     
    -  def getBoolean(key: String): Option[Boolean] = getString(key) match {
    -    case Some(b) => Some(JBoolean.parseBoolean(b))
    -
    -    case None => None
    +  /**
    +    * Returns a class value under the given key if it exists.
    +    */
    +  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
    --- End diff --
    
    We need to provide the superclass to validate what we just deserialized; otherwise it would lead to a class cast exception.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170304145
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
    @@ -18,37 +18,85 @@
     
     package org.apache.flink.table.descriptors
     
    +import org.apache.flink.util.Preconditions
     import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +import scala.collection.JavaConverters._
     
     abstract class DescriptorTestBase {
     
       /**
    -    * Returns a valid descriptor.
    +    * Returns a set of valid descriptors.
    +    * This method is implemented in both Scala and Java.
    +    */
    +  def descriptors(): java.util.List[Descriptor]
    +
    +  /**
    +    * Returns a set of properties for each valid descriptor.
    +    * This code is implemented in both Scala and Java.
         */
    -  def descriptor(): Descriptor
    +  def properties(): java.util.List[java.util.Map[String, String]]
     
       /**
    -    * Returns a validator that can validate this descriptor.
    +    * Returns a validator that can validate all valid descriptors.
         */
       def validator(): DescriptorValidator
     
    -  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
    +  @Test
    +  def testValidation(): Unit = {
    +    val d = descriptors().asScala
    +    val p = properties().asScala
    +
    +    Preconditions.checkArgument(d.length == p.length)
    +
    +    d.zip(p).foreach { case (desc, props) =>
    +      verifyProperties(desc, props.asScala.toMap)
    +    }
    +  }
    +
    +  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
         val normProps = new DescriptorProperties
         descriptor.addProperties(normProps)
    -    assertEquals(expected.toMap, normProps.asMap)
    +    assertEquals(expected, normProps.asScalaMap)
       }
     
    -  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
    +  def verifyInvalidProperty(
    +      descriptor: Descriptor,
    +      property: String,
    +      invalidValue: String): Unit = {
         val properties = new DescriptorProperties
    -    descriptor().addProperties(properties)
    +    descriptor.addProperties(properties)
         properties.unsafePut(property, invalidValue)
         validator().validate(properties)
       }
     
    -  def verifyMissingProperty(removeProperty: String): Unit = {
    +  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
         val properties = new DescriptorProperties
    -    descriptor().addProperties(properties)
    +    descriptor.addProperties(properties)
         properties.unsafeRemove(removeProperty)
         validator().validate(properties)
       }
     }
    +
    +class TestTableSourceDescriptor(connector: ConnectorDescriptor)
    +  extends TableSourceDescriptor(connector) {
    --- End diff --
    
    Sorry about that. I forgot to rebuild after rebasing.


---

[GitHub] flink issue #5564: [FLINK-8538] [table] Add a Kafka table source factory wit...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5564
  
    What do you think @xccui @fhueske?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170230041
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---
    @@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
         }
       }
     
    +  /**
    +    * Adds an indexed mapping of properties under a common key.
    +    *
    +    * For example:
    +    *
    +    * schema.fields.0.type = INT, schema.fields.0.name = test
    +    *                             schema.fields.1.name = test2
    +    *
    +    * The arity of the propertySets can differ.
    +    *
    +    * This method is intended for Java code.
    +    */
    +  def putIndexedVariableProperties(
    --- End diff --
    
    Remove Scala equivalent?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170256878
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---
    @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
     object RowtimeValidator {
     
       val ROWTIME = "rowtime"
    -
    -  // per rowtime properties
    -
    -  val ROWTIME_VERSION = "version"
    -  val TIMESTAMPS_TYPE = "timestamps.type"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    -  val TIMESTAMPS_FROM = "timestamps.from"
    -  val TIMESTAMPS_CLASS = "timestamps.class"
    -  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
    -
    -  val WATERMARKS_TYPE = "watermarks.type"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    -  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    -  val WATERMARKS_CLASS = "watermarks.class"
    -  val WATERMARKS_SERIALIZED = "watermarks.serialized"
    -  val WATERMARKS_DELAY = "watermarks.delay"
    +  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
    +  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
    +  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
    +
    +  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
    +  val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized"
    --- End diff --
    
    `rowtime.watermarks.custom.serialized`?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170191830
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.typeinfo.Types;
    +import org.apache.flink.formats.json.JsonSchemaConverter;
    +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    +import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.table.descriptors.Json;
    +import org.apache.flink.table.descriptors.Kafka;
    +import org.apache.flink.table.descriptors.Schema;
    +import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
    +import org.apache.flink.table.sources.TableSource;
    +import org.apache.flink.table.sources.TableSourceFactory;
    +import org.apache.flink.table.sources.TableSourceFactoryService;
    +import org.apache.flink.types.Row;
    +
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests for {@link KafkaJsonTableSourceFactory}.
    + */
    +public abstract class KafkaJsonTableSourceFactoryTestBase {
    +
    +	private static final String JSON_SCHEMA =
    +		"{" +
    +		"  'title': 'Fruit'," +
    +		"  'type': 'object'," +
    +		"  'properties': {" +
    +		"    'name': {" +
    +		"      'type': 'string'" +
    +		"    }," +
    +		"    'count': {" +
    +		"      'type': 'integer'" +
    +		"    }," +
    +		"    'time': {" +
    +		"      'description': 'Age in years'," +
    +		"      'type': 'number'" +
    +		"    }" + "  }," +
    +		"  'required': ['name', 'count', 'time']" +
    +		"}";
    +
    +	private static final String TOPIC = "test-topic";
    +
    +	protected abstract String version();
    +
    +	protected abstract KafkaJsonTableSource.Builder builder();
    +
    +	protected abstract KafkaJsonTableSourceFactory factory();
    +
    +	@Test
    +	public void testResultingTableSource() {
    +
    +		// construct table source using a builder
    +
    +		final Map<String, String> tableJsonMapping = new HashMap<>();
    +		tableJsonMapping.put("fruit-name", "name");
    +		tableJsonMapping.put("count", "count");
    +		tableJsonMapping.put("event-time", "time");
    +
    +		final Properties props = new Properties();
    +		props.put("group.id", "test-group");
    +		props.put("bootstrap.servers", "localhost:1234");
    +
    +		final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
    +		specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
    +		specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
    +
    +		final KafkaTableSource builderSource = builder()
    +				.forJsonSchema(TableSchema.fromTypeInfo(JsonSchemaConverter.convert(JSON_SCHEMA)))
    +				.failOnMissingField(true)
    +				.withTableToJsonMapping(tableJsonMapping)
    +				.withKafkaProperties(props)
    +				.forTopic(TOPIC)
    +				.fromSpecificOffsets(specificOffsets)
    +				.withSchema(
    +					TableSchema.builder()
    +						.field("fruit-name", Types.STRING)
    +						.field("count", Types.INT)
    +						.field("event-time", Types.LONG)
    +						.field("proc-time", Types.SQL_TIMESTAMP)
    +						.build())
    +				.withProctimeAttribute("proc-time")
    +				.build();
    +
    +		// construct table source using descriptors and table source factory
    +
    +		final Map<Integer, Long> offsets = new HashMap<>();
    +		offsets.put(0, 100L);
    +		offsets.put(1, 123L);
    +
    +		final TestTableSourceDescriptor testDesc = new TestTableSourceDescriptor(
    +				new Kafka()
    +					.version(version())
    +					.topic(TOPIC)
    +					.properties(props)
    +					.startFromSpecificOffsets(offsets))
    +			.addFormat(
    +				new Json()
    +						.jsonSchema(JSON_SCHEMA)
    +						.failOnMissingField(true))
    +			.addSchema(
    +				new Schema()
    +						.field("fruit-name", Types.STRING).from("name")
    +						.field("count", Types.INT) // no from so it must match with the input
    +						.field("event-time", Types.LONG).from("time")
    +						.field("proc-time", Types.SQL_TIMESTAMP).proctime());
    +		final TableSourceFactory<Row> factory = factory();
    --- End diff --
    
    The `factory` is useless.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170297941
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
    @@ -18,37 +18,85 @@
     
     package org.apache.flink.table.descriptors
     
    +import org.apache.flink.util.Preconditions
     import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +import scala.collection.JavaConverters._
     
     abstract class DescriptorTestBase {
     
       /**
    -    * Returns a valid descriptor.
    +    * Returns a set of valid descriptors.
    +    * This method is implemented in both Scala and Java.
    +    */
    +  def descriptors(): java.util.List[Descriptor]
    +
    +  /**
    +    * Returns a set of properties for each valid descriptor.
    +    * This code is implemented in both Scala and Java.
         */
    -  def descriptor(): Descriptor
    +  def properties(): java.util.List[java.util.Map[String, String]]
     
       /**
    -    * Returns a validator that can validate this descriptor.
    +    * Returns a validator that can validate all valid descriptors.
         */
       def validator(): DescriptorValidator
     
    -  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
    +  @Test
    +  def testValidation(): Unit = {
    +    val d = descriptors().asScala
    +    val p = properties().asScala
    +
    +    Preconditions.checkArgument(d.length == p.length)
    +
    +    d.zip(p).foreach { case (desc, props) =>
    +      verifyProperties(desc, props.asScala.toMap)
    +    }
    +  }
    +
    +  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
         val normProps = new DescriptorProperties
         descriptor.addProperties(normProps)
    -    assertEquals(expected.toMap, normProps.asMap)
    +    assertEquals(expected, normProps.asScalaMap)
       }
     
    -  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
    +  def verifyInvalidProperty(
    +      descriptor: Descriptor,
    +      property: String,
    +      invalidValue: String): Unit = {
         val properties = new DescriptorProperties
    -    descriptor().addProperties(properties)
    +    descriptor.addProperties(properties)
         properties.unsafePut(property, invalidValue)
         validator().validate(properties)
       }
     
    -  def verifyMissingProperty(removeProperty: String): Unit = {
    +  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
    --- End diff --
    
    rename to `removePropertyAndVerify()`


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170180103
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---
    @@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
         }
       }
     
    +  /**
    +    * Adds an indexed mapping of properties under a common key.
    +    *
    +    * For example:
    +    *
    +    * schema.fields.0.type = INT, schema.fields.0.name = test
    +    *                             schema.fields.1.name = test2
    +    *
    +    * The arity of the propertySets can differ.
    +    *
    +    * This method is intended for Java code.
    +    */
    +  def putIndexedVariableProperties(
    +      key: String,
    +      propertySets: JList[JMap[String, String]])
    +    : Unit = {
    +    checkNotNull(key)
    +    checkNotNull(propertySets)
    +    putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap))
    +  }
    +
       // ----------------------------------------------------------------------------------------------
     
    +  /**
    +    * Returns a string value under the given key if it exists.
    +    */
       def getString(key: String): Option[String] = {
         properties.get(key)
       }
     
    -  def getCharacter(key: String): Option[Character] = getString(key) match {
    -    case Some(c) =>
    -      if (c.length != 1) {
    -        throw new ValidationException(s"The value of $key must only contain one character.")
    -      }
    -      Some(c.charAt(0))
    +  /**
    +    * Returns a string value under the given key if it exists.
    +    * This method is intended for Java code.
    +    */
    +  def getOptionalString(key: String): Optional[String] = toJava(getString(key))
     
    -    case None => None
    +  /**
    +    * Returns a character value under the given key if it exists.
    +    */
    +  def getCharacter(key: String): Option[Character] = getString(key).map { c =>
    +    if (c.length != 1) {
    +      throw new ValidationException(s"The value of $key must only contain one character.")
    +    }
    +    c.charAt(0)
       }
     
    -  def getBoolean(key: String): Option[Boolean] = getString(key) match {
    -    case Some(b) => Some(JBoolean.parseBoolean(b))
    -
    -    case None => None
    +  /**
    +    * Returns a class value under the given key if it exists.
    +    */
    +  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
    --- End diff --
    
    It seems we also need a `getOptionalClass()` wrapper for it. Additionally, I think it's a little strange to provide the superclass as a parameter here.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170194561
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala ---
    @@ -27,13 +27,27 @@ class ConnectorDescriptorValidator extends DescriptorValidator {
     
       override def validate(properties: DescriptorProperties): Unit = {
         properties.validateString(CONNECTOR_TYPE, isOptional = false, minLen = 1)
    -    properties.validateInt(CONNECTOR_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
    +    properties.validateInt(CONNECTOR_PROPERTY_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
       }
     }
     
     object ConnectorDescriptorValidator {
     
    +  /**
    +    * Key for describing the type of the connector. Usually used for factory discovery.
    +    */
       val CONNECTOR_TYPE = "connector.type"
    +
    +  /**
    +    *  Key for describing the property version. This property can be used for backwards
    --- End diff --
    
    Two spaces here...


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170562304
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ---
    @@ -0,0 +1,123 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors;
    +
    +import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
    +import org.apache.flink.table.api.ValidationException;
    +
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Consumer;
    +
    +/**
    + * The validator for {@link Kafka}.
    + */
    +public class KafkaValidator extends ConnectorDescriptorValidator {
    +
    +	public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
    +	public static final String CONNECTOR_VERSION_VALUE_08 = "0.8";
    +	public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
    +	public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
    +	public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
    +	public static final String CONNECTOR_TOPIC = "connector.topic";
    +	public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
    +	public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
    +	public static final String CONNECTOR_STARTUP_MODE_VALUE_LATEST = "latest-offset";
    +	public static final String CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
    +	public static final String CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
    +	public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets";
    +	public static final String CONNECTOR_SPECIFIC_OFFSETS_PARTITION = "partition";
    +	public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";
    +	public static final String CONNECTOR_PROPERTIES = "connector.properties";
    --- End diff --
    
    Yes, these are very Kafka specific properties. That's where they are also pushed down by the table source builder.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170290993
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala ---
    @@ -18,48 +18,67 @@
     
     package org.apache.flink.table.descriptors
     
    +import java.util
    +
     import org.apache.flink.streaming.api.watermark.Watermark
     import org.apache.flink.table.api.ValidationException
     import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
     import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
     import org.apache.flink.types.Row
     import org.junit.Test
     
    -class RowtimeTest extends DescriptorTestBase {
    +import scala.collection.JavaConverters._
     
    -  @Test
    -  def testRowtime(): Unit = {
    -    val desc = Rowtime()
    -      .timestampsFromField("otherField")
    -      .watermarksPeriodicBounding(1000L)
    -    val expected = Seq(
    -      "rowtime.0.version" -> "1",
    -      "rowtime.0.timestamps.type" -> "from-field",
    -      "rowtime.0.timestamps.from" -> "otherField",
    -      "rowtime.0.watermarks.type" -> "periodic-bounding",
    -      "rowtime.0.watermarks.delay" -> "1000"
    -    )
    -    verifyProperties(desc, expected)
    -  }
    +class RowtimeTest extends DescriptorTestBase {
     
       @Test(expected = classOf[ValidationException])
       def testInvalidWatermarkType(): Unit = {
    -    verifyInvalidProperty("rowtime.0.watermarks.type", "xxx")
    +    verifyInvalidProperty(descriptors().get(0), "rowtime.watermarks.type", "xxx")
    --- End diff --
    
    use constant instead of `"rowtime.watermarks.type"`


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170298004
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
    @@ -18,37 +18,85 @@
     
     package org.apache.flink.table.descriptors
     
    +import org.apache.flink.util.Preconditions
     import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +import scala.collection.JavaConverters._
     
     abstract class DescriptorTestBase {
     
       /**
    -    * Returns a valid descriptor.
    +    * Returns a set of valid descriptors.
    +    * This method is implemented in both Scala and Java.
    +    */
    +  def descriptors(): java.util.List[Descriptor]
    +
    +  /**
    +    * Returns a set of properties for each valid descriptor.
    +    * This code is implemented in both Scala and Java.
         */
    -  def descriptor(): Descriptor
    +  def properties(): java.util.List[java.util.Map[String, String]]
     
       /**
    -    * Returns a validator that can validate this descriptor.
    +    * Returns a validator that can validate all valid descriptors.
         */
       def validator(): DescriptorValidator
     
    -  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
    +  @Test
    +  def testValidation(): Unit = {
    +    val d = descriptors().asScala
    +    val p = properties().asScala
    +
    +    Preconditions.checkArgument(d.length == p.length)
    +
    +    d.zip(p).foreach { case (desc, props) =>
    +      verifyProperties(desc, props.asScala.toMap)
    +    }
    +  }
    +
    +  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
         val normProps = new DescriptorProperties
         descriptor.addProperties(normProps)
    -    assertEquals(expected.toMap, normProps.asMap)
    +    assertEquals(expected, normProps.asScalaMap)
       }
     
    -  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
    +  def verifyInvalidProperty(
    --- End diff --
    
    rename to `addPropertyAndVerify()`


---

[GitHub] flink issue #5564: [FLINK-8538] [table] Add a Kafka table source factory wit...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5564
  
    Thanks for the update. 
    I think this is good to merge.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r171280725
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
     object SchemaValidator {
     
       val SCHEMA = "schema"
    -  val SCHEMA_VERSION = "schema.version"
    +  val SCHEMA_NAME = "name"
    +  val SCHEMA_TYPE = "type"
    +  val SCHEMA_PROCTIME = "proctime"
    +  val SCHEMA_FROM = "from"
    +
    +  // utilities
    +
    +  /**
    +    * Finds the proctime attribute if defined.
    +    */
    +  def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val isProctime = toScala(
    +        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
    +      isProctime.foreach { isSet =>
    +        if (isSet) {
    +          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
    +        }
    +      }
    +    }
    +    toJava(None)
    +  }
    +
    +  /**
    +    * Finds the rowtime attributes if defined.
    +    */
    +  def deriveRowtimeAttributes(properties: DescriptorProperties)
    +    : util.List[RowtimeAttributeDescriptor] = {
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
    +
    +    // check for rowtime in every field
    +    for (i <- 0 until names.size) {
    +      RowtimeValidator
    +        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
    +        .foreach { case (extractor, strategy) =>
    +          // create descriptor
    +          attributes += new RowtimeAttributeDescriptor(
    +            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
    +            extractor,
    +            strategy)
    +        }
    +    }
    +
    +    attributes.asJava
    +  }
    +
    +  /**
    +    * Finds a table source field mapping.
    +    */
    +  def deriveFieldMapping(
    +      properties: DescriptorProperties,
    +      sourceSchema: Optional[TableSchema])
    +    : util.Map[String, String] = {
    +
    +    val mapping = mutable.Map[String, String]()
    +
    +    val schema = properties.getTableSchema(SCHEMA)
    +
    +    // add all schema fields first for implicit mappings
    +    schema.getColumnNames.foreach { name =>
    +      mapping.put(name, name)
    +    }
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
    +      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
     
    -  // per column properties
    +        // add explicit mapping
    +        case Some(source) =>
    +          mapping.put(name, source)
     
    -  val NAME = "name"
    -  val TYPE = "type"
    -  val PROCTIME = "proctime"
    -  val PROCTIME_VALUE_TRUE = "true"
    -  val FROM = "from"
    +        // implicit mapping or time
    +        case None =>
    +          val isProctime = properties
    +            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
    +            .orElse(false)
    +          val isRowtime = properties
    +            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +          // remove proctime/rowtime from mapping
    +          if (isProctime || isRowtime) {
    +            mapping.remove(name)
    +          }
    +          // check for invalid fields
    +          else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) {
    +            throw new ValidationException(s"Could not map the schema field '$name' to a field " +
    +              s"from source. Please specify the source field from which it can be derived.")
    +          }
    +      }
    +    }
     
    +    mapping.toMap.asJava
    +  }
    +
    +  /**
    +    * Finds the fields that can be used for a format schema (without time attributes).
    +    */
    +  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
    --- End diff --
    
    Hi @twalthr, sorry for mentioning you again. I was a little confused about this method. Could you help explain its usage? Besides, the rowtime field should be an existing field in the input format. Why is it removed here?
    
    Thanks, Xingcan


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170272030
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -67,14 +92,188 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
     object SchemaValidator {
     
       val SCHEMA = "schema"
    -  val SCHEMA_VERSION = "schema.version"
    +  val SCHEMA_PROPERTY_VERSION = "schema.property-version"
    +  val SCHEMA_FIELDS = "schema.fields"
    +  val SCHEMA_FIELDS_NAME = "name"
    +  val SCHEMA_FIELDS_TYPE = "type"
    +  val SCHEMA_FIELDS_PROCTIME = "proctime"
    +  val SCHEMA_FIELDS_FROM = "from"
    +  val SCHEMA_DERIVE_FIELDS = "schema.derive-fields"
    +  val SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY = "alphabetically"
    +  val SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY = "sequentially"
    +
    +  // utilities
    +
    +  /**
    +    * Derives a schema from properties and source.
    +    */
    +  def deriveSchema(
    +      properties: DescriptorProperties,
    +      sourceSchema: Option[TableSchema])
    +    : TableSchema = {
    +
    +    val builder = TableSchema.builder()
    +
    +    val schema = properties.getTableSchema(SCHEMA_FIELDS)
    +
    +    val derivationMode = properties.getString(SCHEMA_DERIVE_FIELDS)
    +
    +    val sourceNamesAndTypes = derivationMode match {
    +      case Some(SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY) if sourceSchema.isDefined =>
    +        // sort by name
    +        sourceSchema.get.getColumnNames
    +          .zip(sourceSchema.get.getTypes)
    +          .sortBy(_._1)
    +
    +      case Some(SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY) if sourceSchema.isDefined =>
    +        sourceSchema.get.getColumnNames.zip(sourceSchema.get.getTypes)
    +
    +      case Some(_) =>
    +        throw new ValidationException("Derivation of fields is not supported from this source.")
    +
    +      case None =>
    +        Array[(String, TypeInformation[_])]()
    +    }
    +
    +    // add source fields
    +    sourceNamesAndTypes.foreach { case (n, t) =>
    +      builder.field(n, t)
    +    }
    +
    +    // add schema fields
    +    schema.foreach { ts =>
    +      val schemaNamesAndTypes = ts.getColumnNames.zip(ts.getTypes)
    +      schemaNamesAndTypes.foreach { case (n, t) =>
    +          // do not allow overwriting
    +          if (sourceNamesAndTypes.exists(_._1 == n)) {
    +            throw new ValidationException(
    +              "Specified schema fields must not overwrite fields derived from the source.")
    +          }
    +          builder.field(n, t)
    +      }
    +    }
    +
    +    builder.build()
    +  }
    +
    +  /**
    +    * Derives a schema from properties and source.
    +    * This method is intended for Java code.
    +    */
    +  def deriveSchema(
    +      properties: DescriptorProperties,
    +      sourceSchema: Optional[TableSchema])
    +    : TableSchema = {
    +    deriveSchema(
    +      properties,
    +      Option(sourceSchema.orElse(null)))
    +  }
    +
    +  /**
    +    * Finds the proctime attribute if defined.
    +    */
    +  def deriveProctimeAttribute(properties: DescriptorProperties): Option[String] = {
    +    val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val isProctime = properties.getBoolean(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_PROCTIME")
    +      isProctime.foreach { isSet =>
    +        if (isSet) {
    +          return names.get(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME")
    +        }
    +      }
    +    }
    +    None
    +  }
    +
    +  /**
    +    * Finds the proctime attribute if defined.
    +    * This method is intended for Java code.
    +    */
    +  def deriveProctimeOptional(properties: DescriptorProperties): Optional[String] = {
    +    Optional.ofNullable(deriveProctimeAttribute(properties).orNull)
    +  }
    +
    +  /**
    +    * Finds the rowtime attributes if defined.
    +    */
    +  def deriveRowtimeAttributes(properties: DescriptorProperties)
    +    : util.List[RowtimeAttributeDescriptor] = {
    +
    +    val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME)
    +
    +    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
    +
    +    // check for rowtime in every field
    +    for (i <- 0 until names.size) {
    +      RowtimeValidator
    +        .getRowtimeComponents(properties, s"$SCHEMA_FIELDS.$i.")
    +        .foreach { case (extractor, strategy) =>
    +          // create descriptor
    +          attributes += new RowtimeAttributeDescriptor(
    +            properties.getString(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME").get,
    +            extractor,
    +            strategy)
    +        }
    +    }
    +
    +    attributes.asJava
    +  }
    +
    +  /**
    +    * Find a table source field mapping.
    +    * This method is intended for Java code.
    +    */
    +  def deriveFieldMapping(
    --- End diff --
    
    Provide only Java-friendly methods?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170569299
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---
    @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
     object RowtimeValidator {
     
       val ROWTIME = "rowtime"
    -
    -  // per rowtime properties
    -
    -  val ROWTIME_VERSION = "version"
    -  val TIMESTAMPS_TYPE = "timestamps.type"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    -  val TIMESTAMPS_FROM = "timestamps.from"
    -  val TIMESTAMPS_CLASS = "timestamps.class"
    -  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
    -
    -  val WATERMARKS_TYPE = "watermarks.type"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    -  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    -  val WATERMARKS_CLASS = "watermarks.class"
    -  val WATERMARKS_SERIALIZED = "watermarks.serialized"
    -  val WATERMARKS_DELAY = "watermarks.delay"
    +  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
    --- End diff --
    
    We also use this `from` for `schema`. `from-field` is already defined in `rowtime.timestamps.type`.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170273135
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala ---
    @@ -41,10 +41,10 @@ trait TableSourceFactory[T] {
         *   - connector.type
         *   - format.type
         *
    -    * Specified versions allow the framework to provide backwards compatible properties in case of
    --- End diff --
    
    (not related to this change) Should we add something like a `priority` property to `TableSourceFactory` that determines the order in which factories are matched? If two factories match, we would use the factory with the higher priority.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170185039
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ---
    @@ -0,0 +1,123 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors;
    +
    +import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
    +import org.apache.flink.table.api.ValidationException;
    +
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.function.Consumer;
    +
    +/**
    + * The validator for {@link Kafka}.
    + */
    +public class KafkaValidator extends ConnectorDescriptorValidator {
    +
    +	public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
    +	public static final String CONNECTOR_VERSION_VALUE_08 = "0.8";
    +	public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
    +	public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
    +	public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
    +	public static final String CONNECTOR_TOPIC = "connector.topic";
    +	public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
    +	public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
    +	public static final String CONNECTOR_STARTUP_MODE_VALUE_LATEST = "latest-offset";
    +	public static final String CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
    +	public static final String CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
    +	public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets";
    +	public static final String CONNECTOR_SPECIFIC_OFFSETS_PARTITION = "partition";
    +	public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";
    +	public static final String CONNECTOR_PROPERTIES = "connector.properties";
    --- End diff --
    
    If we take the required properties (e.g., bootstrap.servers, group.id) as common ones here, the validation logic is pushed down to the underlying components, right?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170256851
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---
    @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
     object RowtimeValidator {
     
       val ROWTIME = "rowtime"
    -
    -  // per rowtime properties
    -
    -  val ROWTIME_VERSION = "version"
    -  val TIMESTAMPS_TYPE = "timestamps.type"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    -  val TIMESTAMPS_FROM = "timestamps.from"
    -  val TIMESTAMPS_CLASS = "timestamps.class"
    -  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
    -
    -  val WATERMARKS_TYPE = "watermarks.type"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    -  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    -  val WATERMARKS_CLASS = "watermarks.class"
    -  val WATERMARKS_SERIALIZED = "watermarks.serialized"
    -  val WATERMARKS_DELAY = "watermarks.delay"
    +  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
    +  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
    +  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
    +
    +  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
    --- End diff --
    
    `rowtime.watermarks.custom.class`?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170296912
  
    --- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors;
    +
    +import org.apache.flink.table.api.ValidationException;
    +
    +/**
    +  * Validator for {@link Json}.
    +  */
    +public class JsonValidator extends FormatDescriptorValidator {
    +
    +	public static final String FORMAT_TYPE_VALUE = "json";
    +	public static final String FORMAT_SCHEMA = "format.schema";
    +	public static final String FORMAT_JSON_SCHEMA = "format.json-schema";
    +	public static final String FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field";
    +
    +	@Override
    +	public void validate(DescriptorProperties properties) {
    +		super.validate(properties);
    +		final boolean hasSchema = properties.containsKey(FORMAT_SCHEMA);
    +		final boolean hasSchemaString = properties.containsKey(FORMAT_JSON_SCHEMA);
    +		if (hasSchema && hasSchemaString) {
    +			throw new ValidationException("A definition of both a schema and JSON schema is not allowed.");
    +		} else if (!hasSchema && !hasSchemaString) {
    +			throw new ValidationException("A definition of a schema and JSON schema is required.");
    --- End diff --
    
    replace "and" by "or" -> "A definition of a schema or JSON schema is required."


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r171354820
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
     object SchemaValidator {
     
       val SCHEMA = "schema"
    -  val SCHEMA_VERSION = "schema.version"
    +  val SCHEMA_NAME = "name"
    +  val SCHEMA_TYPE = "type"
    +  val SCHEMA_PROCTIME = "proctime"
    +  val SCHEMA_FROM = "from"
    +
    +  // utilities
    +
    +  /**
    +    * Finds the proctime attribute if defined.
    +    */
    +  def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val isProctime = toScala(
    +        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
    +      isProctime.foreach { isSet =>
    +        if (isSet) {
    +          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
    +        }
    +      }
    +    }
    +    toJava(None)
    +  }
    +
    +  /**
    +    * Finds the rowtime attributes if defined.
    +    */
    +  def deriveRowtimeAttributes(properties: DescriptorProperties)
    +    : util.List[RowtimeAttributeDescriptor] = {
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
    +
    +    // check for rowtime in every field
    +    for (i <- 0 until names.size) {
    +      RowtimeValidator
    +        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
    +        .foreach { case (extractor, strategy) =>
    +          // create descriptor
    +          attributes += new RowtimeAttributeDescriptor(
    +            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
    +            extractor,
    +            strategy)
    +        }
    +    }
    +
    +    attributes.asJava
    +  }
    +
    +  /**
    +    * Finds a table source field mapping.
    +    */
    +  def deriveFieldMapping(
    +      properties: DescriptorProperties,
    +      sourceSchema: Optional[TableSchema])
    +    : util.Map[String, String] = {
    +
    +    val mapping = mutable.Map[String, String]()
    +
    +    val schema = properties.getTableSchema(SCHEMA)
    +
    +    // add all schema fields first for implicit mappings
    +    schema.getColumnNames.foreach { name =>
    +      mapping.put(name, name)
    +    }
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
    +      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
     
    -  // per column properties
    +        // add explicit mapping
    +        case Some(source) =>
    +          mapping.put(name, source)
     
    -  val NAME = "name"
    -  val TYPE = "type"
    -  val PROCTIME = "proctime"
    -  val PROCTIME_VALUE_TRUE = "true"
    -  val FROM = "from"
    +        // implicit mapping or time
    +        case None =>
    +          val isProctime = properties
    +            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
    +            .orElse(false)
    +          val isRowtime = properties
    +            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +          // remove proctime/rowtime from mapping
    +          if (isProctime || isRowtime) {
    +            mapping.remove(name)
    +          }
    +          // check for invalid fields
    +          else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) {
    +            throw new ValidationException(s"Could not map the schema field '$name' to a field " +
    +              s"from source. Please specify the source field from which it can be derived.")
    +          }
    +      }
    +    }
     
    +    mapping.toMap.asJava
    +  }
    +
    +  /**
    +    * Finds the fields that can be used for a format schema (without time attributes).
    +    */
    +  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
    --- End diff --
    
    Thanks for your explanation @twalthr. I totally agree that we should avoid letting the users define schemas multiple times. As the names and definitions are still confusing me, I'd like to share my understanding to see if it's correct. Let's take the KafkaJsonTableSource as an example. Briefly, the schema mapping can be illustrated with
    
    ```
    json-format-schema(physical, optional) <-- mapping --> result-schema(physical)
    result-schema(physical) + timestamp fields(logical) = table-schema(logical, required)
    ```
    
    The `JSON-format-schema` could be either defined with a JSON-schema string (FORMAT_JSON_SCHEMA) or a `TypeInformation` (FORMAT_SCHEMA). When the JSON-format-schema is not provided, we use the `deriveFormatFields()` method to generate it from the result-schema and add something like a "self-mapping". IMO, if we don't pass the `jsonSchema` to the builder, there's no need to define the mapping, right?
    
    The timestamp mechanism may be even more complicated for Kafka since it supports `withKafkaTimestampAsRowtimeAttribute` (though according to FLINK-8500, this method seems not to work for now). I suppose we need a new `RowtimeDescriptor` for it?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170230912
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---
    @@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
         }
       }
     
    +  /**
    +    * Adds an indexed mapping of properties under a common key.
    +    *
    +    * For example:
    +    *
    +    * schema.fields.0.type = INT, schema.fields.0.name = test
    +    *                             schema.fields.1.name = test2
    +    *
    +    * The arity of the propertySets can differ.
    +    *
    +    * This method is intended for Java code.
    +    */
    +  def putIndexedVariableProperties(
    +      key: String,
    +      propertySets: JList[JMap[String, String]])
    +    : Unit = {
    +    checkNotNull(key)
    +    checkNotNull(propertySets)
    +    putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap))
    +  }
    +
       // ----------------------------------------------------------------------------------------------
     
    +  /**
    +    * Returns a string value under the given key if it exists.
    +    */
       def getString(key: String): Option[String] = {
         properties.get(key)
       }
     
    -  def getCharacter(key: String): Option[Character] = getString(key) match {
    -    case Some(c) =>
    -      if (c.length != 1) {
    -        throw new ValidationException(s"The value of $key must only contain one character.")
    -      }
    -      Some(c.charAt(0))
    +  /**
    +    * Returns a string value under the given key if it exists.
    +    * This method is intended for Java code.
    +    */
    +  def getOptionalString(key: String): Optional[String] = toJava(getString(key))
     
    -    case None => None
    +  /**
    +    * Returns a character value under the given key if it exists.
    +    */
    +  def getCharacter(key: String): Option[Character] = getString(key).map { c =>
    +    if (c.length != 1) {
    +      throw new ValidationException(s"The value of $key must only contain one character.")
    +    }
    +    c.charAt(0)
       }
     
    -  def getBoolean(key: String): Option[Boolean] = getString(key) match {
    -    case Some(b) => Some(JBoolean.parseBoolean(b))
    -
    -    case None => None
    +  /**
    +    * Returns a class value under the given key if it exists.
    +    */
    +  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
    +    properties.get(key).map { name =>
    +      val clazz = try {
    +        Class.forName(
    +          name,
    +          true,
    +          Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]]
    +      } catch {
    +        case e: Exception =>
    +          throw new ValidationException(s"Coult not get class for key '$key'.", e)
    --- End diff --
    
    typo: Could


---

[GitHub] flink issue #5564: [FLINK-8538] [table] Add a Kafka table source factory wit...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5564
  
    Thank you @fhueske. Will merge...


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170256073
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---
    @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
     object RowtimeValidator {
     
       val ROWTIME = "rowtime"
    -
    -  // per rowtime properties
    -
    -  val ROWTIME_VERSION = "version"
    -  val TIMESTAMPS_TYPE = "timestamps.type"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    -  val TIMESTAMPS_FROM = "timestamps.from"
    -  val TIMESTAMPS_CLASS = "timestamps.class"
    -  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
    -
    -  val WATERMARKS_TYPE = "watermarks.type"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    -  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    -  val WATERMARKS_CLASS = "watermarks.class"
    -  val WATERMARKS_SERIALIZED = "watermarks.serialized"
    -  val WATERMARKS_DELAY = "watermarks.delay"
    +  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
    +  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
    --- End diff --
    
    `rowtime.timestamps.custom.class`


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170256006
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---
    @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
     object RowtimeValidator {
     
       val ROWTIME = "rowtime"
    -
    -  // per rowtime properties
    -
    -  val ROWTIME_VERSION = "version"
    -  val TIMESTAMPS_TYPE = "timestamps.type"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    -  val TIMESTAMPS_FROM = "timestamps.from"
    -  val TIMESTAMPS_CLASS = "timestamps.class"
    -  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
    -
    -  val WATERMARKS_TYPE = "watermarks.type"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    -  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    -  val WATERMARKS_CLASS = "watermarks.class"
    -  val WATERMARKS_SERIALIZED = "watermarks.serialized"
    -  val WATERMARKS_DELAY = "watermarks.delay"
    +  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
    --- End diff --
    
    `rowtime.timestamps.from.field`


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170188551
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
    @@ -18,37 +18,85 @@
     
     package org.apache.flink.table.descriptors
     
    +import org.apache.flink.util.Preconditions
     import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +import scala.collection.JavaConverters._
     
     abstract class DescriptorTestBase {
     
       /**
    -    * Returns a valid descriptor.
    +    * Returns a set of valid descriptors.
    +    * This method is implemented in both Scala and Java.
    +    */
    +  def descriptors(): java.util.List[Descriptor]
    +
    +  /**
    +    * Returns a set of properties for each valid descriptor.
    +    * This code is implemented in both Scala and Java.
         */
    -  def descriptor(): Descriptor
    +  def properties(): java.util.List[java.util.Map[String, String]]
     
       /**
    -    * Returns a validator that can validate this descriptor.
    +    * Returns a validator that can validate all valid descriptors.
         */
       def validator(): DescriptorValidator
     
    -  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
    +  @Test
    +  def testValidation(): Unit = {
    +    val d = descriptors().asScala
    +    val p = properties().asScala
    +
    +    Preconditions.checkArgument(d.length == p.length)
    +
    +    d.zip(p).foreach { case (desc, props) =>
    +      verifyProperties(desc, props.asScala.toMap)
    +    }
    +  }
    +
    +  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
         val normProps = new DescriptorProperties
         descriptor.addProperties(normProps)
    -    assertEquals(expected.toMap, normProps.asMap)
    +    assertEquals(expected, normProps.asScalaMap)
       }
     
    -  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
    +  def verifyInvalidProperty(
    +      descriptor: Descriptor,
    +      property: String,
    +      invalidValue: String): Unit = {
         val properties = new DescriptorProperties
    -    descriptor().addProperties(properties)
    +    descriptor.addProperties(properties)
         properties.unsafePut(property, invalidValue)
         validator().validate(properties)
       }
     
    -  def verifyMissingProperty(removeProperty: String): Unit = {
    +  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
         val properties = new DescriptorProperties
    -    descriptor().addProperties(properties)
    +    descriptor.addProperties(properties)
         properties.unsafeRemove(removeProperty)
         validator().validate(properties)
       }
     }
    +
    +class TestTableSourceDescriptor(connector: ConnectorDescriptor)
    --- End diff --
    
    ` this.connectorDescriptor = Some(connector)`


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170224845
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---
    @@ -83,10 +84,32 @@ protected JsonRowDeserializationSchema getDeserializationSchema() {
     
     	@Override
     	public String explainSource() {
    -		return "KafkaJSONTableSource";
    +		return "KafkaJsonTableSource";
     	}
     
    -	//////// SETTERS FOR OPTIONAL PARAMETERS
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (!(o instanceof KafkaJsonTableSource)) {
    +			return false;
    +		}
    +		if (!super.equals(o)) {
    +			return false;
    +		}
    +		KafkaJsonTableSource that = (KafkaJsonTableSource) o;
    +		return failOnMissingField == that.failOnMissingField &&
    +			Objects.equals(jsonSchema, that.jsonSchema) &&
    +			Objects.equals(fieldMapping, that.fieldMapping);
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return Objects.hash(super.hashCode(), jsonSchema, fieldMapping, failOnMissingField);
    --- End diff --
    
    `TableSchema` does not override `hashCode()`


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170175378
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
    @@ -18,37 +18,85 @@
     
     package org.apache.flink.table.descriptors
     
    +import org.apache.flink.util.Preconditions
     import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +import scala.collection.JavaConverters._
     
     abstract class DescriptorTestBase {
     
       /**
    -    * Returns a valid descriptor.
    +    * Returns a set of valid descriptors.
    +    * This method is implemented in both Scala and Java.
    +    */
    +  def descriptors(): java.util.List[Descriptor]
    +
    +  /**
    +    * Returns a set of properties for each valid descriptor.
    +    * This code is implemented in both Scala and Java.
         */
    -  def descriptor(): Descriptor
    +  def properties(): java.util.List[java.util.Map[String, String]]
     
       /**
    -    * Returns a validator that can validate this descriptor.
    +    * Returns a validator that can validate all valid descriptors.
         */
       def validator(): DescriptorValidator
     
    -  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
    +  @Test
    +  def testValidation(): Unit = {
    +    val d = descriptors().asScala
    +    val p = properties().asScala
    +
    +    Preconditions.checkArgument(d.length == p.length)
    +
    +    d.zip(p).foreach { case (desc, props) =>
    +      verifyProperties(desc, props.asScala.toMap)
    +    }
    +  }
    +
    +  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
         val normProps = new DescriptorProperties
         descriptor.addProperties(normProps)
    -    assertEquals(expected.toMap, normProps.asMap)
    +    assertEquals(expected, normProps.asScalaMap)
       }
     
    -  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
    +  def verifyInvalidProperty(
    +      descriptor: Descriptor,
    +      property: String,
    +      invalidValue: String): Unit = {
         val properties = new DescriptorProperties
    -    descriptor().addProperties(properties)
    +    descriptor.addProperties(properties)
         properties.unsafePut(property, invalidValue)
         validator().validate(properties)
       }
     
    -  def verifyMissingProperty(removeProperty: String): Unit = {
    +  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
         val properties = new DescriptorProperties
    -    descriptor().addProperties(properties)
    +    descriptor.addProperties(properties)
         properties.unsafeRemove(removeProperty)
         validator().validate(properties)
       }
     }
    +
    +class TestTableSourceDescriptor(connector: ConnectorDescriptor)
    +  extends TableSourceDescriptor(connector) {
    --- End diff --
    
    Too many arguments.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170273904
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala ---
    @@ -47,53 +79,18 @@ class CsvTest extends DescriptorTestBase {
           "format.fields.3.name" -> "field4",
           "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
    --- End diff --
    
    Shouldn't this fail because CSV does not support nested data?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170274382
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
    @@ -18,37 +18,85 @@
     
     package org.apache.flink.table.descriptors
     
    +import org.apache.flink.util.Preconditions
     import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +import scala.collection.JavaConverters._
     
     abstract class DescriptorTestBase {
     
       /**
    -    * Returns a valid descriptor.
    +    * Returns a set of valid descriptors.
    +    * This method is implemented in both Scala and Java.
    --- End diff --
    
    Why is this important?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170229569
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---
    @@ -128,6 +165,13 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
         )
       }
     
    +  /**
    +    * Adds a table schema under the given key. This method is intended for Java code.
    +    */
    +  def putTableSchema(key: String, nameAndType: JList[JTuple2[String, String]]): Unit = {
    --- End diff --
    
    I think we should drop the Scala equivalent method. This is not a public API class that needs a shiny Scala interface but should be usable from Java and Scala.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170257053
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---
    @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
     object RowtimeValidator {
     
       val ROWTIME = "rowtime"
    -
    -  // per rowtime properties
    -
    -  val ROWTIME_VERSION = "version"
    -  val TIMESTAMPS_TYPE = "timestamps.type"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    -  val TIMESTAMPS_FROM = "timestamps.from"
    -  val TIMESTAMPS_CLASS = "timestamps.class"
    -  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
    -
    -  val WATERMARKS_TYPE = "watermarks.type"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    -  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    -  val WATERMARKS_CLASS = "watermarks.class"
    -  val WATERMARKS_SERIALIZED = "watermarks.serialized"
    -  val WATERMARKS_DELAY = "watermarks.delay"
    +  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
    +  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
    +  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
    +
    +  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
    +  val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized"
    +  val ROWTIME_WATERMARKS_DELAY = "rowtime.watermarks.delay"
    --- End diff --
    
    `rowtime.watermarks.bounded.delay`?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5564


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170181909
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala ---
    @@ -43,7 +42,7 @@ object TableSourceFactoryService extends Logging {
       def findTableSourceFactory(descriptor: TableSourceDescriptor): TableSource[_] = {
    --- End diff --
    
    Rename to `findAndCreateTableSource()`?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170271213
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---
    @@ -32,11 +32,36 @@ import scala.collection.mutable
       */
     class Schema extends Descriptor {
     
    +  private var deriveFields: Option[String] = None
    +
       // maps a field name to a list of properties that describe type, origin, and the time attribute
       private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()
     
       private var lastField: Option[String] = None
     
    +  /**
    +    * Derives field names and types from a preceding connector or format. Additional fields that
    +    * are defined in this schema extend the derived fields. The derived fields are
    +    * added in an alphabetical order according to their field name.
    +    */
    +  def deriveFieldsAlphabetically(): Schema = {
    --- End diff --
    
    I think we should support inferring the format from the schema rather than the schema from the format.
    This would be more aligned with how it would work in a `CREATE TABLE` statement and with how Hive does it, for example. We should still support defining the format explicitly, though.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170229924
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---
    @@ -89,37 +105,58 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
         put(key, clazz.getName)
       }
     
    +  /**
    +    * Adds a string under the given key.
    +    */
       def putString(key: String, str: String): Unit = {
         checkNotNull(key)
         checkNotNull(str)
         put(key, str)
       }
     
    +  /**
    +    * Adds a boolean under the given key.
    +    */
       def putBoolean(key: String, b: Boolean): Unit = {
         checkNotNull(key)
         put(key, b.toString)
       }
     
    +  /**
    +    * Adds a long under the given key.
    +    */
       def putLong(key: String, l: Long): Unit = {
         checkNotNull(key)
         put(key, l.toString)
       }
     
    +  /**
    +    * Adds an integer under the given key.
    +    */
       def putInt(key: String, i: Int): Unit = {
         checkNotNull(key)
         put(key, i.toString)
       }
     
    +  /**
    +    * Adds a character under the given key.
    +    */
       def putCharacter(key: String, c: Character): Unit = {
         checkNotNull(key)
         checkNotNull(c)
         put(key, c.toString)
       }
     
    +  /**
    +    * Adds a table schema under the given key.
    +    */
       def putTableSchema(key: String, schema: TableSchema): Unit = {
         putTableSchema(key, normalizeTableSchema(schema))
       }
     
    +  /**
    +    * Adds a table schema under the given key.
    +    */
       def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = {
    --- End diff --
    
    Remove?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170569676
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---
    @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
     object RowtimeValidator {
     
       val ROWTIME = "rowtime"
    -
    -  // per rowtime properties
    -
    -  val ROWTIME_VERSION = "version"
    -  val TIMESTAMPS_TYPE = "timestamps.type"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    -  val TIMESTAMPS_FROM = "timestamps.from"
    -  val TIMESTAMPS_CLASS = "timestamps.class"
    -  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
    -
    -  val WATERMARKS_TYPE = "watermarks.type"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    -  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    -  val WATERMARKS_CLASS = "watermarks.class"
    -  val WATERMARKS_SERIALIZED = "watermarks.serialized"
    -  val WATERMARKS_DELAY = "watermarks.delay"
    +  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
    +  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
    --- End diff --
    
    I wouldn't add another sub-property. It already looks complicated enough in YAML.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170256141
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala ---
    @@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
     object RowtimeValidator {
     
       val ROWTIME = "rowtime"
    -
    -  // per rowtime properties
    -
    -  val ROWTIME_VERSION = "version"
    -  val TIMESTAMPS_TYPE = "timestamps.type"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    -  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    -  val TIMESTAMPS_FROM = "timestamps.from"
    -  val TIMESTAMPS_CLASS = "timestamps.class"
    -  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
    -
    -  val WATERMARKS_TYPE = "watermarks.type"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
    -  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
    -  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
    -  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
    -  val WATERMARKS_CLASS = "watermarks.class"
    -  val WATERMARKS_SERIALIZED = "watermarks.serialized"
    -  val WATERMARKS_DELAY = "watermarks.delay"
    +  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
    +  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
    +  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
    +  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
    +  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
    --- End diff --
    
    `rowtime.timestamps.custom.serialized`


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r171292456
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
     object SchemaValidator {
     
       val SCHEMA = "schema"
    -  val SCHEMA_VERSION = "schema.version"
    +  val SCHEMA_NAME = "name"
    +  val SCHEMA_TYPE = "type"
    +  val SCHEMA_PROCTIME = "proctime"
    +  val SCHEMA_FROM = "from"
    +
    +  // utilities
    +
    +  /**
    +    * Finds the proctime attribute if defined.
    +    */
    +  def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val isProctime = toScala(
    +        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
    +      isProctime.foreach { isSet =>
    +        if (isSet) {
    +          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
    +        }
    +      }
    +    }
    +    toJava(None)
    +  }
    +
    +  /**
    +    * Finds the rowtime attributes if defined.
    +    */
    +  def deriveRowtimeAttributes(properties: DescriptorProperties)
    +    : util.List[RowtimeAttributeDescriptor] = {
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
    +
    +    // check for rowtime in every field
    +    for (i <- 0 until names.size) {
    +      RowtimeValidator
    +        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
    +        .foreach { case (extractor, strategy) =>
    +          // create descriptor
    +          attributes += new RowtimeAttributeDescriptor(
    +            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
    +            extractor,
    +            strategy)
    +        }
    +    }
    +
    +    attributes.asJava
    +  }
    +
    +  /**
    +    * Finds a table source field mapping.
    +    */
    +  def deriveFieldMapping(
    +      properties: DescriptorProperties,
    +      sourceSchema: Optional[TableSchema])
    +    : util.Map[String, String] = {
    +
    +    val mapping = mutable.Map[String, String]()
    +
    +    val schema = properties.getTableSchema(SCHEMA)
    +
    +    // add all schema fields first for implicit mappings
    +    schema.getColumnNames.foreach { name =>
    +      mapping.put(name, name)
    +    }
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
    +      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
     
    -  // per column properties
    +        // add explicit mapping
    +        case Some(source) =>
    +          mapping.put(name, source)
     
    -  val NAME = "name"
    -  val TYPE = "type"
    -  val PROCTIME = "proctime"
    -  val PROCTIME_VALUE_TRUE = "true"
    -  val FROM = "from"
    +        // implicit mapping or time
    +        case None =>
    +          val isProctime = properties
    +            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
    +            .orElse(false)
    +          val isRowtime = properties
    +            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +          // remove proctime/rowtime from mapping
    +          if (isProctime || isRowtime) {
    +            mapping.remove(name)
    +          }
    +          // check for invalid fields
    +          else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) {
    +            throw new ValidationException(s"Could not map the schema field '$name' to a field " +
    +              s"from source. Please specify the source field from which it can be derived.")
    +          }
    +      }
    +    }
     
    +    mapping.toMap.asJava
    +  }
    +
    +  /**
    +    * Finds the fields that can be used for a format schema (without time attributes).
    +    */
    +  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
    --- End diff --
    
    No problem @xccui. My goal was to allow users to specify all fields only once, because users often have tables with 30+ columns. When I opened the PR, I added a possibility to derive a `schema` from a `format` schema. But according to a SQL DDL statement `CREATE TABLE (..) [FORMAT] ...`, the `schema` must always be complete and the `format` schema might be derived, so I changed my initial implementation.
    
    For simplicity, `deriveFormatFields` removes the time attributes and takes the resulting schema as the format's schema, because `rowtime` must not be an existing field. If rowtime is supposed to be an existing field, the full format schema is mandatory (because `schema` and `format` schema might differ). I agree that we need good documentation for all of that.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170230962
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---
    @@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
         }
       }
     
    +  /**
    +    * Adds an indexed mapping of properties under a common key.
    +    *
    +    * For example:
    +    *
    +    * schema.fields.0.type = INT, schema.fields.0.name = test
    +    *                             schema.fields.1.name = test2
    +    *
    +    * The arity of the propertySets can differ.
    +    *
    +    * This method is intended for Java code.
    +    */
    +  def putIndexedVariableProperties(
    +      key: String,
    +      propertySets: JList[JMap[String, String]])
    +    : Unit = {
    +    checkNotNull(key)
    +    checkNotNull(propertySets)
    +    putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap))
    +  }
    +
       // ----------------------------------------------------------------------------------------------
     
    +  /**
    +    * Returns a string value under the given key if it exists.
    +    */
       def getString(key: String): Option[String] = {
         properties.get(key)
       }
     
    -  def getCharacter(key: String): Option[Character] = getString(key) match {
    -    case Some(c) =>
    -      if (c.length != 1) {
    -        throw new ValidationException(s"The value of $key must only contain one character.")
    -      }
    -      Some(c.charAt(0))
    +  /**
    +    * Returns a string value under the given key if it exists.
    +    * This method is intended for Java code.
    +    */
    +  def getOptionalString(key: String): Optional[String] = toJava(getString(key))
     
    -    case None => None
    +  /**
    +    * Returns a character value under the given key if it exists.
    +    */
    +  def getCharacter(key: String): Option[Character] = getString(key).map { c =>
    +    if (c.length != 1) {
    +      throw new ValidationException(s"The value of $key must only contain one character.")
    +    }
    +    c.charAt(0)
       }
     
    -  def getBoolean(key: String): Option[Boolean] = getString(key) match {
    -    case Some(b) => Some(JBoolean.parseBoolean(b))
    -
    -    case None => None
    +  /**
    +    * Returns a class value under the given key if it exists.
    +    */
    +  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
    +    properties.get(key).map { name =>
    +      val clazz = try {
    +        Class.forName(
    +          name,
    +          true,
    +          Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]]
    +      } catch {
    +        case e: Exception =>
    +          throw new ValidationException(s"Coult not get class for key '$key'.", e)
    --- End diff --
    
    Add name of class?


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170175473
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
    @@ -18,37 +18,85 @@
     
     package org.apache.flink.table.descriptors
     
    +import org.apache.flink.util.Preconditions
     import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +import scala.collection.JavaConverters._
     
     abstract class DescriptorTestBase {
     
       /**
    -    * Returns a valid descriptor.
    +    * Returns a set of valid descriptors.
    +    * This method is implemented in both Scala and Java.
    +    */
    +  def descriptors(): java.util.List[Descriptor]
    +
    +  /**
    +    * Returns a set of properties for each valid descriptor.
    +    * This code is implemented in both Scala and Java.
         */
    -  def descriptor(): Descriptor
    +  def properties(): java.util.List[java.util.Map[String, String]]
     
       /**
    -    * Returns a validator that can validate this descriptor.
    +    * Returns a validator that can validate all valid descriptors.
         */
       def validator(): DescriptorValidator
     
    -  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
    +  @Test
    +  def testValidation(): Unit = {
    +    val d = descriptors().asScala
    +    val p = properties().asScala
    +
    +    Preconditions.checkArgument(d.length == p.length)
    +
    +    d.zip(p).foreach { case (desc, props) =>
    +      verifyProperties(desc, props.asScala.toMap)
    +    }
    +  }
    +
    +  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
         val normProps = new DescriptorProperties
         descriptor.addProperties(normProps)
    -    assertEquals(expected.toMap, normProps.asMap)
    +    assertEquals(expected, normProps.asScalaMap)
       }
     
    -  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
    +  def verifyInvalidProperty(
    +      descriptor: Descriptor,
    +      property: String,
    +      invalidValue: String): Unit = {
         val properties = new DescriptorProperties
    -    descriptor().addProperties(properties)
    +    descriptor.addProperties(properties)
         properties.unsafePut(property, invalidValue)
         validator().validate(properties)
       }
     
    -  def verifyMissingProperty(removeProperty: String): Unit = {
    +  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
         val properties = new DescriptorProperties
    -    descriptor().addProperties(properties)
    +    descriptor.addProperties(properties)
         properties.unsafeRemove(removeProperty)
         validator().validate(properties)
       }
     }
    +
    +class TestTableSourceDescriptor(connector: ConnectorDescriptor)
    +  extends TableSourceDescriptor(connector) {
    +
    +  def addFormat(format: FormatDescriptor): TestTableSourceDescriptor = {
    +    this.formatDescriptor = Some(format)
    +    this
    +  }
    +
    +  def addSchema(schema: Schema): TestTableSourceDescriptor = {
    +    this.schemaDescriptor = Some(schema)
    +    this
    +  }
    +
    +  def getPropertyMap: java.util.Map[String, String] = {
    +    val props = new DescriptorProperties()
    +    connectorDescriptor.addProperties(props)
    --- End diff --
    
    `addProperties()` has been removed and this method `getPropertyMap()` seems to be useless now.


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170291034
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala ---
    @@ -18,48 +18,67 @@
     
     package org.apache.flink.table.descriptors
     
    +import java.util
    +
     import org.apache.flink.streaming.api.watermark.Watermark
     import org.apache.flink.table.api.ValidationException
     import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
     import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
     import org.apache.flink.types.Row
     import org.junit.Test
     
    -class RowtimeTest extends DescriptorTestBase {
    +import scala.collection.JavaConverters._
     
    -  @Test
    -  def testRowtime(): Unit = {
    -    val desc = Rowtime()
    -      .timestampsFromField("otherField")
    -      .watermarksPeriodicBounding(1000L)
    -    val expected = Seq(
    -      "rowtime.0.version" -> "1",
    -      "rowtime.0.timestamps.type" -> "from-field",
    -      "rowtime.0.timestamps.from" -> "otherField",
    -      "rowtime.0.watermarks.type" -> "periodic-bounding",
    -      "rowtime.0.watermarks.delay" -> "1000"
    -    )
    -    verifyProperties(desc, expected)
    -  }
    +class RowtimeTest extends DescriptorTestBase {
     
       @Test(expected = classOf[ValidationException])
       def testInvalidWatermarkType(): Unit = {
    -    verifyInvalidProperty("rowtime.0.watermarks.type", "xxx")
    +    verifyInvalidProperty(descriptors().get(0), "rowtime.watermarks.type", "xxx")
       }
     
       @Test(expected = classOf[ValidationException])
       def testMissingWatermarkClass(): Unit = {
    -    verifyMissingProperty("rowtime.0.watermarks.class")
    +    verifyMissingProperty(descriptors().get(1), "rowtime.watermarks.class")
    --- End diff --
    
    use constant


---

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170230028
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala ---
    @@ -155,6 +199,28 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
         }
       }
     
    +  /**
    +    * Adds an indexed sequence of properties (with sub-properties) under a common key.
    +    *
    +    * For example:
    +    *
    +    * schema.fields.0.type = INT, schema.fields.0.name = test
    +    * schema.fields.1.type = LONG, schema.fields.1.name = test2
    +    *
    +    * The arity of each propertyValue must match the arity of propertyKeys.
    +    *
    +    * This method is intended for Java code.
    +    */
    +  def putIndexedFixedProperties(
    --- End diff --
    
    Remove Scala equivalent?


---