Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/21 12:41:32 UTC

[GitHub] [flink] twalthr opened a new pull request #10917: [FLINK-15162][table] Refactor DataTypeLookup to DataTypeFactory

twalthr opened a new pull request #10917: [FLINK-15162][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917
 
 
   ## What is the purpose of the change
   
   Refactors the `DataTypeLookup` into a `DataTypeFactory` for future use. The factory is integrated into the `CatalogManager`, with an adapted instance creation order for table environments.
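   
   For illustration, a rough usage sketch of what the refactored factory enables (the `getDataTypeFactory()` accessor on `CatalogManager` is an assumption; the `createDataType` overloads are the ones defined on the new interface):
   
   ```java
   import java.util.Optional;
   
   import org.apache.flink.table.catalog.CatalogManager;
   import org.apache.flink.table.catalog.DataTypeFactory;
   import org.apache.flink.table.types.DataType;
   
   final class DataTypeFactoryUsageSketch {
   
       static void sketch(CatalogManager catalogManager) {
           // accessor name is an assumption; the PR wires the factory into the CatalogManager
           DataTypeFactory factory = catalogManager.getDataTypeFactory();
   
           // parse a fully defined type string into a resolved DataType
           Optional<DataType> decimal = factory.createDataType("DECIMAL(10, 2)");
   
           // extract a DataType from a Java class via reflection
           DataType timestamp = factory.createDataType(java.time.LocalDateTime.class);
       }
   }
   ```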
   
   ## Brief change log
   
   See commit messages.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): yes
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] dawidwys commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#discussion_r369093203
 
 

 ##########
 File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DataTypeFactory.java
 ##########
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.extraction.DataTypeExtractor;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.StructuredType;
+
+import java.util.Optional;
+
+/**
+ * Factory for creating fully resolved data types that can be used for planning.
+ *
+ * <p>The factory is useful for types that cannot be created with one of the static methods in
 + * {@link DataTypes} because they require access to configuration or catalog.
+ */
+@PublicEvolving
+public interface DataTypeFactory {
+
+	/**
+	 * Creates a type by a fully or partially defined name.
+	 *
+	 * <p>The factory will parse and resolve the name of a type to a {@link DataType}. This includes
+	 * both built-in types as well as user-defined types (see {@link DistinctType} and {@link StructuredType}).
+	 */
+	Optional<DataType> createDataType(String name);
+
+	/**
+	 * Creates a type by a fully or partially defined identifier.
+	 *
+	 * <p>The factory will parse and resolve the name of a type to a {@link DataType}. This includes
+	 * both built-in types as well as user-defined types (see {@link DistinctType} and {@link StructuredType}).
+	 */
+	Optional<DataType> createDataType(UnresolvedIdentifier identifier);
+
+	/**
+	 * Creates a type by analyzing the given class.
+	 *
+	 * <p>It does this by using Java reflection which can be supported by {@link DataTypeHint} annotations
+	 * for nested, structured types.
+	 *
 + * <p>It will throw a {@link ValidationException} in cases where the reflective extraction needs
+	 * more information or simply fails.
+	 *
+	 * <p>The following examples show how to use and enrich the extraction process:
+	 *
+	 * <pre>
+	 * {@code
+	 *   // returns INT
+	 *   createDataType(Integer.class)
+	 *
+	 *   // returns TIMESTAMP(9)
+	 *   createDataType(java.time.LocalDateTime.class)
+	 *
+	 *   // returns an anonymous, unregistered structured type
+	 *   // that is deeply integrated into the API compared to opaque RAW types
+	 *   class User {
+	 *
+	 *     // extract fields automatically
+	 *     public String name;
+	 *     public int age;
+	 *
+	 *     // enrich the extraction with precision information
+	 *     public @DataTypeHint("DECIMAL(10,2)") BigDecimal accountBalance;
+	 *
+	 *     // enrich the extraction with forcing using RAW types
+	 *     public @DataTypeHint(forceRawPattern = "scala.") Address address;
+	 *
+	 *     // enrich the extraction by specifying defaults
+	 *     public @DataTypeHint(defaultSecondPrecision = 3) Log log;
+	 *   }
+	 * }
+	 * </pre>
+	 */
+	default <T> DataType createDataType(Class<T> clazz) {
 
 Review comment:
   Do we need that default method? I don't think we will ever have multiple implementations of that interface anyway, so one could implement everything here.
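
   For illustration, a minimal sketch of the alternative being suggested, i.e. moving the extraction out of a `default` method and into the (single) concrete factory (the class name below is illustrative; the delegation to `DataTypeExtractor` is the one shown further down in this diff):

   ```java
   import org.apache.flink.table.catalog.DataTypeFactory;
   import org.apache.flink.table.types.DataType;
   import org.apache.flink.table.types.extraction.DataTypeExtractor;

   // Sketch: implement the reflective extraction in the concrete factory instead of
   // a default method on the interface; the remaining overloads are omitted here.
   abstract class DataTypeFactoryImplSketch implements DataTypeFactory {

       @Override
       public <T> DataType createDataType(Class<T> clazz) {
           // same delegation the proposed default method performs
           return DataTypeExtractor.extractFromType(this, clazz);
       }
   }
   ```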


[GitHub] [flink] flinkbot edited a comment on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#issuecomment-576740045
 
 
   <!--
   Meta data
   Hash:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4542 TriggerType:PUSH TriggerID:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb
   Hash:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145366641 TriggerType:PUSH TriggerID:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb
   -->
   ## CI report:
   
   * 9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145366641) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4542) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#issuecomment-576740045
 
 
   <!--
   Meta data
   Hash:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4542 TriggerType:PUSH TriggerID:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb
   Hash:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145366641 TriggerType:PUSH TriggerID:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb
   Hash:91d3d0f3e0960080b3be484372d873f65e54a079 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/145415438 TriggerType:PUSH TriggerID:91d3d0f3e0960080b3be484372d873f65e54a079
   Hash:91d3d0f3e0960080b3be484372d873f65e54a079 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4546 TriggerType:PUSH TriggerID:91d3d0f3e0960080b3be484372d873f65e54a079
   -->
   ## CI report:
   
   * 9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145366641) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4542) 
   * 91d3d0f3e0960080b3be484372d873f65e54a079 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145415438) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4546) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#issuecomment-576740045
 
 
   <!--
   Meta data
   Hash:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4542 TriggerType:PUSH TriggerID:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb
   Hash:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145366641 TriggerType:PUSH TriggerID:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb
   Hash:91d3d0f3e0960080b3be484372d873f65e54a079 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/145415438 TriggerType:PUSH TriggerID:91d3d0f3e0960080b3be484372d873f65e54a079
   Hash:91d3d0f3e0960080b3be484372d873f65e54a079 Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4546 TriggerType:PUSH TriggerID:91d3d0f3e0960080b3be484372d873f65e54a079
   -->
   ## CI report:
   
   * 9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145366641) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4542) 
   * 91d3d0f3e0960080b3be484372d873f65e54a079 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/145415438) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4546) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] twalthr closed pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917
 
 
   


[GitHub] [flink] dawidwys commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#discussion_r369091797
 
 

 ##########
 File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DataTypeFactory.java
 ##########
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.extraction.DataTypeExtractor;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.StructuredType;
+
+import java.util.Optional;
+
+/**
+ * Factory for creating fully resolved data types that can be used for planning.
+ *
+ * <p>The factory is useful for types that cannot be created with one of the static methods in
 + * {@link DataTypes} because they require access to configuration or catalog.
+ */
+@PublicEvolving
+public interface DataTypeFactory {
+
+	/**
+	 * Creates a type by a fully or partially defined name.
+	 *
+	 * <p>The factory will parse and resolve the name of a type to a {@link DataType}. This includes
+	 * both built-in types as well as user-defined types (see {@link DistinctType} and {@link StructuredType}).
+	 */
+	Optional<DataType> createDataType(String name);
+
+	/**
+	 * Creates a type by a fully or partially defined identifier.
+	 *
+	 * <p>The factory will parse and resolve the name of a type to a {@link DataType}. This includes
+	 * both built-in types as well as user-defined types (see {@link DistinctType} and {@link StructuredType}).
+	 */
+	Optional<DataType> createDataType(UnresolvedIdentifier identifier);
+
+	/**
+	 * Creates a type by analyzing the given class.
+	 *
+	 * <p>It does this by using Java reflection which can be supported by {@link DataTypeHint} annotations
+	 * for nested, structured types.
+	 *
 + * <p>It will throw a {@link ValidationException} in cases where the reflective extraction needs
+	 * more information or simply fails.
+	 *
+	 * <p>The following examples show how to use and enrich the extraction process:
+	 *
+	 * <pre>
+	 * {@code
+	 *   // returns INT
+	 *   createDataType(Integer.class)
+	 *
+	 *   // returns TIMESTAMP(9)
+	 *   createDataType(java.time.LocalDateTime.class)
+	 *
+	 *   // returns an anonymous, unregistered structured type
+	 *   // that is deeply integrated into the API compared to opaque RAW types
+	 *   class User {
+	 *
+	 *     // extract fields automatically
+	 *     public String name;
+	 *     public int age;
+	 *
+	 *     // enrich the extraction with precision information
+	 *     public @DataTypeHint("DECIMAL(10,2)") BigDecimal accountBalance;
+	 *
+	 *     // enrich the extraction with forcing using RAW types
+	 *     public @DataTypeHint(forceRawPattern = "scala.") Address address;
+	 *
+	 *     // enrich the extraction by specifying defaults
+	 *     public @DataTypeHint(defaultSecondPrecision = 3) Log log;
+	 *   }
+	 * }
+	 * </pre>
+	 */
+	default <T> DataType createDataType(Class<T> clazz) {
+		return DataTypeExtractor.extractFromType(this, clazz);
+	}
+
+	/**
+	 * Creates a RAW type for the given class. This type is a black box within the table ecosystem
+	 * and is only deserialized at the edges.
+	 *
+	 * <p>The factory will create {@link DataTypes#RAW(Class, TypeSerializer)} in cases where no serializer
 
 Review comment:
   This comment is a bit weird, imo. Won't it always be a generic serializer? If not, how is it different from `createDataType` (just from the interface perspective)?
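
   For reference, a rough sketch of what a "generic serializer" fallback for the RAW case could look like (assumption only, based on the `KryoSerializer` import in `DataTypeFactoryImpl` later in this thread):

   ```java
   import org.apache.flink.api.common.ExecutionConfig;
   import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.types.DataType;

   final class RawTypeSketch {

       // Sketch: a RAW type backed by a generic Kryo serializer, i.e. what the
       // RAW-creating method could fall back to when no specific serializer is given.
       static <T> DataType genericRaw(Class<T> clazz, ExecutionConfig config) {
           return DataTypes.RAW(clazz, new KryoSerializer<>(clazz, config));
       }
   }
   ```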


[GitHub] [flink] twalthr commented on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
twalthr commented on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#issuecomment-577057666
 
 
   Thank you @dawidwys. Merging...


[GitHub] [flink] flinkbot edited a comment on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#issuecomment-576740045
 
 
   <!--
   Meta data
   Hash:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4542 TriggerType:PUSH TriggerID:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb
   Hash:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145366641 TriggerType:PUSH TriggerID:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb
   Hash:91d3d0f3e0960080b3be484372d873f65e54a079 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:91d3d0f3e0960080b3be484372d873f65e54a079
   -->
   ## CI report:
   
   * 9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145366641) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4542) 
   * 91d3d0f3e0960080b3be484372d873f65e54a079 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] twalthr commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#discussion_r369133845
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
 ##########
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasNestedRoot;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/**
+ * Implementation of a {@link DataTypeFactory}.
+ */
+@Internal
+final class DataTypeFactoryImpl implements DataTypeFactory {
+
+	private final ClassLoader classLoader;
+
+	private final Supplier<ExecutionConfig> executionConfig;
+
+	DataTypeFactoryImpl(
+			ClassLoader classLoader,
+			ReadableConfig config,
+			@Nullable ExecutionConfig executionConfig) {
+		this.classLoader = classLoader;
+		this.executionConfig = createKryoExecutionConfig(classLoader, config, executionConfig);
+	}
+
+	@Override
+	public Optional<DataType> createDataType(String name) {
+		final LogicalType parsedType = LogicalTypeParser.parse(name, classLoader);
+		if (hasNestedRoot(parsedType, LogicalTypeRoot.UNRESOLVED)) {
+			throw new TableException("User-defined types are not supported yet.");
+		}
+		return Optional.of(fromLogicalToDataType(parsedType));
+	}
+
+	@Override
+	public Optional<DataType> createDataType(UnresolvedIdentifier identifier) {
+		if (!identifier.getDatabaseName().isPresent()) {
+			return createDataType(identifier.getObjectName());
 
 Review comment:
   The class will get access to the catalog manager in the future.


[GitHub] [flink] twalthr commented on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
twalthr commented on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#issuecomment-576804884
 
 
   Thanks for the review @dawidwys. I addressed your feedback. I also added one more commit that finalizes this issue.


[GitHub] [flink] twalthr commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#discussion_r369118164
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##########
 @@ -68,19 +72,88 @@
 	// The name of the built-in catalog
 	private final String builtInCatalogName;
 
-	public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) {
+	private final DataTypeFactory typeFactory;
+
+	private CatalogManager(
+			ClassLoader classLoader,
+			ReadableConfig config,
+			String defaultCatalogName,
+			Catalog defaultCatalog,
+			@Nullable ExecutionConfig executionConfig) {
+		checkNotNull(classLoader, "Class loader cannot be null");
+		checkNotNull(config, "Config cannot be null");
 		checkArgument(
 			!StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
 			"Default catalog name cannot be null or empty");
 		checkNotNull(defaultCatalog, "Default catalog cannot be null");
+
 		catalogs = new LinkedHashMap<>();
 		catalogs.put(defaultCatalogName, defaultCatalog);
-		this.currentCatalogName = defaultCatalogName;
-		this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
+		currentCatalogName = defaultCatalogName;
+		currentDatabaseName = defaultCatalog.getDefaultDatabase();
 
-		this.temporaryTables = new HashMap<>();
+		temporaryTables = new HashMap<>();
 		// right now the default catalog is always the built-in one
-		this.builtInCatalogName = defaultCatalogName;
+		builtInCatalogName = defaultCatalogName;
+
+		typeFactory = new DataTypeFactoryImpl(classLoader, config, executionConfig);
 
 Review comment:
   This is already prepared for looking up types in the future, which requires access to the catalog manager itself. But we can also postpone this until we really need it.


[GitHub] [flink] dawidwys commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#discussion_r369101893
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
 ##########
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasNestedRoot;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/**
+ * Implementation of a {@link DataTypeFactory}.
+ */
+@Internal
+final class DataTypeFactoryImpl implements DataTypeFactory {
+
+	private final ClassLoader classLoader;
+
+	private final Supplier<ExecutionConfig> executionConfig;
+
+	DataTypeFactoryImpl(
+			ClassLoader classLoader,
+			ReadableConfig config,
+			@Nullable ExecutionConfig executionConfig) {
+		this.classLoader = classLoader;
+		this.executionConfig = createKryoExecutionConfig(classLoader, config, executionConfig);
+	}
+
+	@Override
+	public Optional<DataType> createDataType(String name) {
+		final LogicalType parsedType = LogicalTypeParser.parse(name, classLoader);
+		if (hasNestedRoot(parsedType, LogicalTypeRoot.UNRESOLVED)) {
 
 Review comment:
   I guess this is a very short-term solution. We could think of having a method pretty much identical to `fromLogicalToDataType` that would accept a `DataTypeFactory` and fall back to `createDataType(UnresolvedIdentifier identifier)` for `UnresolvedUserDefinedType`.
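
   A rough sketch of that idea, assuming `UnresolvedUserDefinedType` exposes its `UnresolvedIdentifier` and ignoring nested types for brevity:

   ```java
   import org.apache.flink.table.api.ValidationException;
   import org.apache.flink.table.catalog.DataTypeFactory;
   import org.apache.flink.table.catalog.UnresolvedIdentifier;
   import org.apache.flink.table.types.DataType;
   import org.apache.flink.table.types.logical.LogicalType;
   import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
   import org.apache.flink.table.types.utils.TypeConversions;

   final class ResolvingTypeConversionSketch {

       // Sketch: like fromLogicalToDataType, but falling back to the factory for
       // user-defined types; recursion into nested fields is omitted here.
       static DataType toDataType(DataTypeFactory factory, LogicalType type) {
           if (type instanceof UnresolvedUserDefinedType) {
               // accessor assumed to exist on UnresolvedUserDefinedType
               UnresolvedIdentifier identifier =
                   ((UnresolvedUserDefinedType) type).getUnresolvedIdentifier();
               return factory.createDataType(identifier)
                   .orElseThrow(() -> new ValidationException("Unknown type: " + identifier));
           }
           return TypeConversions.fromLogicalToDataType(type);
       }
   }
   ```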


[GitHub] [flink] flinkbot edited a comment on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#issuecomment-576740045
 
 
   <!--
   Meta data
   Hash:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4542 TriggerType:PUSH TriggerID:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb
   Hash:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145366641 TriggerType:PUSH TriggerID:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb
   Hash:91d3d0f3e0960080b3be484372d873f65e54a079 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/145415438 TriggerType:PUSH TriggerID:91d3d0f3e0960080b3be484372d873f65e54a079
   Hash:91d3d0f3e0960080b3be484372d873f65e54a079 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4546 TriggerType:PUSH TriggerID:91d3d0f3e0960080b3be484372d873f65e54a079
   -->
   ## CI report:
   
   * 9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145366641) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4542) 
   * 91d3d0f3e0960080b3be484372d873f65e54a079 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/145415438) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4546) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] dawidwys commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#discussion_r369102950
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
 ##########
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasNestedRoot;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/**
+ * Implementation of a {@link DataTypeFactory}.
+ */
+@Internal
+final class DataTypeFactoryImpl implements DataTypeFactory {
+
+	private final ClassLoader classLoader;
+
+	private final Supplier<ExecutionConfig> executionConfig;
+
+	DataTypeFactoryImpl(
+			ClassLoader classLoader,
+			ReadableConfig config,
+			@Nullable ExecutionConfig executionConfig) {
+		this.classLoader = classLoader;
+		this.executionConfig = createKryoExecutionConfig(classLoader, config, executionConfig);
+	}
+
+	@Override
+	public Optional<DataType> createDataType(String name) {
+		final LogicalType parsedType = LogicalTypeParser.parse(name, classLoader);
+		if (hasNestedRoot(parsedType, LogicalTypeRoot.UNRESOLVED)) {
+			throw new TableException("User-defined types are not supported yet.");
+		}
+		return Optional.of(fromLogicalToDataType(parsedType));
+	}
+
+	@Override
+	public Optional<DataType> createDataType(UnresolvedIdentifier identifier) {
+		if (!identifier.getDatabaseName().isPresent()) {
+			return createDataType(identifier.getObjectName());
 
 Review comment:
   Have you thought about how you want to pass `currentCatalog` & `currentDatabase` for the identifier resolution?
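
   One conceivable direction, sketched under the assumption that the factory can ask the `CatalogManager` (which already tracks the current catalog and database) to qualify the identifier first:

   ```java
   import org.apache.flink.table.catalog.CatalogManager;
   import org.apache.flink.table.catalog.ObjectIdentifier;
   import org.apache.flink.table.catalog.UnresolvedIdentifier;

   final class IdentifierResolutionSketch {

       // Sketch: fill in missing catalog/database parts from the current context
       // before looking the type up (getCatalogName() assumed to mirror getDatabaseName()).
       static ObjectIdentifier qualify(CatalogManager catalogManager, UnresolvedIdentifier identifier) {
           return ObjectIdentifier.of(
               identifier.getCatalogName().orElse(catalogManager.getCurrentCatalog()),
               identifier.getDatabaseName().orElse(catalogManager.getCurrentDatabase()),
               identifier.getObjectName());
       }
   }
   ```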


[GitHub] [flink] flinkbot commented on issue #10917: [FLINK-15162][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10917: [FLINK-15162][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#issuecomment-576667725
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 1acae85438c68f81f5b8ff6b1f8904bb3a055c42 (Tue Jan 21 12:53:23 UTC 2020)
   
   **Warnings:**
    * **2 pom.xml files were touched**: Check for build and licensing issues.
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-15162).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
 The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] dawidwys commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#discussion_r369083199
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##########
 @@ -68,19 +72,88 @@
 	// The name of the built-in catalog
 	private final String builtInCatalogName;
 
-	public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) {
+	private final DataTypeFactory typeFactory;
+
+	private CatalogManager(
+			ClassLoader classLoader,
+			ReadableConfig config,
+			String defaultCatalogName,
+			Catalog defaultCatalog,
+			@Nullable ExecutionConfig executionConfig) {
+		checkNotNull(classLoader, "Class loader cannot be null");
+		checkNotNull(config, "Config cannot be null");
 		checkArgument(
 			!StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
 			"Default catalog name cannot be null or empty");
 		checkNotNull(defaultCatalog, "Default catalog cannot be null");
+
 		catalogs = new LinkedHashMap<>();
 		catalogs.put(defaultCatalogName, defaultCatalog);
-		this.currentCatalogName = defaultCatalogName;
-		this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
+		currentCatalogName = defaultCatalogName;
+		currentDatabaseName = defaultCatalog.getDefaultDatabase();
 
-		this.temporaryTables = new HashMap<>();
+		temporaryTables = new HashMap<>();
 		// right now the default catalog is always the built-in one
-		this.builtInCatalogName = defaultCatalogName;
+		builtInCatalogName = defaultCatalogName;
+
+		typeFactory = new DataTypeFactoryImpl(classLoader, config, executionConfig);
 
 Review comment:
   How about we pass the `DataTypeFactory` instead of `config` & `executionConfig`? It would make it easier to mock the `DataTypeFactory`.
   
   We can construct this default implementation in the builder.
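
   A minimal sketch of that suggestion (shapes and names are illustrative, not the code in this PR):

   ```java
   import org.apache.flink.table.catalog.Catalog;
   import org.apache.flink.table.catalog.DataTypeFactory;

   // Sketch: the manager receives a ready-made DataTypeFactory (easy to mock in tests);
   // the builder would construct the default implementation, e.g.
   // new DataTypeFactoryImpl(classLoader, config, executionConfig).
   final class CatalogManagerWiringSketch {

       private final DataTypeFactory typeFactory;

       CatalogManagerWiringSketch(String defaultCatalogName, Catalog defaultCatalog, DataTypeFactory typeFactory) {
           // existing defaultCatalogName/defaultCatalog handling stays as it is
           this.typeFactory = typeFactory;
       }
   }
   ```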


[GitHub] [flink] flinkbot commented on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#issuecomment-576740045
 
 
   <!--
   Meta data
   Hash:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb
   -->
   ## CI report:
   
   * 9bc8b54c95312ccc38679c5b06a66f0d8c6f43eb UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] twalthr commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #10917: [FLINK-15612][table] Refactor DataTypeLookup to DataTypeFactory
URL: https://github.com/apache/flink/pull/10917#discussion_r369134883
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java
 ##########
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasNestedRoot;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/**
+ * Implementation of a {@link DataTypeFactory}.
+ */
+@Internal
+final class DataTypeFactoryImpl implements DataTypeFactory {
+
+	private final ClassLoader classLoader;
+
+	private final Supplier<ExecutionConfig> executionConfig;
+
+	DataTypeFactoryImpl(
+			ClassLoader classLoader,
+			ReadableConfig config,
+			@Nullable ExecutionConfig executionConfig) {
+		this.classLoader = classLoader;
+		this.executionConfig = createKryoExecutionConfig(classLoader, config, executionConfig);
+	}
+
+	@Override
+	public Optional<DataType> createDataType(String name) {
+		final LogicalType parsedType = LogicalTypeParser.parse(name, classLoader);
+		if (hasNestedRoot(parsedType, LogicalTypeRoot.UNRESOLVED)) {
+			throw new TableException("User-defined types are not supported yet.");
+		}
+		return Optional.of(fromLogicalToDataType(parsedType));
+	}
+
+	@Override
+	public Optional<DataType> createDataType(UnresolvedIdentifier identifier) {
+		if (!identifier.getDatabaseName().isPresent()) {
+			return createDataType(identifier.getObjectName());
 
 Review comment:
   The class will get access to the catalog manager. It is basically only a component of the catalog manager.

