Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/12/16 03:29:57 UTC

[GitHub] [iceberg] xwmr-max opened a new pull request, #6440: Flink: Support Look-up Function

xwmr-max opened a new pull request, #6440:
URL: https://github.com/apache/iceberg/pull/6440

   Currently, Iceberg does not support lookup joins in Flink. This PR adds a lookup function so that basic lookup join scenarios are covered.
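   For readers unfamiliar with the term, here is a rough sketch of what a lookup join does: for every row arriving on the probe side, fetch the matching rows from the dimension table by key and emit the joined result. This is only an illustration in plain Java. `LookupJoinSketch`, `indexByKey`, and `join` are hypothetical names, and the in-memory map stands in for the Iceberg table; none of it is code from this PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; none of these names come from the PR or from Flink.
class LookupJoinSketch {

  // Index the dimension rows by key so that each probe is a single map lookup.
  static Map<Integer, List<String>> indexByKey(List<Map.Entry<Integer, String>> dimensionRows) {
    Map<Integer, List<String>> index = new HashMap<>();
    for (Map.Entry<Integer, String> row : dimensionRows) {
      index.computeIfAbsent(row.getKey(), k -> new ArrayList<>()).add(row.getValue());
    }
    return index;
  }

  // Inner join: emit one "key:value" string per matching dimension row, in probe order.
  static List<String> join(List<Integer> probeKeys, Map<Integer, List<String>> index) {
    List<String> joined = new ArrayList<>();
    for (Integer key : probeKeys) {
      for (String value : index.getOrDefault(key, List.of())) {
        joined.add(key + ":" + value);
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> index =
        indexByKey(List.of(Map.entry(1, "hello"), Map.entry(2, "world"), Map.entry(3, "foo")));
    // Key 4 has no match in the dimension data, so it produces no output row.
    System.out.println(join(List.of(1, 3, 4), index));
  }
}
```

   In a real job the probe rows come from a stream and the dimension side is the external table, so the cost of each lookup is what matters most.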


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] luoyuxia commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1159714627


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing exception, in case of failure to load the table into
+  // cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIMEMS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIMEMS = 600000;
+
+  private static final long MAX_RETRY_URATIONMS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool("iceberg-thread-pool-" + FlinkLookupFunction.class, 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {
+    super.open(functionContext);
+    TypeInformation<RowData> rowDataTypeInfo =
+        InternalTypeInfo.ofFields(
+            Arrays.stream(projectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+            projectedFields);
+    serializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
+    if (cacheMaxSize == -1 || cacheExpireMs == -1) {
+      builder.maximumSize(0);
+    } else {
+      builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
+    }
+
+    cache =
+        builder
+            .scheduler(Scheduler.systemScheduler())
+            .build(
+                new CacheLoader<Object, List<RowData>>() {
+                  @Override
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   I also vote to follow the Hive lookup approach unless we have an efficient way to look up a key in Iceberg. It will be very slow if we do a full scan for each lookup key.
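   To make the suggestion concrete, here is a minimal sketch of that approach as I understand it: scan the table once, build an in-memory index, and rescan only after a TTL expires, so that individual lookups never trigger a scan. Everything in it is hypothetical (`FullLoadLookupCache`, the `Supplier` that stands in for a full table scan, the injectable clock); it is not code from this PR or from the Hive connector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch; class, field, and parameter names are illustrative only.
class FullLoadLookupCache {
  private final Supplier<List<Map.Entry<Integer, String>>> fullScan; // stands in for one table scan
  private final long ttlMillis;
  private final LongSupplier clock; // injectable so the TTL can be exercised without sleeping
  private Map<Integer, List<String>> index = new HashMap<>();
  private long nextReloadAt = Long.MIN_VALUE; // forces a load on the first lookup
  private int scanCount = 0;

  FullLoadLookupCache(
      Supplier<List<Map.Entry<Integer, String>>> fullScan, long ttlMillis, LongSupplier clock) {
    this.fullScan = fullScan;
    this.ttlMillis = ttlMillis;
    this.clock = clock;
  }

  // A lookup is a map access; the table is rescanned only when the TTL has expired.
  List<String> lookup(int key) {
    long now = clock.getAsLong();
    if (now >= nextReloadAt) {
      reload();
      nextReloadAt = now + ttlMillis;
    }
    return index.getOrDefault(key, List.of());
  }

  // Rebuild the whole index from a single scan and swap it in.
  private void reload() {
    Map<Integer, List<String>> fresh = new HashMap<>();
    for (Map.Entry<Integer, String> row : fullScan.get()) {
      fresh.computeIfAbsent(row.getKey(), k -> new ArrayList<>()).add(row.getValue());
    }
    index = fresh;
    scanCount++;
  }

  int scanCount() {
    return scanCount;
  }

  public static void main(String[] args) {
    long[] now = {0L};
    FullLoadLookupCache cache =
        new FullLoadLookupCache(() -> List.of(Map.entry(1, "hello")), 10_000L, () -> now[0]);
    cache.lookup(1);
    cache.lookup(2);
    System.out.println("scans after two lookups: " + cache.scanCount());
  }
}
```

   The trade-off is memory and staleness: the whole table has to fit in memory, and changes only become visible after the next reload.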





[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1115565162


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java:
##########
@@ -226,4 +229,21 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  @Override
+  public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+    String[] lookupKeys = new String[context.getKeys().length];
+    for (int i = 0; i < lookupKeys.length; i++) {
+      int[] innerKeyArr = context.getKeys()[i];
+      Preconditions.checkArgument(
+          innerKeyArr.length == 1, "Don't support nested lookup keys in iceberg now.");
+      lookupKeys[i] = schema.getFieldNames()[innerKeyArr[0]];
+    }
+
+    TableSchema projectedSchema = getProjectedSchema();
+    loader.open();

Review Comment:
   done





[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1187423924


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing exception, in case of failure to load the table into
+  // cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIMEMS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIMEMS = 600000;
+
+  private static final long MAX_RETRY_URATIONMS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool("iceberg-thread-pool-" + FlinkLookupFunction.class, 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {
+    super.open(functionContext);
+    TypeInformation<RowData> rowDataTypeInfo =
+        InternalTypeInfo.ofFields(
+            Arrays.stream(projectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+            projectedFields);
+    serializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
+    if (cacheMaxSize == -1 || cacheExpireMs == -1) {
+      builder.maximumSize(0);
+    } else {
+      builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
+    }
+
+    cache =
+        builder
+            .scheduler(Scheduler.systemScheduler())
+            .build(
+                new CacheLoader<Object, List<RowData>>() {
+                  @Override
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   @stevenzwu I have updated it. Could you please review it again?





[GitHub] [iceberg] hililiwei commented on pull request #6440: Flink: Support Look-up Function

Posted by GitBox <gi...@apache.org>.
hililiwei commented on PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#issuecomment-1362302792

   > is it actually a good idea to use iceberg table as a LOOK UP JOIN candidate? will it be fast enough?
   
   It may work fine on small data sets. But without an index or some other way to speed things up, it will not be fast enough when there is a lot of data.
   Anyway, I think it is a good attempt. Not all tables are big, and this is very useful when we need a lookup on Iceberg.




[GitHub] [iceberg] hililiwei commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1192020043


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {

Review Comment:
   Do we need to support loading the latest data by partition, like Hive does?
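   A rough sketch of what I mean, assuming partition names sort chronologically (for example `dt=2023-05-11`): only the rows of the partition that sorts last are loaded for lookups. `LatestPartitionSketch` and `latestPartitionRows` are hypothetical names, and the map of partition name to rows stands in for the table; this is not how the PR or the Hive connector is implemented.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; names and the partition naming scheme are assumptions.
class LatestPartitionSketch {

  // Returns the rows of the partition whose name sorts last, or an empty list if there is none.
  static List<String> latestPartitionRows(Map<String, List<String>> rowsByPartition) {
    if (rowsByPartition.isEmpty()) {
      return List.of();
    }
    // TreeMap orders partition names lexicographically, so lastEntry() is the "latest" one.
    TreeMap<String, List<String>> sorted = new TreeMap<>(rowsByPartition);
    return sorted.lastEntry().getValue();
  }

  public static void main(String[] args) {
    Map<String, List<String>> table =
        Map.of("dt=2023-05-10", List.of("a"), "dt=2023-05-11", List.of("b", "c"));
    System.out.println(latestPartitionRows(table));
  }
}
```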
   
   
   





[GitHub] [iceberg] stevenzwu commented on pull request #6440: Flink: Support Look-up Function

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#issuecomment-1370003483

   The lookup function is for lookup joins in Flink [1]. I have the same question as @zinking. Normally, lookup functions are a better fit for storage systems that support point queries (like JDBC).
   
   Let's discuss the two scenarios separately:
   * A small Iceberg table that fits comfortably in memory using caching. In this case, the cache should always be enabled; I don't see a reason to disable it.
   * A large Iceberg table. Would FLIP-204 [2] help?
   
   [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
   [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
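   To illustrate why hash-based routing might help in the large-table case: if probe rows are distributed by a hash of the lookup key, each parallel subtask only ever sees (and caches) its own slice of the key space. The sketch below is my simplified reading of that idea, not Flink's actual key distribution; `HashLookupRoutingSketch`, `subtaskFor`, and `keysPerSubtask` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; Flink's real shuffle uses its own hashing, not this formula.
class HashLookupRoutingSketch {

  // Route a key to a subtask; floorMod keeps the result non-negative for negative hash codes.
  static int subtaskFor(Object key, int parallelism) {
    return Math.floorMod(key.hashCode(), parallelism);
  }

  // Distinct keys that each subtask would need to cache after hash routing.
  static List<Set<Object>> keysPerSubtask(List<?> keys, int parallelism) {
    List<Set<Object>> perSubtask = new ArrayList<>();
    for (int i = 0; i < parallelism; i++) {
      perSubtask.add(new HashSet<>());
    }
    for (Object key : keys) {
      perSubtask.get(subtaskFor(key, parallelism)).add(key);
    }
    return perSubtask;
  }

  public static void main(String[] args) {
    // With parallelism 2, even keys land on subtask 0 and odd keys on subtask 1.
    System.out.println(keysPerSubtask(List.of(1, 2, 3, 4, 1, 2), 2));
  }
}
```

   Without such routing, every subtask may see every key, so each cache has to hold (or repeatedly reload) the full key space.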




[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1118777801


##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLookupFunction.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkLookupFunction {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+  private static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+  private FlinkLookupFunction flinkLookupFunction;
+
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {"avro", 1, true},
+      {"avro", 1, false},
+      {"avro", 2, true},
+      {"avro", 2, false},
+      {"orc", 1, true},
+      {"orc", 1, false},
+      {"orc", 2, true},
+      {"orc", 2, false},
+      {"parquet", 1, true},
+      {"parquet", 1, false},
+      {"parquet", 2, true},
+      {"parquet", 2, false}
+    };
+  }
+
+  public TestFlinkLookupFunction(String format, int parallelism, boolean partitioned) {
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+    this.partitioned = partitioned;
+  }
+
+  @Before
+  public void before() throws IOException {
+    File folder = TEMPORARY_FOLDER.newFolder();
+    String warehouse = folder.getAbsolutePath();
+
+    String tablePath = warehouse.concat("/test");
+
+    Assert.assertTrue("Should create the table path correctly.", new File(tablePath).mkdir());
+
+    Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+    table = SimpleDataUtil.createTable(tablePath, props, partitioned);
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment()
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism);
+
+    tableLoader = TableLoader.fromHadoopTable(tablePath);
+    tableLoader.open();
+  }
+
+  private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
+    return new BoundedTestSource<>(rows.toArray(new Row[0]));
+  }
+
+  private void initFlinkLookupFunction(Map<String, String> properties) {
+    String[] lookupKeys = {"id"};
+    ReadableConfig readableConfig = new Configuration();
+    flinkLookupFunction =
+        new FlinkLookupFunction(
+            SimpleDataUtil.FLINK_SCHEMA,
+            lookupKeys,
+            tableLoader,
+            properties,
+            -1L,
+            null,
+            readableConfig);
+  }
+
+  @Test
+  public void testFlinkLookupFunctionWithNoCache() throws Exception {
+    Map<String, String> properties = Maps.newHashMap();
+    Integer[] key = new Integer[] {1};
+    initFlinkLookupFunction(properties);
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
+
+    RowData expectedRowData = SimpleDataUtil.createRowData(1, "hello");
+
+    DataStream<RowData> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    FlinkSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .append();
+
+    env.execute("Test Iceberg DataStream");
+
+    flinkLookupFunction.open(null);
+    flinkLookupFunction.setCollector(
+        new Collector<RowData>() {
+          @Override
+          public void collect(RowData record) {
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'id' value should be '1'",
+                expectedRowData.getInt(0),
+                record.getInt(0));
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'data' value should be 'hello'",
+                expectedRowData.getString(1),
+                record.getString(1));
+          }
+
+          @Override
+          public void close() {}
+        });
+    flinkLookupFunction.eval(key);
+  }
+
+  @Test
+  public void testFlinkLookupFunctionWithCache() throws Exception {
+    Integer[] key = new Integer[] {1};
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("lookup-join-cache-size", "3");
+    properties.put("lookup-join-cache-ttl", "10000");
+    initFlinkLookupFunction(properties);
+    List<Row> rows1 = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
+
+    List<Row> rows2 =
+        Lists.newArrayList(Row.of(1, "hello2"), Row.of(2, "world2"), Row.of(3, "foo2"));
+
+    RowData expectedRowData = SimpleDataUtil.createRowData(1, "hello");
+    insertTableValues(rows1);
+
+    flinkLookupFunction.open(null);
+    flinkLookupFunction.setCollector(
+        new Collector<RowData>() {
+          @Override
+          public void collect(RowData record) {
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'id' value should be '1'",
+                expectedRowData.getInt(0),
+                record.getInt(0));
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'data' value should be 'hello'",
+                expectedRowData.getString(1),
+                record.getString(1));
+          }
+
+          @Override
+          public void close() {}
+        });
+    flinkLookupFunction.eval(key);
+
+    // Insert into table values "rows2", verify that the function of "cache" is in effect
+    insertTableValues(rows2);
+    flinkLookupFunction.eval(key);
+
+    Thread.sleep(12000);
+    List<RowData> arrayList = Lists.newArrayList();
+    flinkLookupFunction.setCollector(
+        new Collector<RowData>() {
+          @Override
+          public void collect(RowData record) {
+            arrayList.add(record);
+          }
+
+          @Override
+          public void close() {}
+        });
+
+    flinkLookupFunction.eval(key);
+    Assert.assertEquals("Collect data size should be 2", 2, arrayList.size());
+  }
+
+  private void insertTableValues(List<Row> rows) throws Exception {

Review Comment:
   I have replaced this test case with one that executes SQL.
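
For readers following the thread, the cache behaviour that the quoted `testFlinkLookupFunctionWithCache` checks can be sketched without Flink or Iceberg. This is only a rough illustration under stated assumptions: `TtlLookupCache`, its loader and its injected clock are hypothetical names, and the PR itself uses a Caffeine `LoadingCache` with `expireAfterWrite`, not this class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a per-key lookup cache with expire-after-write semantics.
// It is not the PR's Caffeine-based cache; every name here is illustrative.
final class TtlLookupCache<K, V> {
  private static final class Entry<V> {
    final V value;
    final long writtenAtMs;

    Entry(V value, long writtenAtMs) {
      this.value = value;
      this.writtenAtMs = writtenAtMs;
    }
  }

  private final Map<K, Entry<V>> entries = new HashMap<>();
  private final Function<K, V> loader;
  private final long ttlMs;
  private final LongSupplier clockMs;

  TtlLookupCache(Function<K, V> loader, long ttlMs, LongSupplier clockMs) {
    this.loader = loader;
    this.ttlMs = ttlMs;
    this.clockMs = clockMs;
  }

  // Returns the cached value while it is younger than ttlMs; otherwise reloads it.
  V get(K key) {
    long now = clockMs.getAsLong();
    Entry<V> entry = entries.get(key);
    if (entry == null || now - entry.writtenAtMs >= ttlMs) {
      entry = new Entry<>(loader.apply(key), now);
      entries.put(key, entry);
    }
    return entry.value;
  }
}

final class TtlLookupCacheDemo {
  public static void main(String[] args) {
    // Assumed stand-in for the table rows whose key is 1.
    List<String> tableRowsForKey1 = new ArrayList<>();
    tableRowsForKey1.add("hello");
    long[] nowMs = {0L}; // manual clock, so no Thread.sleep is needed

    TtlLookupCache<Integer, List<String>> cache =
        new TtlLookupCache<Integer, List<String>>(
            key -> new ArrayList<String>(tableRowsForKey1), 10_000L, () -> nowMs[0]);

    System.out.println(cache.get(1)); // first lookup reads the table
    tableRowsForKey1.add("hello2"); // a later append to the table
    nowMs[0] = 5_000L;
    System.out.println(cache.get(1)); // within the TTL, served from the cache
    nowMs[0] = 12_000L;
    System.out.println(cache.get(1)); // TTL elapsed, the table is read again
  }
}
```

Injecting the clock is a design choice for the sketch only: it lets the expiry path run without the 12-second sleep used in the quoted test.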





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1159675291


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing exception, in case of failure to load the table into
+  // cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIMEMS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIMEMS = 600000;
+
+  private static final long MAX_RETRY_URATIONMS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool("iceberg-thread-pool-" + FlinkLookupFunction.class, 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {
+    super.open(functionContext);
+    TypeInformation<RowData> rowDataTypeInfo =
+        InternalTypeInfo.ofFields(
+            Arrays.stream(projectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+            projectedFields);
+    serializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
+    if (cacheMaxSize == -1 || cacheExpireMs == -1) {
+      builder.maximumSize(0);
+    } else {
+      builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
+    }
+
+    cache =
+        builder
+            .scheduler(Scheduler.systemScheduler())
+            .build(
+                new CacheLoader<Object, List<RowData>>() {
+                  @Override
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   This is a good question. Let me compare the pros and cons of the two approaches:
   
   - JDBC-style lookup: when a key arrives, the function queries the external system for that key and caches the result, so it does not take up much extra memory or other resources. Querying such a system by key usually returns results quickly.
   - File/Hive-style lookup: the function loads the latest partition (streaming) or all partitions (batch) and refreshes the data periodically. The partitions may hold a lot of data, so this requires extra memory and other resources. Querying such a system with a filter usually takes a long time to return results.
   
   So there is a tradeoff here.
   
   Which one should we use to implement `Lookup`? The choice depends on the developer's use case and on the characteristics of Iceberg itself. Under what circumstances would a user enable lookup queries? I think most scenarios involve using Iceberg as a dimension table that doesn't change much. Moreover, Iceberg itself is more similar to a Hive/file system.
   
   Overall, I am more inclined to vote for the second one.
   @stevenzwu what do you think about that?
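
To make the second option concrete, here is a minimal sketch of a file/Hive-style lookup: scan everything once, index the rows by lookup key, and rescan only after a reload interval has passed. It assumes nothing about the eventual implementation; `SnapshotLookup`, `fullScan`, `keyOf` and the injected clock are hypothetical names, and a real version would read the Iceberg table where this uses an in-memory list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical file/Hive-style lookup: all rows are loaded and indexed by key,
// and the whole index is rebuilt once the reload interval has elapsed.
final class SnapshotLookup<K, R> {
  private final Supplier<List<R>> fullScan; // assumed to return every row of the table
  private final Function<R, K> keyOf; // extracts the lookup key from a row
  private final long reloadIntervalMs;
  private final LongSupplier clockMs;

  private Map<K, List<R>> index = Collections.emptyMap();
  private long loadedAtMs;
  private boolean loaded;
  private int scanCount;

  SnapshotLookup(
      Supplier<List<R>> fullScan,
      Function<R, K> keyOf,
      long reloadIntervalMs,
      LongSupplier clockMs) {
    this.fullScan = fullScan;
    this.keyOf = keyOf;
    this.reloadIntervalMs = reloadIntervalMs;
    this.clockMs = clockMs;
  }

  // Served from memory; a full scan happens only on first use or after the interval.
  List<R> lookup(K key) {
    long now = clockMs.getAsLong();
    if (!loaded || now - loadedAtMs >= reloadIntervalMs) {
      reload(now);
    }
    List<R> rows = index.get(key);
    return rows == null ? Collections.<R>emptyList() : rows;
  }

  int scanCount() {
    return scanCount;
  }

  private void reload(long now) {
    Map<K, List<R>> fresh = new HashMap<>();
    for (R row : fullScan.get()) {
      fresh.computeIfAbsent(keyOf.apply(row), k -> new ArrayList<>()).add(row);
    }
    index = fresh;
    loadedAtMs = now;
    loaded = true;
    scanCount++;
  }
}

final class SnapshotLookupDemo {
  public static void main(String[] args) {
    List<String[]> table = new ArrayList<>();
    table.add(new String[] {"1", "hello"});
    table.add(new String[] {"2", "world"});
    long[] nowMs = {0L};

    SnapshotLookup<String, String[]> lookup =
        new SnapshotLookup<String, String[]>(
            () -> new ArrayList<String[]>(table), row -> row[0], 60_000L, () -> nowMs[0]);

    System.out.println(lookup.lookup("1").size()); // triggers the initial full scan
    System.out.println(lookup.lookup("2").size()); // answered from the in-memory index
    System.out.println(lookup.scanCount()); // number of full scans so far
  }
}
```

The contrast with the JDBC style is where the cost lands: here every key is answered from memory, but the whole data set has to fit in memory and is only as fresh as the last reload.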
   
   





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1169014466


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   I think it ought to be the file/Hive lookup mechanism. The JDBC lookup pattern is a bad fit for Iceberg.





[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1169432815


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   OK, I'm going to modify this PR to use the Hive lookup mechanism.





[GitHub] [iceberg] xwmr-max commented on pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#issuecomment-1407610685

   cc @pvary 




[GitHub] [iceberg] xwmr-max commented on pull request #6440: Flink: Support Look-up Function

Posted by GitBox <gi...@apache.org>.
xwmr-max commented on PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#issuecomment-1360908332

   > is it actually a good idea to use iceberg table as a LOOK UP JOIN candidate? will it be fast enough?
   
   This look-up function provides only basic functionality and is suitable only for scenarios with a small amount of data. Further optimizations, such as secondary indexes, will build on it later.




[GitHub] [iceberg] hililiwei commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1159675291


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing exception, in case of failure to load the table into
+  // cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIMEMS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIMEMS = 600000;
+
+  private static final long MAX_RETRY_URATIONMS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool("iceberg-thread-pool-" + FlinkLookupFunction.class, 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {
+    super.open(functionContext);
+    TypeInformation<RowData> rowDataTypeInfo =
+        InternalTypeInfo.ofFields(
+            Arrays.stream(projectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+            projectedFields);
+    serializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
+    if (cacheMaxSize == -1 || cacheExpireMs == -1) {
+      builder.maximumSize(0);
+    } else {
+      builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
+    }
+
+    cache =
+        builder
+            .scheduler(Scheduler.systemScheduler())
+            .build(
+                new CacheLoader<Object, List<RowData>>() {
+                  @Override
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   This is a good question. Let me compare the pros and cons of the two approaches:
   
   - JDBC-style lookup: when a key arrives, the function queries the external system for that key and caches the result, so it uses little extra memory or other resources. Point queries by key against such systems usually return quickly.
   - File/Hive-style lookup: the function loads the latest partition (streaming) or all partitions (batch) and refreshes the data periodically. The partitions may hold a lot of data, so this approach needs extra memory and other resources.
   
   So there is a tradeoff here.
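   
   To make the two options concrete, here is a minimal sketch in plain Java. It is only an illustration under stated assumptions, not the code in this PR: `PerKeyLookup`, `FullLoadLookup`, `pointQuery`, and `fullScan` are hypothetical names, and the "table" is an in-memory list rather than an Iceberg scan.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

public class LookupStrategiesSketch {

  /** JDBC-style: one point query per unseen key; the result is cached afterwards. */
  static final class PerKeyLookup<K, V> {
    private final Function<K, List<V>> pointQuery; // hypothetical query against the source
    private final Map<K, List<V>> cache = new HashMap<>();
    int queries = 0; // number of point queries actually issued

    PerKeyLookup(Function<K, List<V>> pointQuery) {
      this.pointQuery = pointQuery;
    }

    List<V> lookup(K key) {
      List<V> rows = cache.get(key);
      if (rows == null) {
        queries++;
        rows = pointQuery.apply(key);
        cache.put(key, rows);
      }
      return rows;
    }
  }

  /** File/Hive-style: scan everything, index it by key, rescan once the TTL has passed. */
  static final class FullLoadLookup<K, V> {
    private final Supplier<List<V>> fullScan; // hypothetical scan of the whole table
    private final Function<V, K> keyOf;
    private final long ttlMillis;
    private Map<K, List<V>> index; // null until the first load
    private long loadedAtMillis;
    int scans = 0; // number of full scans actually performed

    FullLoadLookup(Supplier<List<V>> fullScan, Function<V, K> keyOf, long ttlMillis) {
      this.fullScan = fullScan;
      this.keyOf = keyOf;
      this.ttlMillis = ttlMillis;
    }

    List<V> lookup(K key, long nowMillis) {
      if (index == null || nowMillis - loadedAtMillis >= ttlMillis) {
        reload(nowMillis);
      }
      return index.getOrDefault(key, Collections.emptyList());
    }

    private void reload(long nowMillis) {
      Map<K, List<V>> fresh = new HashMap<>();
      for (V row : fullScan.get()) {
        fresh.computeIfAbsent(keyOf.apply(row), k -> new ArrayList<>()).add(row);
      }
      index = fresh;
      loadedAtMillis = nowMillis;
      scans++;
    }
  }

  public static void main(String[] args) {
    List<String> table = List.of("1:hello", "2:world", "3:foo");
    Function<String, Integer> idOf = row -> Integer.parseInt(row.split(":")[0]);

    PerKeyLookup<Integer, String> perKey =
        new PerKeyLookup<Integer, String>(
            id -> {
              List<String> hits = new ArrayList<>();
              for (String row : table) {
                if (idOf.apply(row).equals(id)) {
                  hits.add(row);
                }
              }
              return hits;
            });
    perKey.lookup(1);
    perKey.lookup(1); // second call is served from the cache
    System.out.println("per-key queries: " + perKey.queries);

    FullLoadLookup<Integer, String> full =
        new FullLoadLookup<Integer, String>(() -> table, idOf, 10_000L);
    full.lookup(1, 0L); // first call triggers the initial scan
    full.lookup(2, 5_000L); // still within the TTL, no rescan
    full.lookup(3, 10_000L); // TTL reached, rescan
    System.out.println("full scans: " + full.scans);
  }
}
```

   The first variant keeps only the keys it has seen in memory but pays one query per new key. The second holds the whole indexed data set but touches the source only once per refresh interval.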
   
   Which one should we use to implement `Lookup`? The choice depends on the user's use case and on the characteristics of Iceberg itself. When would a user enable lookup queries? I think in most scenarios they want to use Iceberg as a dimension table that doesn't change much. Moreover, Iceberg itself is closer to the Hive/File system.
   
   Overall, I am more inclined to vote for the second one.
   @stevenzwu what do you think?
   
   





[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1169432815


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing exception, in case of failure to load the table into
+  // cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIMEMS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIMEMS = 600000;
+
+  private static final long MAX_RETRY_URATIONMS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool("iceberg-thread-pool-" + FlinkLookupFunction.class, 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {
+    super.open(functionContext);
+    TypeInformation<RowData> rowDataTypeInfo =
+        InternalTypeInfo.ofFields(
+            Arrays.stream(projectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+            projectedFields);
+    serializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
+    if (cacheMaxSize == -1 || cacheExpireMs == -1) {
+      builder.maximumSize(0);
+    } else {
+      builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
+    }
+
+    cache =
+        builder
+            .scheduler(Scheduler.systemScheduler())
+            .build(
+                new CacheLoader<Object, List<RowData>>() {
+                  @Override
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   OK, I'm going to modify this PR to use the File/Hive lookup mechanism.





[GitHub] [iceberg] xwmr-max commented on pull request #6440: Flink: Support Look-up Function

Posted by GitBox <gi...@apache.org>.
xwmr-max commented on PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#issuecomment-1364814116

   cc @rdblue 




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1092770061


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing exception, in case of failure to load the table into
+  // cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIMEMS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIMEMS = 600000;
+
+  private static final long MAX_RETRY_URATIONMS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool("iceberg-thread-pool-" + FlinkLookupFunction.class, 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {

Review Comment:
   This method is too long; it can probably be refactored.



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java:
##########
@@ -226,4 +229,21 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  @Override
+  public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+    String[] lookupKeys = new String[context.getKeys().length];
+    for (int i = 0; i < lookupKeys.length; i++) {
+      int[] innerKeyArr = context.getKeys()[i];
+      Preconditions.checkArgument(
+          innerKeyArr.length == 1, "Don't support nested lookup keys in iceberg now.");
+      lookupKeys[i] = schema.getFieldNames()[innerKeyArr[0]];
+    }
+
+    TableSchema projectedSchema = getProjectedSchema();
+    loader.open();

Review Comment:
   This can be problematic for resource management. See PR #6614 for more details.
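   
   As a rough illustration of the concern (a sketch only, and not necessarily the approach taken in PR #6614): a loader opened while the source is being planned has no matching close, whereas one opened in the function's own `open()` can be released in its `close()`. `CountingLoader` and `LookupFn` below are hypothetical stand-ins, not Iceberg or Flink classes.

```java
import java.io.Closeable;

public class LoaderLifecycleSketch {

  /** Hypothetical stand-in for a table loader that holds an external handle. */
  static final class CountingLoader implements Closeable {
    static int openHandles = 0; // counts handles that are currently open
    private boolean opened = false;

    void open() {
      if (!opened) {
        opened = true;
        openHandles++;
      }
    }

    @Override
    public void close() {
      if (opened) {
        opened = false;
        openHandles--;
      }
    }
  }

  /** Hypothetical lookup function that owns its loader for its whole lifetime. */
  static final class LookupFn implements Closeable {
    private final CountingLoader loader;

    LookupFn(CountingLoader loader) {
      this.loader = loader; // not opened here: construction may happen at planning time
    }

    void open() {
      loader.open(); // acquire at runtime, where a matching close() call exists
    }

    @Override
    public void close() {
      loader.close(); // release the handle acquired in open()
    }
  }

  public static void main(String[] args) {
    LookupFn fn = new LookupFn(new CountingLoader());
    fn.open();
    System.out.println("handles while running: " + CountingLoader.openHandles);
    fn.close();
    System.out.println("handles after close: " + CountingLoader.openHandles);
  }
}
```

   The point of the sketch is only that whichever component calls `open()` should also be the one that can reliably call `close()`.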



##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLookupFunction.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkLookupFunction {

Review Comment:
   Can you provide a test example from the Flink repo? I'm trying to see what kind of test coverage is desirable.



##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLookupFunction.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkLookupFunction {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+  private static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+  private FlinkLookupFunction flinkLookupFunction;
+
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {"avro", 1, true},
+      {"avro", 1, false},
+      {"avro", 2, true},
+      {"avro", 2, false},
+      {"orc", 1, true},
+      {"orc", 1, false},
+      {"orc", 2, true},
+      {"orc", 2, false},
+      {"parquet", 1, true},
+      {"parquet", 1, false},
+      {"parquet", 2, true},
+      {"parquet", 2, false}
+    };
+  }
+
+  public TestFlinkLookupFunction(String format, int parallelism, boolean partitioned) {
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+    this.partitioned = partitioned;
+  }
+
+  @Before
+  public void before() throws IOException {
+    File folder = TEMPORARY_FOLDER.newFolder();
+    String warehouse = folder.getAbsolutePath();
+
+    String tablePath = warehouse.concat("/test");
+
+    Assert.assertTrue("Should create the table path correctly.", new File(tablePath).mkdir());
+
+    Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+    table = SimpleDataUtil.createTable(tablePath, props, partitioned);
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment()
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism);
+
+    tableLoader = TableLoader.fromHadoopTable(tablePath);
+    tableLoader.open();
+  }
+
+  private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
+    return new BoundedTestSource<>(rows.toArray(new Row[0]));
+  }
+
+  private void initFlinkLookupFunction(Map<String, String> properties) {
+    String[] lookupKeys = {"id"};
+    ReadableConfig readableConfig = new Configuration();
+    flinkLookupFunction =
+        new FlinkLookupFunction(
+            SimpleDataUtil.FLINK_SCHEMA,
+            lookupKeys,
+            tableLoader,
+            properties,
+            -1L,
+            null,
+            readableConfig);
+  }
+
+  @Test
+  public void testFlinkLookupFunctionWithNoCache() throws Exception {
+    Map<String, String> properties = Maps.newHashMap();
+    Integer[] key = new Integer[] {1};
+    initFlinkLookupFunction(properties);
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
+
+    RowData expectedRowData = SimpleDataUtil.createRowData(1, "hello");
+
+    DataStream<RowData> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    FlinkSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .append();
+
+    env.execute("Test Iceberg DataStream");
+
+    flinkLookupFunction.open(null);
+    flinkLookupFunction.setCollector(
+        new Collector<RowData>() {
+          @Override
+          public void collect(RowData record) {
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'id' value should be '1'",
+                expectedRowData.getInt(0),
+                record.getInt(0));
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'data' value should be 'hello'\"",
+                expectedRowData.getString(1),
+                record.getString(1));
+          }
+
+          @Override
+          public void close() {}
+        });
+    flinkLookupFunction.eval(key);
+  }
+
+  @Test
+  public void testFlinkLookupFunctionWithCache() throws Exception {
+    Integer[] key = new Integer[] {1};
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("lookup-join-cache-size", "3");
+    properties.put("lookup-join-cache-ttl", "10000");
+    initFlinkLookupFunction(properties);
+    List<Row> rows1 = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
+
+    List<Row> rows2 =
+        Lists.newArrayList(Row.of(1, "hello2"), Row.of(2, "world2"), Row.of(3, "foo2"));
+
+    RowData expectedRowData = SimpleDataUtil.createRowData(1, "hello");
+    insertTableValues(rows1);
+
+    flinkLookupFunction.open(null);
+    flinkLookupFunction.setCollector(
+        new Collector<RowData>() {
+          @Override
+          public void collect(RowData record) {
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'id' value should be '1'",
+                expectedRowData.getInt(0),
+                record.getInt(0));
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'data' value should be 'hello'\"",
+                expectedRowData.getString(1),
+                record.getString(1));
+          }
+
+          @Override
+          public void close() {}
+        });
+    flinkLookupFunction.eval(key);
+
+    // Insert into table values "rows2", verify that the function of "cache" is in effect
+    insertTableValues(rows2);
+    flinkLookupFunction.eval(key);
+
+    Thread.sleep(12000);
+    List<RowData> arrayList = Lists.newArrayList();
+    flinkLookupFunction.setCollector(
+        new Collector<RowData>() {
+          @Override
+          public void collect(RowData record) {
+            arrayList.add(record);
+          }
+
+          @Override
+          public void close() {}
+        });
+
+    flinkLookupFunction.eval(key);
+    Assert.assertEquals("Collect data size should be 2", 2, arrayList.size());
+  }
+
+  private void insertTableValues(List<Row> rows) throws Exception {

Review Comment:
   If we just want to insert records into the Iceberg table, this approach is probably much slower. We can use `GenericAppenderHelper` instead:
   
   ```
       GenericAppenderHelper dataAppender =
           new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
   
       // snapshot0
       List<Record> batch0 =
           RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
       dataAppender.appendToTable(batch0);
   ```



##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLookupFunction.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkLookupFunction {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+  private static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+  private FlinkLookupFunction flinkLookupFunction;
+
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {

Review Comment:
   It is not necessary to test all the file formats.



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing exception, in case of failure to load the table into
+  // cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIMEMS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIMEMS = 600000;
+
+  private static final long MAX_RETRY_URATIONMS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool("iceberg-thread-pool-" + FlinkLookupFunction.class, 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {
+    super.open(functionContext);
+    TypeInformation<RowData> rowDataTypeInfo =
+        InternalTypeInfo.ofFields(
+            Arrays.stream(projectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+            projectedFields);
+    serializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
+    if (cacheMaxSize == -1 || cacheExpireMs == -1) {
+      builder.maximumSize(0);
+    } else {
+      builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
+    }
+
+    cache =
+        builder
+            .scheduler(Scheduler.systemScheduler())
+            .build(
+                new CacheLoader<Object, List<RowData>>() {
+                  @Override
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   I am not sure this implementation is appropriate. Basically, for every key we are going to do a full table scan and filtering. This impl is more like `JdbcLookupFunction`, where JDBC is a fast point-lookup system.
   
   Have you looked into Flink's `FileSystemLookupFunction`? That impl seems like a better fit: it refreshes the whole cache upon expiration.
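
To make the contrast concrete, here is a minimal JDK-only sketch of the "refresh the whole cache upon expiration" idea. It is not Flink's `FileSystemLookupFunction` and it uses no Iceberg API: the class name `FullRefreshLookupCache`, the `fullScan` supplier (standing in for one full table scan) and the injectable `clock` are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch (not Flink or Iceberg API): cache the whole lookup table and
// rebuild it once per TTL window, instead of scanning the table for every key.
final class FullRefreshLookupCache<K, R> {
  private final Supplier<List<R>> fullScan; // stands in for one full table scan
  private final Function<R, K> keyOf; // extracts the lookup key from a row
  private final long ttlMillis;
  private final LongSupplier clock; // injectable so expiry can be exercised without sleeping

  private Map<K, List<R>> index = Collections.emptyMap();
  private boolean loaded = false;
  private long loadedAtMillis;
  private int scanCount;

  FullRefreshLookupCache(
      Supplier<List<R>> fullScan, Function<R, K> keyOf, long ttlMillis, LongSupplier clock) {
    this.fullScan = fullScan;
    this.keyOf = keyOf;
    this.ttlMillis = ttlMillis;
    this.clock = clock;
  }

  // Returns every cached row for the key; scans the table at most once per TTL window.
  List<R> lookup(K key) {
    long now = clock.getAsLong();
    if (!loaded || now - loadedAtMillis >= ttlMillis) {
      reload(now);
    }
    return index.getOrDefault(key, Collections.<R>emptyList());
  }

  // Number of full scans performed so far.
  int scanCount() {
    return scanCount;
  }

  // One scan builds the index for all keys at once.
  private void reload(long now) {
    Map<K, List<R>> fresh = new HashMap<K, List<R>>();
    for (R row : fullScan.get()) {
      fresh.computeIfAbsent(keyOf.apply(row), k -> new ArrayList<R>()).add(row);
    }
    index = fresh;
    loaded = true;
    loadedAtMillis = now;
    scanCount++;
  }

  public static void main(String[] args) {
    List<String[]> table = new ArrayList<String[]>();
    table.add(new String[] {"1", "hello"});
    table.add(new String[] {"2", "world"});
    FullRefreshLookupCache<String, String[]> cache =
        new FullRefreshLookupCache<String, String[]>(
            () -> new ArrayList<String[]>(table),
            row -> row[0],
            10_000L,
            System::currentTimeMillis);
    cache.lookup("1");
    cache.lookup("2");
    System.out.println("full scans after two lookups: " + cache.scanCount());
  }
}
```

The trade-off under this sketch is that lookups for different keys share a single scan, at the cost of holding the whole table in memory and serving data that may be up to one TTL old.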





[GitHub] [iceberg] xwmr-max commented on pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#issuecomment-1488030849

   Please review, @szehon-ho.




[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1118774072


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing exception, in case of failure to load the table into
+  // cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIMEMS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIMEMS = 600000;
+
+  private static final long MAX_RETRY_URATIONMS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool("iceberg-thread-pool-" + FlinkLookupFunction.class, 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {
+    super.open(functionContext);
+    TypeInformation<RowData> rowDataTypeInfo =
+        InternalTypeInfo.ofFields(
+            Arrays.stream(projectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+            projectedFields);
+    serializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
+    if (cacheMaxSize == -1 || cacheExpireMs == -1) {
+      builder.maximumSize(0);
+    } else {
+      builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
+    }
+
+    cache =
+        builder
+            .scheduler(Scheduler.systemScheduler())
+            .build(
+                new CacheLoader<Object, List<RowData>>() {
+                  @Override
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   Flink's `FileSystemLookupFunction` is also a point-query approach; the difference is that it handles caching by loading all data into memory and then expiring all of it at once according to the configured expiration parameters. In contrast, we use Caffeine, which is well suited to caching: it offers several cache policies that control both how long entries live and how much data is cached. These correspond to the _lookup-join-cache-ttl_ and _lookup-join-cache-size_ parameters of our function, respectively.
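
As a rough illustration of what those two parameters bound, here is a JDK-only stand-in for a per-key loading cache with a write TTL and a maximum size. It is deliberately not Caffeine: eviction here is plain oldest-write-first, which is simpler than Caffeine's own policy, and the names `BoundedTtlCache`, `loader` and `clock` are hypothetical. A `maxSize` of 0 keeps nothing cached, mirroring the `maximumSize(0)` branch in the `open()` method quoted above.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// JDK-only stand-in for a size- and TTL-bounded loading cache. This is NOT Caffeine:
// eviction here is oldest-write-first, whereas Caffeine applies its own policy.
final class BoundedTtlCache<K, V> {
  private static final class Entry<T> {
    final T value;
    final long writtenAtMillis;

    Entry(T value, long writtenAtMillis) {
      this.value = value;
      this.writtenAtMillis = writtenAtMillis;
    }
  }

  private final long maxSize; // plays the role of lookup-join-cache-size
  private final long ttlMillis; // plays the role of lookup-join-cache-ttl
  private final Function<K, V> loader; // stands in for the per-key table read
  private final LongSupplier clock; // injectable so expiry can be exercised without sleeping
  private final LinkedHashMap<K, Entry<V>> entries = new LinkedHashMap<K, Entry<V>>();
  private int loadCount;

  BoundedTtlCache(long maxSize, long ttlMillis, Function<K, V> loader, LongSupplier clock) {
    this.maxSize = maxSize;
    this.ttlMillis = ttlMillis;
    this.loader = loader;
    this.clock = clock;
  }

  V get(K key) {
    long now = clock.getAsLong();
    Entry<V> cached = entries.get(key);
    if (cached != null && now - cached.writtenAtMillis < ttlMillis) {
      return cached.value; // fresh hit: no table read
    }
    V value = loader.apply(key); // miss or expired entry: read through to the source
    loadCount++;
    entries.remove(key); // re-insert so the key becomes the newest write
    entries.put(key, new Entry<V>(value, now));
    // Drop the oldest writes until the size bound holds again.
    Iterator<K> oldestFirst = entries.keySet().iterator();
    while (entries.size() > maxSize && oldestFirst.hasNext()) {
      oldestFirst.next();
      oldestFirst.remove();
    }
    return value;
  }

  int size() {
    return entries.size();
  }

  int loadCount() {
    return loadCount;
  }

  public static void main(String[] args) {
    Map<Integer, String> table = new HashMap<Integer, String>();
    table.put(1, "hello");
    BoundedTtlCache<Integer, String> cache =
        new BoundedTtlCache<Integer, String>(3L, 10_000L, table::get, System::currentTimeMillis);
    cache.get(1);
    table.put(1, "hello2"); // a later write to the source table
    System.out.println("value served from cache: " + cache.get(1));
  }
}
```

With a per-key design like this, every miss or expired entry costs one read of the source, so how expensive a single read is matters far more than it does for a cache that is rebuilt in one pass.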





[GitHub] [iceberg] xwmr-max commented on pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#issuecomment-1446393725

   @stevenzwu cc




[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1118775274


##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLookupFunction.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkLookupFunction {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+  private static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+  private FlinkLookupFunction flinkLookupFunction;
+
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {

Review Comment:
   done





[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1115564548


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java:
##########
@@ -226,4 +229,21 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  @Override
+  public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+    String[] lookupKeys = new String[context.getKeys().length];
+    for (int i = 0; i < lookupKeys.length; i++) {
+      int[] innerKeyArr = context.getKeys()[i];
+      Preconditions.checkArgument(
+          innerKeyArr.length == 1, "Don't support nested lookup keys in iceberg now.");
+      lookupKeys[i] = schema.getFieldNames()[innerKeyArr[0]];
+    }
+
+    TableSchema projectedSchema = getProjectedSchema();
+    loader.open();

Review Comment:
   > this can be problematic regarding resource management. see more details in PR #6614
   
   done





[GitHub] [iceberg] hulin-ola commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "hulin-ola (via GitHub)" <gi...@apache.org>.
hulin-ola commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1162253295


##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLookupFunction.java:
##########


Review Comment:
   Does this work with Flink 1.12?





[GitHub] [iceberg] xwmr-max commented on pull request #6440: Flink: Support Look-up Function

Posted by GitBox <gi...@apache.org>.
xwmr-max commented on PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#issuecomment-1355946472

   cc @stevenzwu @hililiwei 




[GitHub] [iceberg] zinking commented on pull request #6440: Flink: Support Look-up Function

Posted by GitBox <gi...@apache.org>.
zinking commented on PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#issuecomment-1358725441

   Is it actually a good idea to use an Iceberg table as a lookup join candidate? Will it be fast enough?




[GitHub] [iceberg] xwmr-max commented on pull request #6440: Flink: Support Look-up Function

Posted by GitBox <gi...@apache.org>.
xwmr-max commented on PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#issuecomment-1379970862

   > The lookup function is for lookup joins in Flink [1]. I have the same question as @zinking. Normally, lookup functions are a better fit for point-query storage systems (like JDBC).
   > 
   > Let's discuss the two scenarios separately:
   > 
   > * A small Iceberg table that can fit into memory comfortably using caching. In this case, the cache should always be enabled; I don't see a reason why it should be disabled. Also, if a taskmanager has 8 slots, does the lookup function cache 1 or 8 copies of the reference data set?
   > * A large Iceberg table. Would FLIP-204 [2] help?
   > 
   > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
   > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
   
   Hi stevenzwu, thank you for your review. Let's discuss the two scenarios you raised separately.
   
   - As you said, a small Iceberg table can easily be loaded into memory using a cache, and queries against it are fast, so from that point of view the cache could always be enabled. However, there are a few things to consider. First, our solution provides _lookup-join-cache-size_ and _lookup-join-cache-ttl_ to control the cache size and the expiration time respectively, so that the cache can be sized for the actual workload and the queried data stays up to date. Second, this scheme improves query efficiency by storing rows with the same primary key in the cache. If the cache does not contain rows for a given primary key, the latest data is loaded from the table. In addition, if a taskmanager has 8 slots, the lookup function needs to cache a copy of the data set. The lookup function is only a basic capability; it can be built on later to improve performance, for example with secondary indexes. Iceberg currently does not support this basic function, which would satisfy the requirements of many scenarios.
   - FLIP-204 [2] only raises the cache hit ratio: the user can add a hint to enable a partitioned lookup join, which forces the input of the lookup join to be hash-shuffled by the lookup keys. This can indeed relieve pressure on the cache, but it does not work well for Iceberg tables with larger data volumes. Once the basic lookup function exists, we can apply this in the future.
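
The caching behaviour described above (a bounded number of keys, entries that expire after a TTL, and a reload from the table on a miss) can be sketched roughly as follows. This is a minimal illustration that uses only the Java standard library rather than the Caffeine `LoadingCache` in the PR. The class name `BoundedTtlLookupCache`, its constructor parameters, the `loads` counter and the injected clock are all assumptions made for the example, not part of the PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the PR's Caffeine cache: bounded by entry count and by write age.
final class BoundedTtlLookupCache<K, V> {
  // Cached rows for one key plus the time they were written.
  private static final class Timed<R> {
    final List<R> rows;
    final long writtenAtMs;

    Timed(List<R> rows, long writtenAtMs) {
      this.rows = rows;
      this.writtenAtMs = writtenAtMs;
    }
  }

  private final long ttlMs;
  private final Function<K, List<V>> loader; // reads the rows for one key on a cache miss
  private final LongSupplier clockMs; // injectable clock so expiry can be exercised in tests
  private final LinkedHashMap<K, Timed<V>> entries;
  int loads = 0; // how many times the loader ran, exposed for the demo

  BoundedTtlLookupCache(
      final int maxSize, long ttlMs, Function<K, List<V>> loader, LongSupplier clockMs) {
    this.ttlMs = ttlMs;
    this.loader = loader;
    this.clockMs = clockMs;
    // Access-ordered map that drops the least recently used key once maxSize is exceeded.
    this.entries =
        new LinkedHashMap<K, Timed<V>>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<K, Timed<V>> eldest) {
            return size() > maxSize;
          }
        };
  }

  List<V> lookup(K key) {
    long now = clockMs.getAsLong();
    Timed<V> cached = entries.get(key);
    if (cached != null && now - cached.writtenAtMs < ttlMs) {
      return cached.rows; // fresh hit
    }
    loads++;
    List<V> rows = loader.apply(key); // miss or expired entry: reload from the source
    entries.put(key, new Timed<>(rows, now));
    return rows;
  }
}
```

In this sketch the size limit plays the role of _lookup-join-cache-size_ and the TTL plays the role of _lookup-join-cache-ttl_. Each parallel instance of the function would hold its own cache, which is why the number of slots per taskmanager matters for memory use.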
   




[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1115566364


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing exception, in case of failure to load the table into
+  // cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIMEMS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIMEMS = 600000;
+
+  private static final long MAX_RETRY_URATIONMS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool("iceberg-thread-pool-" + FlinkLookupFunction.class, 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {

Review Comment:
   done





[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1118792114


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java:
##########
@@ -226,4 +229,21 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  @Override
+  public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+    String[] lookupKeys = new String[context.getKeys().length];
+    for (int i = 0; i < lookupKeys.length; i++) {
+      int[] innerKeyArr = context.getKeys()[i];
+      Preconditions.checkArgument(
+          innerKeyArr.length == 1, "Don't support nested lookup keys in iceberg now.");
+      lookupKeys[i] = schema.getFieldNames()[innerKeyArr[0]];
+    }
+
+    TableSchema projectedSchema = getProjectedSchema();
+    loader.open();

Review Comment:
   This problem does currently exist with `tableloader`. For now I try to avoid it by calling `tableloader.open()` when I load the table. Solving it completely requires a fundamental fix in `tableloader` itself, and I will keep following this issue.
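
As a rough illustration of the resource-management concern, the sketch below pairs every `open()` with a `close()` by using try-with-resources. It does not use Iceberg's real `TableLoader`; `DemoLoader` and `LoaderLifecycle` are hypothetical stand-ins, and the sketch only assumes that the loader is closeable and must be opened before a table is loaded.

```java
import java.io.Closeable;

// Hypothetical stand-in for a table loader that holds a catalog connection.
final class DemoLoader implements Closeable {
  boolean open = false;

  void open() {
    open = true;
  }

  String loadTable() {
    if (!open) {
      throw new IllegalStateException("loader is not open");
    }
    return "table";
  }

  @Override
  public void close() {
    open = false;
  }
}

final class LoaderLifecycle {
  // Opens the loader, loads the table, and always closes the loader,
  // even when loadTable() throws.
  static String loadOnce(DemoLoader loader) {
    try (DemoLoader l = loader) {
      l.open();
      return l.loadTable();
    }
  }
}
```

Whether the loader should be scoped to a single call like this, or opened in the function's `open` and released in its `close`, is a design choice that this sketch does not settle.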





[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1118774818


##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLookupFunction.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkLookupFunction {

Review Comment:
   done





[GitHub] [iceberg] xwmr-max commented on pull request #6440: Flink: Support Look-up Function

Posted by "xwmr-max (via GitHub)" <gi...@apache.org>.
xwmr-max commented on PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#issuecomment-1456106415

   cc @pvary 




[GitHub] [iceberg] hililiwei commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1192020043


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {

Review Comment:
   Do we need to support loading the latest data by partition, as Hive does? @stevenzwu





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1169015624


##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLookupFunction.java:
##########


Review Comment:
   Nope. Flink 1.12 support has been deprecated. The Iceberg Flink module only supports three versions of Flink at a time. For the master branch, these are 1.15, 1.16, and 1.17.





[GitHub] [iceberg] hililiwei commented on a diff in pull request #6440: Flink: Support Look-up Function

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1159675291


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing exception, in case of failure to load the table into
+  // cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIMEMS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIMEMS = 600000;
+
+  private static final long MAX_RETRY_URATIONMS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool("iceberg-thread-pool-" + FlinkLookupFunction.class, 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {
+    super.open(functionContext);
+    TypeInformation<RowData> rowDataTypeInfo =
+        InternalTypeInfo.ofFields(
+            Arrays.stream(projectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+            projectedFields);
+    serializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
+    if (cacheMaxSize == -1 || cacheExpireMs == -1) {
+      builder.maximumSize(0);
+    } else {
+      builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
+    }
+
+    cache =
+        builder
+            .scheduler(Scheduler.systemScheduler())
+            .build(
+                new CacheLoader<Object, List<RowData>>() {
+                  @Override
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   This is a good question. Let me analyze the pros and cons of the two approaches:
   
   - JDBC-style lookup: when a key arrives, it queries the external system for that key and caches the result, so it does not take up much extra memory or other resources.
   - File/Hive-style lookup: it loads the latest partition (streaming) or all partitions (batch) and refreshes the data periodically. The partitions may hold a lot of data, so this requires extra memory and other resources.
   
   So there is a tradeoff here.
   
   Which one should we use to implement `Lookup`? The choice depends on the users' use cases and on the characteristics of Iceberg itself. Under what circumstances would a user enable lookup queries? I think in most scenarios they need Iceberg as a dimension table that doesn't change much. Moreover, Iceberg itself is more similar to the Hive/File system.
   
   Generally speaking, I am more inclined to vote for the second one.
   @stevenzwu what do you think about that?
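
To make the second option concrete, here is a minimal sketch of a File/Hive-style lookup: the whole table is loaded into an in-memory index and rebuilt once a refresh interval has elapsed. The class name `FullReloadLookup`, the `fullScan` supplier, the `reloads` counter and the injected clock are assumptions for illustration only; a real implementation would build the index from an Iceberg table scan.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical illustration of the File/Hive-style strategy:
// keep a full in-memory index and rebuild it on a fixed interval.
final class FullReloadLookup<K, V> {
  private final Supplier<Map<K, List<V>>> fullScan; // reads every row, grouped by lookup key
  private final long refreshIntervalMs;
  private final LongSupplier clockMs; // injectable clock so refresh can be exercised in tests
  private Map<K, List<V>> index = new HashMap<>();
  private long loadedAtMs = 0L;
  private boolean loaded = false;
  int reloads = 0; // how many full scans ran, exposed for the demo

  FullReloadLookup(
      Supplier<Map<K, List<V>>> fullScan, long refreshIntervalMs, LongSupplier clockMs) {
    this.fullScan = fullScan;
    this.refreshIntervalMs = refreshIntervalMs;
    this.clockMs = clockMs;
  }

  List<V> lookup(K key) {
    long now = clockMs.getAsLong();
    // Rebuild the whole index on first use and whenever the interval has elapsed.
    if (!loaded || now - loadedAtMs >= refreshIntervalMs) {
      index = fullScan.get();
      loadedAtMs = now;
      loaded = true;
      reloads++;
    }
    // Between refreshes every lookup is a pure in-memory read, possibly of stale data.
    return index.getOrDefault(key, Collections.emptyList());
  }
}
```

Compared with a per-key cache, this trades memory (the full table is resident) and freshness (data is at most one interval old) for lookups that never touch storage between refreshes.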
   
   


