You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/21 07:00:36 UTC

[GitHub] [flink] lincoln-lil commented on a diff in pull request #20177: [FLINK-28416][table] Add (Async)LookupFunction and providers in replace of (Async)TableFunction as the API for lookup table

lincoln-lil commented on code in PR #20177:
URL: https://github.com/apache/flink/pull/20177#discussion_r926295923


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncLookupFunction.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class of {@link AsyncTableFunction} for asynchronously lookup rows matching the lookup
+ * keys from external system.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    /**
+     * Asynchronously lookup rows matching the lookup keys.
+     *
+     * @param keyRow - A {@link RowData} that wraps keys to lookup.
+     * @return A collection of all matching rows in the lookup table.
+     */
+    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);
+
+    /** Invokes {@link #asyncLookup} and chains futures. */
+    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
+        asyncLookup(GenericRowData.of(keys))
+                .whenCompleteAsync(
+                        (result, exception) -> {
+                            if (exception != null) {
+                                future.completeExceptionally(exception);

Review Comment:
   It would be helpful to include the lookup key info (e.g., convert to strings) for users, because backend connectors may not offer any detailed exception when an async-timeout occurs.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java:
##########
@@ -36,8 +37,11 @@
  * similar to other {@link UserDefinedFunction}s. However, for convenience, in a {@link
  * LookupTableSource} the output type can simply be a {@link Row} or {@link RowData} in which case
  * the input and output types are derived from the table's schema with default conversion.
+ *
+ * @deprecated Please use {@link LookupFunctionProvider} to implement asynchronous lookup table.

Review Comment:
   LookupFunctionProvider -> AsyncLookupFunctionProvider



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncLookupFunction.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class of {@link AsyncTableFunction} for asynchronously lookup rows matching the lookup
+ * keys from external system.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {
+
+    /**
+     * Asynchronously lookup rows matching the lookup keys.
+     *
+     * @param keyRow - A {@link RowData} that wraps keys to lookup.

Review Comment:
   -> 'A {@link RowData} that wraps lookup keys'



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/AsyncLookupFunctionProvider.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+
+/** A provider for creating {@link AsyncLookupFunction}. */
+@PublicEvolving
+public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
+
+    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {

Review Comment:
   As a public interface, we should add some comments for the methods



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/LookupFunctionProvider.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.functions.LookupFunction;
+
+/** A provider for creating {@link LookupFunction}. */
+@PublicEvolving
+public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
+
+    static LookupFunctionProvider of(LookupFunction lookupFunction) {

Review Comment:
   add comments



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A wrapper class of {@link TableFunction} for synchronously lookup rows matching the lookup keys
+ * from external system.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class LookupFunction extends TableFunction<RowData> {
+
+    /**
+     * Synchronously lookup rows matching the lookup keys.
+     *
+     * @param keyRow - A {@link RowData} that wraps keys to lookup.
+     * @return A collection of all matching rows in the lookup table.
+     */
+    public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;
+
+    /** Invoke {@link #lookup} and handle exceptions. */
+    public final void eval(Object... keys) {
+        try {
+            lookup(GenericRowData.of(keys)).forEach(this::collect);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to lookup values with given key", e);

Review Comment:
   Although it differs from the `AsyncLookupFunction` here that all exceptions are actively reported by backend connectors, we still cannot be sure that they will provide details including the request key, so I tend to include the keys in the exception, WDYT?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A wrapper class of {@link TableFunction} for synchronously lookup rows matching the lookup keys
+ * from external system.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class LookupFunction extends TableFunction<RowData> {
+
+    /**
+     * Synchronously lookup rows matching the lookup keys.
+     *
+     * @param keyRow - A {@link RowData} that wraps keys to lookup.

Review Comment:
   -> 'A {@link RowData} that wraps lookup keys' 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org