You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@paimon.apache.org by "Alibaba-HZY (via GitHub)" <gi...@apache.org> on 2023/03/30 07:35:36 UTC

[GitHub] [incubator-paimon] Alibaba-HZY opened a new pull request, #771: LogStoreTableFactory should not implements DynamicTableFactory

Alibaba-HZY opened a new pull request, #771:
URL: https://github.com/apache/incubator-paimon/pull/771

   Purpose
   LogStoreTableFactory should not implements DynamicTableFactory.if there is no kafka jar in flink/lib, the kafka identifier will find KafkaLogStoreFactory in paimon, and throw KafkaLogStoreFactory is not a DynamicTableSourceFactory exception, which is very confused.so we define the PaimonFactory ,LogStoreTableFactory implements it.
   if there is no kafka jar in flink/lib, it will throw org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath
    Fix #705
   
   Tests
   LogSystemITCase
   
   API and Format
   Documentation


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#discussion_r1153310157


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/PaimonFactory.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.util.Set;
+
+/**
+ * Base interface for all kind of factories that create object instances from a list of key-value
+ * pairs in Flink's Table & SQL API.
+ *
+ * <p>A factory is uniquely identified by {@link Class} and {@link #factoryIdentifier()}.
+ *
+ * <p>The list of available factories is discovered using Java's Service Provider Interfaces (SPI).
+ * Classes that implement this interface can be added to {@code
+ * META_INF/services/org.apache.paimon.flink.factories.PaimonFactory} in JAR files.
+ *
+ * <p>Every factory declares a set of required and optional options. This information will not be
+ * used during discovery but is helpful when generating documentation and performing validation. A
+ * factory may discover further (nested) factories, the options of the nested factories must not be
+ * declared in the sets of this factory.
+ *
+ * <p>It is the responsibility of each factory to perform validation before returning an instance.
+ *
+ * <p>For consistency, the following style for key names of {@link ConfigOption} is recommended:
+ *
+ * <ul>
+ *   <li>Try to <b>reuse</b> key names as much as possible. Use other factory implementations as an
+ *       example.
+ *   <li>Key names should be declared in <b>lower case</b>. Use "-" instead of dots or camel case to
+ *       split words.
+ *   <li>Key names should be <b>hierarchical</b> where appropriate. Think about how one would define
+ *       such a hierarchy in JSON or YAML file (e.g. {@code sink.bulk-flush.max-actions}).
+ *   <li>In case of a hierarchy, try not to use the higher level again in the key name (e.g. do
+ *       {@code sink.partitioner} instead of {@code sink.sink-partitioner}) to <b>keep the keys
+ *       short</b>.
+ *   <li>Key names which can be templated, e.g. to refer to a specific column, should be listed
+ *       using '#' as the placeholder symbol. For example, use {@code fields.#.min}.
+ * </ul>
+ */
+public interface PaimonFactory {
+
+    /**
+     * Returns a unique identifier among same factory interfaces.
+     *
+     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
+     * kafka}). If multiple factories exist for different versions, a version should be appended
+     * using "-" (e.g. {@code elasticsearch-7}).
+     */
+    String factoryIdentifier();
+
+    /**
+     * Returns a set of {@link ConfigOption} that an implementation of this factory requires in
+     * addition to {@link #optionalOptions()}.
+     *
+     * <p>See the documentation of {@link PaimonFactory} for more information.
+     */
+    Set<ConfigOption<?>> requiredOptions();
+
+    /**
+     * Returns a set of {@link ConfigOption} that an implementation of this factory consumes in
+     * addition to {@link #requiredOptions()}.
+     *
+     * <p>See the documentation of {@link PaimonFactory} for more information.
+     */
+    Set<ConfigOption<?>> optionalOptions();

Review Comment:
   useless too?



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/PaimonFactory.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.util.Set;
+
+/**
+ * Base interface for all kind of factories that create object instances from a list of key-value
+ * pairs in Flink's Table & SQL API.
+ *
+ * <p>A factory is uniquely identified by {@link Class} and {@link #factoryIdentifier()}.
+ *
+ * <p>The list of available factories is discovered using Java's Service Provider Interfaces (SPI).
+ * Classes that implement this interface can be added to {@code
+ * META_INF/services/org.apache.paimon.flink.factories.PaimonFactory} in JAR files.
+ *
+ * <p>Every factory declares a set of required and optional options. This information will not be
+ * used during discovery but is helpful when generating documentation and performing validation. A
+ * factory may discover further (nested) factories, the options of the nested factories must not be
+ * declared in the sets of this factory.
+ *
+ * <p>It is the responsibility of each factory to perform validation before returning an instance.
+ *
+ * <p>For consistency, the following style for key names of {@link ConfigOption} is recommended:
+ *
+ * <ul>
+ *   <li>Try to <b>reuse</b> key names as much as possible. Use other factory implementations as an
+ *       example.
+ *   <li>Key names should be declared in <b>lower case</b>. Use "-" instead of dots or camel case to
+ *       split words.
+ *   <li>Key names should be <b>hierarchical</b> where appropriate. Think about how one would define
+ *       such a hierarchy in JSON or YAML file (e.g. {@code sink.bulk-flush.max-actions}).
+ *   <li>In case of a hierarchy, try not to use the higher level again in the key name (e.g. do
+ *       {@code sink.partitioner} instead of {@code sink.sink-partitioner}) to <b>keep the keys
+ *       short</b>.
+ *   <li>Key names which can be templated, e.g. to refer to a specific column, should be listed
+ *       using '#' as the placeholder symbol. For example, use {@code fields.#.min}.
+ * </ul>
+ */
+public interface PaimonFactory {
+
+    /**
+     * Returns a unique identifier among same factory interfaces.
+     *
+     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
+     * kafka}). If multiple factories exist for different versions, a version should be appended
+     * using "-" (e.g. {@code elasticsearch-7}).
+     */
+    String factoryIdentifier();
+
+    /**
+     * Returns a set of {@link ConfigOption} that an implementation of this factory requires in
+     * addition to {@link #optionalOptions()}.
+     *
+     * <p>See the documentation of {@link PaimonFactory} for more information.
+     */
+    Set<ConfigOption<?>> requiredOptions();

Review Comment:
   useless too?



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java:
##########
@@ -50,7 +52,7 @@
  * <p>Log tables are for processing only unbounded data. Support streaming reading and streaming
  * writing.
  */
-public interface LogStoreTableFactory extends DynamicTableFactory {
+public interface LogStoreTableFactory extends DynamicTablePaimonFactory {

Review Comment:
   Maybe we don't need `DynamicTablePaimonFactory` and `PaimonFactory`? Just a `LogStoreTableFactory` with `String factoryIdentifier()` is OK?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#discussion_r1154174589


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.FallbackKey;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.factories.DecodingFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.EncodingFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
+import static org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
+import static org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
+
+/** Utility for working with {@link Factory}s. */
+public final class FlinkFactoryUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkFactoryUtil.class);
+
+    /**
+     * Describes the property version. This can be used for backwards compatibility in case the
+     * property format changes.
+     */
+    public static final ConfigOption<Integer> PROPERTY_VERSION =

Review Comment:
   Useless



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.FallbackKey;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.factories.DecodingFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.EncodingFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
+import static org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
+import static org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
+
+/** Utility for working with {@link Factory}s. */
+public final class FlinkFactoryUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkFactoryUtil.class);
+
+    /**
+     * Describes the property version. This can be used for backwards compatibility in case the
+     * property format changes.
+     */
+    public static final ConfigOption<Integer> PROPERTY_VERSION =
+            ConfigOptions.key("property-version")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Version of the overall property design. This option is meant for future backwards compatibility.");
+
+    public static final ConfigOption<String> CONNECTOR =
+            ConfigOptions.key("connector")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Uniquely identifies the connector of a dynamic table that is used for accessing data in "
+                                    + "an external system. Its value is used during table source and table sink discovery.");
+
+    public static final ConfigOption<String> FORMAT =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for encoding data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+
+    /**
+     * Suffix for keys of {@link ConfigOption} in case a connector requires multiple formats (e.g.
+     * for both key and value).
+     *
+     * <p>See {@link #createFlinkTableFactoryHelper(LogStoreTableFactory, Context)} Context)} for
+     * more information.
+     */
+    public static final String FORMAT_SUFFIX = ".format";
+
+    /**
+     * Creates a utility that helps in discovering formats, merging options with {@link
+     * DynamicTableFactory.Context#getEnrichmentOptions()} and validating them all for a {@link
+     * LogStoreTableFactory}.
+     *
+     * <p>The following example sketches the usage:
+     *
+     * <pre>{@code
+     * // in createDynamicTableSource()
+     * helper = FlinkFactoryUtil.createFlinkTableFactoryHelper(this, context);
+     *
+     * keyFormat = helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
+     * valueFormat = helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
+     *
+     * helper.validate();
+     *
+     * ... // construct connector with discovered formats
+     * }</pre>
+     *
+     * <p>Note: The format option parameter of {@link

Review Comment:
   We don't need such complicated documentation.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.FallbackKey;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.factories.DecodingFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.EncodingFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
+import static org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
+import static org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
+
+/** Utility for working with {@link Factory}s. */
+public final class FlinkFactoryUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkFactoryUtil.class);
+
+    /**
+     * Describes the property version. This can be used for backwards compatibility in case the
+     * property format changes.
+     */
+    public static final ConfigOption<Integer> PROPERTY_VERSION =
+            ConfigOptions.key("property-version")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Version of the overall property design. This option is meant for future backwards compatibility.");
+
+    public static final ConfigOption<String> CONNECTOR =

Review Comment:
   Useless



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] Alibaba-HZY commented on a diff in pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "Alibaba-HZY (via GitHub)" <gi...@apache.org>.
Alibaba-HZY commented on code in PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#discussion_r1153382991


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java:
##########
@@ -50,7 +52,7 @@
  * <p>Log tables are for processing only unbounded data. Support streaming reading and streaming
  * writing.
  */
-public interface LogStoreTableFactory extends DynamicTableFactory {
+public interface LogStoreTableFactory extends DynamicTablePaimonFactory {

Review Comment:
   yes we only need  String factoryIdentifier() ;



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] Alibaba-HZY commented on pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "Alibaba-HZY (via GitHub)" <gi...@apache.org>.
Alibaba-HZY commented on PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#issuecomment-1489920180

   > Thanks @Alibaba-HZY for the contribution.
   > 
   > I think we need to do a major surgery to delete lots of useless codes. We just use a very simple factory and its util.
   
   PaimonFactoryUtil only contains discoverPaimonFactory to find some LogStoreTableFactory implements PaimonFactory.
   FlinkFactoryUtil is copied from org.apache.flink.table.factories.FactoryUtil  to find DecodingFormat and EncodingFormat  because we use PaimonFactory so we need rewrite FactoryHelper and FactoryUtil


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] JingsongLi merged pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] golden-yang commented on a diff in pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "golden-yang (via GitHub)" <gi...@apache.org>.
golden-yang commented on code in PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#discussion_r1155319201


##########
docs/content/concepts/external-log-systems.md:
##########
@@ -42,6 +42,23 @@ If `'log.consistency' = 'eventual'` is set, in order to achieve correct results,
 
 ### Kafka
 
+#### Preparing flink-sql-connector-kafka Jar File

Review Comment:
   Very timely addition, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#discussion_r1152870665


##########
docs/content/concepts/external-log-systems.md:
##########
@@ -42,6 +42,23 @@ If `'log.consistency' = 'eventual'` is set, in order to achieve correct results,
 
 ### Kafka
 
+#### Preparing flink-sql-connector-kafka Jar File
+
+Paimon currently supports Flink 1.16, 1.15 and 1.14. We recommend the latest Flink version for a better experience.
+
+Download the flink-sql-connector-kafka jar file with corresponding version.
+
+{{< stable >}}
+
+| Version | Jar                                                                                                                                                             |
+|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Flink 1.16 | [flink-connector-kafka-1.16.1.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.1/flink-sql-connector-kafka-1.16.0.jar) |

Review Comment:
   No `-sql-`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] Alibaba-HZY commented on a diff in pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "Alibaba-HZY (via GitHub)" <gi...@apache.org>.
Alibaba-HZY commented on code in PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#discussion_r1152895275


##########
docs/content/concepts/external-log-systems.md:
##########
@@ -42,6 +42,23 @@ If `'log.consistency' = 'eventual'` is set, in order to achieve correct results,
 
 ### Kafka
 
+#### Preparing flink-sql-connector-kafka Jar File
+
+Paimon currently supports Flink 1.16, 1.15 and 1.14. We recommend the latest Flink version for a better experience.
+
+Download the flink-sql-connector-kafka jar file with corresponding version.
+
+{{< stable >}}
+
+| Version | Jar                                                                                                                                                             |
+|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Flink 1.16 | [flink-connector-kafka-1.16.1.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.1/flink-sql-connector-kafka-1.16.0.jar) |

Review Comment:
   I left -sql-, i will make it up



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] Alibaba-HZY commented on pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "Alibaba-HZY (via GitHub)" <gi...@apache.org>.
Alibaba-HZY commented on PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#issuecomment-1489901554

   > I think we need to do a major surgery to delete lots of useless codes.
   > We just use a very simple factory and its util.
   
   ok i will delete useless codes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] Alibaba-HZY commented on a diff in pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "Alibaba-HZY (via GitHub)" <gi...@apache.org>.
Alibaba-HZY commented on code in PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#discussion_r1155065870


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.FallbackKey;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.factories.DecodingFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.EncodingFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
+import static org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
+import static org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
+
+/** Utility for working with {@link Factory}s. */
+public final class FlinkFactoryUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkFactoryUtil.class);
+
+    /**
+     * Describes the property version. This can be used for backwards compatibility in case the
+     * property format changes.
+     */
+    public static final ConfigOption<Integer> PROPERTY_VERSION =
+            ConfigOptions.key("property-version")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Version of the overall property design. This option is meant for future backwards compatibility.");
+
+    public static final ConfigOption<String> CONNECTOR =
+            ConfigOptions.key("connector")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Uniquely identifies the connector of a dynamic table that is used for accessing data in "
+                                    + "an external system. Its value is used during table source and table sink discovery.");
+
+    public static final ConfigOption<String> FORMAT =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for encoding data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+
+    /**
+     * Suffix for keys of {@link ConfigOption} in case a connector requires multiple formats (e.g.
+     * for both key and value).
+     *
+     * <p>See {@link #createFlinkTableFactoryHelper(LogStoreTableFactory, Context)} Context)} for
+     * more information.
+     */
+    public static final String FORMAT_SUFFIX = ".format";
+
+    /**
+     * Creates a utility that helps in discovering formats, merging options with {@link
+     * DynamicTableFactory.Context#getEnrichmentOptions()} and validating them all for a {@link
+     * LogStoreTableFactory}.
+     *
+     * <p>The following example sketches the usage:
+     *
+     * <pre>{@code
+     * // in createDynamicTableSource()
+     * helper = FlinkFactoryUtil.createFlinkTableFactoryHelper(this, context);
+     *
+     * keyFormat = helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
+     * valueFormat = helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
+     *
+     * helper.validate();
+     *
+     * ... // construct connector with discovered formats
+     * }</pre>
+     *
+     * <p>Note: The format option parameter of {@link

Review Comment:
   i have removd



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] Alibaba-HZY commented on a diff in pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "Alibaba-HZY (via GitHub)" <gi...@apache.org>.
Alibaba-HZY commented on code in PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#discussion_r1153105479


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/DynamicTablePaimonFactory.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.factories.FormatFactory;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Base interface for configuring a dynamic table connector for an external Log Systems from catalog
+ * and session information.
+ *
+ * <p>Dynamic tables are the core concept of Flink's Table & SQL API for processing both bounded and
+ * unbounded data in a unified fashion.
+ */
+public interface DynamicTablePaimonFactory extends PaimonFactory {
+
+    /**
+     * Returns a set of {@link ConfigOption} that are directly forwarded to the runtime
+     * implementation but don't affect the final execution topology.
+     *
+     * <p>Options declared here can override options of the persisted plan during an enrichment
+     * phase. Since a restored topology is static, an implementer has to ensure that the declared
+     * options don't affect fundamental abilities such as {@link SupportsProjectionPushDown} or
+     * {@link SupportsFilterPushDown}.
+     *
+     * <p>For example, given a database connector, if an option defines the connection timeout,
+     * changing this value does not affect the pipeline topology and can be allowed. However, an
+     * option that defines whether the connector supports {@link SupportsReadingMetadata} or not is
+     * not allowed. The planner might not react to changed abilities anymore.
+     *
+     * @see FormatFactory#forwardOptions()
+     */
+    default Set<ConfigOption<?>> forwardOptions() {

Review Comment:
   no  i have remove it 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] JingsongLi commented on a diff in pull request #771: LogStoreTableFactory should not implements DynamicTableFactory

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #771:
URL: https://github.com/apache/incubator-paimon/pull/771#discussion_r1153075156


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/DynamicTablePaimonFactory.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.factories.FormatFactory;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Base interface for configuring a dynamic table connector for an external Log Systems from catalog
+ * and session information.
+ *
+ * <p>Dynamic tables are the core concept of Flink's Table & SQL API for processing both bounded and
+ * unbounded data in a unified fashion.
+ */
+public interface DynamicTablePaimonFactory extends PaimonFactory {
+
+    /**
+     * Returns a set of {@link ConfigOption} that are directly forwarded to the runtime
+     * implementation but don't affect the final execution topology.
+     *
+     * <p>Options declared here can override options of the persisted plan during an enrichment
+     * phase. Since a restored topology is static, an implementer has to ensure that the declared
+     * options don't affect fundamental abilities such as {@link SupportsProjectionPushDown} or
+     * {@link SupportsFilterPushDown}.
+     *
+     * <p>For example, given a database connector, if an option defines the connection timeout,
+     * changing this value does not affect the pipeline topology and can be allowed. However, an
+     * option that defines whether the connector supports {@link SupportsReadingMetadata} or not is
+     * not allowed. The planner might not react to changed abilities anymore.
+     *
+     * @see FormatFactory#forwardOptions()
+     */
+    default Set<ConfigOption<?>> forwardOptions() {

Review Comment:
   Is this useful?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org