Posted to issues@iceberg.apache.org by "stevenzwu (via GitHub)" <gi...@apache.org> on 2023/02/01 05:37:04 UTC

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6440: Flink: Support Look-up Function

stevenzwu commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1092770061


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing an exception, in case of failure to load the table
+  // into the cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIME_MS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIME_MS = 600000;
+
+  private static final long MAX_RETRY_DURATION_MS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool(
+          "iceberg-thread-pool-" + FlinkLookupFunction.class.getSimpleName(), 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {

Review Comment:
   This method is too long; it can probably be refactored.

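   For example, the serializer and cache construction could be pulled into small private helpers so that open() only wires things together. A minimal sketch, assuming the fields declared above; buildRowDataSerializer() and loadRowsForKey() are illustrative helper names, not part of this PR:
   
    ```
    @Override
    public void open(FunctionContext functionContext) throws Exception {
      super.open(functionContext);
      this.serializer = buildRowDataSerializer();
      this.cache = buildCache();
    }

    private LoadingCache<Object, List<RowData>> buildCache() {
      Caffeine<Object, Object> builder = Caffeine.newBuilder();
      if (cacheMaxSize == -1 || cacheExpireMs == -1) {
        builder.maximumSize(0); // caching effectively disabled
      } else {
        builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
      }

      // the loader body stays the same; only the wiring moves out of open()
      return builder.scheduler(Scheduler.systemScheduler()).build(keys -> loadRowsForKey(keys));
    }
    ```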


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java:
##########
@@ -226,4 +229,21 @@ public DynamicTableSource copy() {
   public String asSummaryString() {
     return "Iceberg table source";
   }
+
+  @Override
+  public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+    String[] lookupKeys = new String[context.getKeys().length];
+    for (int i = 0; i < lookupKeys.length; i++) {
+      int[] innerKeyArr = context.getKeys()[i];
+      Preconditions.checkArgument(
+          innerKeyArr.length == 1, "Nested lookup keys are not supported in Iceberg yet.");
+      lookupKeys[i] = schema.getFieldNames()[innerKeyArr[0]];
+    }
+
+    TableSchema projectedSchema = getProjectedSchema();
+    loader.open();

Review Comment:
   This can be problematic with regard to resource management; see PR #6614 for more details.

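   One way to keep ownership clear is to not open the loader here at all and instead let the lookup function open and close it as part of its own lifecycle. A rough sketch of that shape (this is not the actual fix discussed in PR #6614):
   
    ```
    // inside FlinkLookupFunction
    @Override
    public void open(FunctionContext functionContext) throws Exception {
      super.open(functionContext);
      tableLoader.open(); // acquire the catalog/table connection where it is actually used
      // ... load the table, build the cache, etc.
    }

    @Override
    public void close() throws Exception {
      tableLoader.close(); // release it when the task shuts down
      super.close();
    }
    ```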


##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLookupFunction.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkLookupFunction {

Review Comment:
   Can you provide a test example from the Flink repo? Trying to see what kind of test coverage is desirable.

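   For comparison, a hedged sketch of how lookup joins are usually exercised at the SQL level in connector tests, assuming a TableEnvironment `tEnv`, a registered probe table `orders` with a processing-time attribute `proc_time`, and the Iceberg table registered as `dim`:
   
    ```
    String sql =
        "SELECT o.order_id, d.data "
            + "FROM orders AS o "
            + "JOIN dim FOR SYSTEM_TIME AS OF o.proc_time AS d "
            + "ON o.id = d.id";
    // drive the lookup through the planner instead of calling eval() directly
    List<Row> results = Lists.newArrayList(tEnv.executeSql(sql).collect());
    ```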


##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLookupFunction.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkLookupFunction {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+  private static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+  private FlinkLookupFunction flinkLookupFunction;
+
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {"avro", 1, true},
+      {"avro", 1, false},
+      {"avro", 2, true},
+      {"avro", 2, false},
+      {"orc", 1, true},
+      {"orc", 1, false},
+      {"orc", 2, true},
+      {"orc", 2, false},
+      {"parquet", 1, true},
+      {"parquet", 1, false},
+      {"parquet", 2, true},
+      {"parquet", 2, false}
+    };
+  }
+
+  public TestFlinkLookupFunction(String format, int parallelism, boolean partitioned) {
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+    this.parallelism = parallelism;
+    this.partitioned = partitioned;
+  }
+
+  @Before
+  public void before() throws IOException {
+    File folder = TEMPORARY_FOLDER.newFolder();
+    String warehouse = folder.getAbsolutePath();
+
+    String tablePath = warehouse.concat("/test");
+
+    Assert.assertTrue("Should create the table path correctly.", new File(tablePath).mkdir());
+
+    Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+    table = SimpleDataUtil.createTable(tablePath, props, partitioned);
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment()
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism);
+
+    tableLoader = TableLoader.fromHadoopTable(tablePath);
+    tableLoader.open();
+  }
+
+  private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
+    return new BoundedTestSource<>(rows.toArray(new Row[0]));
+  }
+
+  private void initFlinkLookupFunction(Map<String, String> properties) {
+    String[] lookupKeys = {"id"};
+    ReadableConfig readableConfig = new Configuration();
+    flinkLookupFunction =
+        new FlinkLookupFunction(
+            SimpleDataUtil.FLINK_SCHEMA,
+            lookupKeys,
+            tableLoader,
+            properties,
+            -1L,
+            null,
+            readableConfig);
+  }
+
+  @Test
+  public void testFlinkLookupFunctionWithNoCache() throws Exception {
+    Map<String, String> properties = Maps.newHashMap();
+    Integer[] key = new Integer[] {1};
+    initFlinkLookupFunction(properties);
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
+
+    RowData expectedRowData = SimpleDataUtil.createRowData(1, "hello");
+
+    DataStream<RowData> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    FlinkSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .append();
+
+    env.execute("Test Iceberg DataStream");
+
+    flinkLookupFunction.open(null);
+    flinkLookupFunction.setCollector(
+        new Collector<RowData>() {
+          @Override
+          public void collect(RowData record) {
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'id' value should be '1'",
+                expectedRowData.getInt(0),
+                record.getInt(0));
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'data' value should be 'hello'",
+                expectedRowData.getString(1),
+                record.getString(1));
+          }
+
+          @Override
+          public void close() {}
+        });
+    flinkLookupFunction.eval(key);
+  }
+
+  @Test
+  public void testFlinkLookupFunctionWithCache() throws Exception {
+    Integer[] key = new Integer[] {1};
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("lookup-join-cache-size", "3");
+    properties.put("lookup-join-cache-ttl", "10000");
+    initFlinkLookupFunction(properties);
+    List<Row> rows1 = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
+
+    List<Row> rows2 =
+        Lists.newArrayList(Row.of(1, "hello2"), Row.of(2, "world2"), Row.of(3, "foo2"));
+
+    RowData expectedRowData = SimpleDataUtil.createRowData(1, "hello");
+    insertTableValues(rows1);
+
+    flinkLookupFunction.open(null);
+    flinkLookupFunction.setCollector(
+        new Collector<RowData>() {
+          @Override
+          public void collect(RowData record) {
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'id' value should be '1'",
+                expectedRowData.getInt(0),
+                record.getInt(0));
+            Assert.assertEquals(
+                "Should join the table data by key correctly. "
+                    + "The joined 'data' value should be 'hello'",
+                expectedRowData.getString(1),
+                record.getString(1));
+          }
+
+          @Override
+          public void close() {}
+        });
+    flinkLookupFunction.eval(key);
+
+    // Insert "rows2" into the table, then verify the cache is in effect:
+    // the lookup should still return the original values for the key
+    insertTableValues(rows2);
+    flinkLookupFunction.eval(key);
+
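+    // sleep past the 10s cache TTL (lookup-join-cache-ttl = 10000 ms) so the next eval() reloads from the table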
+    Thread.sleep(12000);
+    List<RowData> arrayList = Lists.newArrayList();
+    flinkLookupFunction.setCollector(
+        new Collector<RowData>() {
+          @Override
+          public void collect(RowData record) {
+            arrayList.add(record);
+          }
+
+          @Override
+          public void close() {}
+        });
+
+    flinkLookupFunction.eval(key);
+    Assert.assertEquals("Collect data size should be 2", 2, arrayList.size());
+  }
+
+  private void insertTableValues(List<Row> rows) throws Exception {

Review Comment:
   If we just want to insert records into the Iceberg table, this is probably much slower. We can use `GenericAppenderHelper`:
   
   ```
       GenericAppenderHelper dataAppender =
           new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);
   
       // snapshot0
       List<Record> batch0 =
           RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
       dataAppender.appendToTable(batch0);
   ```

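   Adapted to this test it might look roughly like the following (a sketch only; for the fixed rows asserted above one would build GenericRecords instead of random data):
   
    ```
    GenericAppenderHelper dataAppender =
        new GenericAppenderHelper(table, format, TEMPORARY_FOLDER);
    List<Record> batch = RandomGenericData.generate(table.schema(), 3, 0L);
    dataAppender.appendToTable(batch);
    ```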


##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkLookupFunction.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkLookupFunction {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  private static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+  private static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private TableLoader tableLoader;
+
+  private final FileFormat format;
+  private final int parallelism;
+  private final boolean partitioned;
+  private FlinkLookupFunction flinkLookupFunction;
+
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {

Review Comment:
   It is not necessary to test all the file formats.

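   For example, the matrix could be trimmed to a single format while keeping the parallelism and partitioning axes (the constructor would then drop the format argument):
   
    ```
    @Parameterized.Parameters(name = "parallelism = {0}, partitioned = {1}")
    public static Object[][] parameters() {
      return new Object[][] {
        {1, true}, {1, false},
        {2, true}, {2, false}
      };
    }
    ```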


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing an exception, in case of failure to load the table
+  // into the cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIME_MS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIME_MS = 600000;
+
+  private static final long MAX_RETRY_DURATION_MS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool(
+          "iceberg-thread-pool-" + FlinkLookupFunction.class.getSimpleName(), 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema can not be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {
+    super.open(functionContext);
+    TypeInformation<RowData> rowDataTypeInfo =
+        InternalTypeInfo.ofFields(
+            Arrays.stream(projectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+            projectedFields);
+    serializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
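+    // a cache size or TTL of -1 disables caching: maximumSize(0) evicts entries immediately after load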
+    if (cacheMaxSize == -1 || cacheExpireMs == -1) {
+      builder.maximumSize(0);
+    } else {
+      builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
+    }
+
+    cache =
+        builder
+            .scheduler(Scheduler.systemScheduler())
+            .build(
+                new CacheLoader<Object, List<RowData>>() {
+                  @Override
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   I am not sure this implementation is appropriate. Basically, for every key we are going to do a full table scan plus filtering. This impl is more like `JdbcLookupFunction`, but there JDBC is a fast point-lookup system.
   
   Have you looked into Flink's `FileSystemLookupFunction`? That impl seems like a better fit: refresh the whole cache upon expiration.
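   A rough sketch of that alternative, assuming a single in-memory map keyed by the lookup key and a periodic full reload (field and helper names here are illustrative, not Flink's actual implementation):
   
    ```
    private transient Map<Object, List<RowData>> cachedRows;
    private transient long nextLoadTime = Long.MIN_VALUE;

    public void eval(Object... keys) {
      if (System.currentTimeMillis() >= nextLoadTime) {
        cachedRows = scanWholeTableGroupedByKey(); // one table scan per refresh, not per key
        nextLoadTime = System.currentTimeMillis() + cacheExpireMs;
      }
      for (RowData row : cachedRows.getOrDefault(keyOf(keys), Collections.emptyList())) {
        collect(row);
      }
    }
    ```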



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

