Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/06 07:17:40 UTC

[GitHub] [flink] PatrickRen opened a new pull request, #20177: [FLINK-28416][table] Add (Async)LookupFunction and providers in replace of (Async)TableFunction as the API for lookup table

PatrickRen opened a new pull request, #20177:
URL: https://github.com/apache/flink/pull/20177

   ## What is the purpose of the change
   
   This pull request introduces new interfaces that replace the original `TableFunction` and its providers as the API for lookup tables:
   - `LookupFunction`
   - `AsyncLookupFunction`
   - `LookupFunctionProvider`
   - `AsyncLookupFunctionProvider`
   
   It also marks `TableFunctionProvider` and `AsyncTableFunctionProvider` as deprecated.
   
   ## Brief change log
   
   - Add (Async)LookupFunction interface and provider interfaces
   - Mark (Async)TableFunctionProvider as deprecated
   
   
   ## Verifying this change
   
   This PR only adds new interfaces and deprecates existing ones, so it does not require test cases.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #20177: [FLINK-28416][table] Add (Async)LookupFunction and providers in replace of (Async)TableFunction as the API for lookup table

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20177:
URL: https://github.com/apache/flink/pull/20177#issuecomment-1175876785

   ## CI report:
   
   * 25ac1597d18be7de820ecfb8db9778df20946b6a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] PatrickRen commented on pull request #20177: [FLINK-28416][table] Add (Async)LookupFunction and providers in replace of (Async)TableFunction as the API for lookup table

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on PR #20177:
URL: https://github.com/apache/flink/pull/20177#issuecomment-1194989800

   @lincoln-lil Thanks for the review! I've addressed your comments in the latest commit. Please take a look when you are available.




[GitHub] [flink] PatrickRen merged pull request #20177: [FLINK-28416][table] Add (Async)LookupFunction and providers in replace of (Async)TableFunction as the API for lookup table

Posted by GitBox <gi...@apache.org>.
PatrickRen merged PR #20177:
URL: https://github.com/apache/flink/pull/20177




[GitHub] [flink] lincoln-lil commented on a diff in pull request #20177: [FLINK-28416][table] Add (Async)LookupFunction and providers in replace of (Async)TableFunction as the API for lookup table

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #20177:
URL: https://github.com/apache/flink/pull/20177#discussion_r926295923


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncLookupFunction.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class of {@link AsyncTableFunction} for asynchronously lookup rows matching the lookup
+ * keys from external system.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    /**
+     * Asynchronously lookup rows matching the lookup keys.
+     *
+     * @param keyRow - A {@link RowData} that wraps keys to lookup.
+     * @return A collection of all matching rows in the lookup table.
+     */
+    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);
+
+    /** Invokes {@link #asyncLookup} and chains futures. */
+    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
+        asyncLookup(GenericRowData.of(keys))
+                .whenCompleteAsync(
+                        (result, exception) -> {
+                            if (exception != null) {
+                                future.completeExceptionally(exception);

Review Comment:
   It would be helpful to include the lookup keys (e.g., converted to strings) in the exception for users, because backend connectors may not provide a detailed exception when an async timeout occurs.
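
To illustrate the suggestion, here is a minimal, self-contained sketch that uses only the JDK. It is not the code in this PR: the class name `AsyncLookupKeyInErrorSketch`, the `String` row type, and the always-failing `asyncLookup` stub are assumptions made for illustration, and plain `Object[]` keys stand in for `RowData`. It shows one way the failure could be wrapped so that the message carries the lookup keys.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

public class AsyncLookupKeyInErrorSketch {

    /** Hypothetical stand-in for a connector's async lookup; it always times out here. */
    public static CompletableFuture<Collection<String>> asyncLookup(Object[] keys) {
        CompletableFuture<Collection<String>> lookupFuture = new CompletableFuture<>();
        // The backend error says nothing about which key was being looked up.
        lookupFuture.completeExceptionally(new TimeoutException("async timeout"));
        return lookupFuture;
    }

    /** Chains the lookup future into the caller's future, adding the keys to any failure. */
    public static void eval(CompletableFuture<Collection<String>> future, Object... keys) {
        asyncLookup(keys)
                .whenComplete(
                        (result, exception) -> {
                            if (exception != null) {
                                // Wrap the original cause and attach the keys as strings.
                                future.completeExceptionally(
                                        new RuntimeException(
                                                "Failed to asynchronously look up values with keys "
                                                        + Arrays.toString(keys),
                                                exception));
                                return;
                            }
                            future.complete(result);
                        });
    }

    public static void main(String[] args) {
        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        eval(future, 42, "alice");
        try {
            future.join();
        } catch (CompletionException e) {
            // The wrapped message now names the keys; the timeout remains the root cause.
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

In the real class the keys arrive as a `RowData`, so how they are rendered as strings would depend on the row implementation; that part is left open here.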



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java:
##########
@@ -36,8 +37,11 @@
  * similar to other {@link UserDefinedFunction}s. However, for convenience, in a {@link
  * LookupTableSource} the output type can simply be a {@link Row} or {@link RowData} in which case
  * the input and output types are derived from the table's schema with default conversion.
+ *
+ * @deprecated Please use {@link LookupFunctionProvider} to implement asynchronous lookup table.

Review Comment:
   LookupFunctionProvider -> AsyncLookupFunctionProvider



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncLookupFunction.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class of {@link AsyncTableFunction} for asynchronously lookup rows matching the lookup
+ * keys from external system.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    /**
+     * Asynchronously lookup rows matching the lookup keys.
+     *
+     * @param keyRow - A {@link RowData} that wraps keys to lookup.

Review Comment:
   -> 'A {@link RowData} that wraps lookup keys'



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/AsyncLookupFunctionProvider.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+
+/** A provider for creating {@link AsyncLookupFunction}. */
+@PublicEvolving
+public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
+
+    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {

Review Comment:
   Since this is a public interface, we should add Javadoc comments for its methods.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/LookupFunctionProvider.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.functions.LookupFunction;
+
+/** A provider for creating {@link LookupFunction}. */
+@PublicEvolving
+public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
+
+    static LookupFunctionProvider of(LookupFunction lookupFunction) {

Review Comment:
   add comments



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A wrapper class of {@link TableFunction} for synchronously lookup rows matching the lookup keys
+ * from external system.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class LookupFunction extends TableFunction<RowData> {
+
+    /**
+     * Synchronously lookup rows matching the lookup keys.
+     *
+     * @param keyRow - A {@link RowData} that wraps keys to lookup.
+     * @return A collection of all matching rows in the lookup table.
+     */
+    public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;
+
+    /** Invoke {@link #lookup} and handle exceptions. */
+    public final void eval(Object... keys) {
+        try {
+            lookup(GenericRowData.of(keys)).forEach(this::collect);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to lookup values with given key", e);

Review Comment:
   Although this differs from `AsyncLookupFunction` in that all exceptions are actively reported by the backend connectors, we still cannot be sure that they will provide details such as the request key, so I would prefer to include the keys in the exception message. WDYT?
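
A rough sketch of what this could look like for the synchronous path, again using only the JDK. The class name `LookupKeyInErrorSketch`, the `String` row type, and the always-failing `lookup` stub are hypothetical, and `Object[]` keys stand in for `RowData`; only the shape of the `try`/`catch` mirrors the `eval` method in the diff above.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;

public class LookupKeyInErrorSketch {

    /** Hypothetical stand-in for a connector lookup that fails without naming the key. */
    public static Collection<String> lookup(Object[] keys) throws IOException {
        throw new IOException("connection reset");
    }

    /** Invokes the lookup and rethrows failures with the lookup keys in the message. */
    public static void eval(Object... keys) {
        try {
            lookup(keys).forEach(System.out::println);
        } catch (IOException e) {
            // Include the keys so users can tell which request failed.
            throw new RuntimeException(
                    String.format(
                            "Failed to look up values with given keys %s",
                            Arrays.toString(keys)),
                    e);
        }
    }

    public static void main(String[] args) {
        try {
            eval(7, "bob");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The original `IOException` is kept as the cause, so any details the connector does provide are not lost.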



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A wrapper class of {@link TableFunction} for synchronously lookup rows matching the lookup keys
+ * from external system.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class LookupFunction extends TableFunction<RowData> {
+
+    /**
+     * Synchronously lookup rows matching the lookup keys.
+     *
+     * @param keyRow - A {@link RowData} that wraps keys to lookup.

Review Comment:
   -> 'A {@link RowData} that wraps lookup keys' 
   


