Posted to issues@iceberg.apache.org by "hililiwei (via GitHub)" <gi...@apache.org> on 2023/04/06 11:29:36 UTC

[GitHub] [iceberg] hililiwei commented on a diff in pull request #6440: Flink: Support Look-up Function

hililiwei commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1159675291


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing an exception, in case of a failure to load the
+  // table data into the cache
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
+  private static final long MIN_RETRY_SLEEP_TIME_MS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIME_MS = 600000;
+
+  private static final long MAX_RETRY_DURATION_MS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool(
+          "iceberg-thread-pool-" + FlinkLookupFunction.class.getSimpleName(), 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema cannot be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    // load the table once and reuse it, instead of reloading it for each accessor
+    Table table = tableLoader.loadTable();
+    this.io = table.io();
+    this.icebergSchema = table.schema();
+    this.encryption = table.encryption();
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(table, properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
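+    // map each projected field name to its position so lookup keys can be resolved to columns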
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {
+    super.open(functionContext);
+    TypeInformation<RowData> rowDataTypeInfo =
+        InternalTypeInfo.ofFields(
+            Arrays.stream(projectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+            projectedFields);
+    serializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
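+    // a max size or TTL of -1 disables caching: maximumSize(0) evicts entries immediately,
+    // so every lookup falls through to the cache loader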
+    if (cacheMaxSize == -1 || cacheExpireMs == -1) {
+      builder.maximumSize(0);
+    } else {
+      builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
+    }
+
+    cache =
+        builder
+            .scheduler(Scheduler.systemScheduler())
+            .build(
+                new CacheLoader<Object, List<RowData>>() {
+                  @Override
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   This is a good question. Let me lay out the pros and cons of each approach:
   
   - JDBC-style lookup: when a key arrives, it queries the external system for that key and caches the result, so it does not take up much extra memory or other resources.
   - File/Hive-style lookup: it loads the latest partition (streaming) or all partitions (batch) up front and refreshes them periodically. The partitions may hold a lot of data, so this requires extra memory and other resources.
   
   So there is a tradeoff here.
   
   Which one should we use to implement `Lookup`? The choice depends on the user's use case and on the characteristics of Iceberg itself. When would a user enable lookup joins? I think in most scenarios it is when they want to use an Iceberg table as a dimension table that does not change much. Moreover, Iceberg itself is much closer to the Hive/File systems.
   
   Generally speaking, I am more inclined to vote for the second option; a rough sketch of what it could look like follows below.
   @stevenzwu what do you think about that?
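   
   To make the tradeoff concrete, here is a rough sketch of the second option, assuming a hypothetical `ReloadingLookupIndex` helper and a `fullLoader` that scans the whole table grouped by key (none of these names are in this PR): the table is reloaded into an in-memory index on a fixed interval, and lookups are served from that snapshot instead of loading per key.
   
   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Supplier;
   
   /** Illustrative only: Hive/File-style lookup that reloads the whole table periodically. */
   class ReloadingLookupIndex<K, V> {
     private final Supplier<Map<K, List<V>>> fullLoader; // e.g. a full table scan grouped by key
     private final long reloadIntervalMs;
     private volatile Map<K, List<V>> index = Collections.emptyMap();
     private volatile long lastLoadMs = Long.MIN_VALUE;
   
     ReloadingLookupIndex(Supplier<Map<K, List<V>>> fullLoader, long reloadIntervalMs) {
       this.fullLoader = fullLoader;
       this.reloadIntervalMs = reloadIntervalMs;
     }
   
     /** Serves a key from the cached snapshot, reloading the whole table if it is stale. */
     List<V> lookup(K key) {
       long now = System.currentTimeMillis();
       if (now - lastLoadMs >= reloadIntervalMs) {
         synchronized (this) {
           if (now - lastLoadMs >= reloadIntervalMs) {
             index = fullLoader.get();
             lastLoadMs = now;
           }
         }
       }
       return index.getOrDefault(key, Collections.emptyList());
     }
   }
   ```
   
   The cost is exactly the one described above: the whole dimension table has to fit in memory and new data only shows up after the next reload, but each lookup stays cheap and does not trigger a scan against the table.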
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

