Posted to issues@iceberg.apache.org by "xwmr-max (via GitHub)" <gi...@apache.org> on 2023/05/08 13:05:55 UTC

[GitHub] [iceberg] xwmr-max commented on a diff in pull request #6440: Flink: Support Look-up Function

xwmr-max commented on code in PR #6440:
URL: https://github.com/apache/iceberg/pull/6440#discussion_r1187423924


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkLookupFunction.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkLookupFunction extends TableFunction<RowData> {
+  private static final long serialVersionUID = -7248058804931465381L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkLookupFunction.class);
+  // the max number of retries before throwing an exception, in case loading the table into the
+  // cache fails
+  private static final int MAX_RETRIES = 3;
+
+  private final String[] projectedFields;
+
+  private final DataType[] projectedTypes;
+
+  private final String[] lookupKeys;
+
+  private final long cacheMaxSize;
+
+  private final long cacheExpireMs;
+
+  private final int[] lookupCols;
+
+  private transient LoadingCache<Object, List<RowData>> cache;
+  // serializer to copy RowData
+  private transient TypeSerializer<RowData> serializer;
+  // converters to convert data from internal to external in order to generate keys for the cache
+  private final DataFormatConverter[] converters;
+
+  private FlinkInputFormat inputFormat;
+
+  private final ScanContext context;
+
+  private final Schema icebergSchema;
+
+  private final TableLoader tableLoader;
+
+  private final EncryptionManager encryption;
+
+  private final FileIO io;
+
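+  // retry back-off bounds, in milliseconds, presumably used when loading the table into the cache fails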
+  private static final long MIN_RETRY_SLEEP_TIME_MS = 10000;
+
+  private static final long MAX_RETRY_SLEEP_TIME_MS = 600000;
+
+  private static final long MAX_RETRY_DURATION_MS = 600000;
+
+  private static final double SCALE_FACTOR = 2.0;
+
+  private static final ExecutorService EXECUTOR_SERVICE =
+      ThreadPools.newScheduledPool("iceberg-thread-pool-" + FlinkLookupFunction.class, 3);
+
+  public FlinkLookupFunction(
+      TableSchema schema,
+      String[] lookupKeys,
+      TableLoader tableLoader,
+      Map<String, String> properties,
+      long limit,
+      List<Expression> filters,
+      ReadableConfig readableConfig) {
+    Preconditions.checkNotNull(schema, "Table schema cannot be null.");
+    this.lookupCols = new int[lookupKeys.length];
+    this.converters = new DataFormatConverter[lookupKeys.length];
+    this.tableLoader = tableLoader;
+    this.io = tableLoader.loadTable().io();
+    this.icebergSchema = tableLoader.loadTable().schema();
+    this.encryption = tableLoader.loadTable().encryption();
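+    // resolve the scan configuration from the table, job properties and Flink config, applying projection, filters and limit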
+    this.context =
+        ScanContext.builder()
+            .resolveConfig(tableLoader.loadTable(), properties, readableConfig)
+            .project(icebergSchema)
+            .filters(filters)
+            .limit(limit)
+            .build();
+    this.projectedFields = schema.getFieldNames();
+    this.projectedTypes = schema.getFieldDataTypes();
+    this.lookupKeys = lookupKeys;
+    this.cacheMaxSize = context.cacheMaxSize();
+    this.cacheExpireMs = context.cacheExpireMs();
+
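+    // map each projected field name to its index so lookup keys can be resolved to column positions and converters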
+    Map<String, Integer> nameToIndex =
+        IntStream.range(0, projectedFields.length)
+            .boxed()
+            .collect(Collectors.toMap(i -> projectedFields[i], i -> i));
+    for (int i = 0; i < lookupKeys.length; i++) {
+      Integer index = nameToIndex.get(lookupKeys[i]);
+      Preconditions.checkArgument(
+          index != null, "Lookup key %s is not in the projected fields", lookupKeys[i]);
+      converters[i] = DataFormatConverters.getConverterForDataType(projectedTypes[index]);
+      lookupCols[i] = index;
+    }
+  }
+
+  @Override
+  public void open(FunctionContext functionContext) throws Exception {
+    super.open(functionContext);
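+    // build type information for the projected row type; its serializer is used to copy cached RowData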
+    TypeInformation<RowData> rowDataTypeInfo =
+        InternalTypeInfo.ofFields(
+            Arrays.stream(projectedTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new),
+            projectedFields);
+    serializer = rowDataTypeInfo.createSerializer(new ExecutionConfig());
+    Caffeine<Object, Object> builder = Caffeine.newBuilder();
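+    // a cache max size or expiry of -1 effectively disables caching (maximumSize(0) keeps no entries)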
+    if (cacheMaxSize == -1 || cacheExpireMs == -1) {
+      builder.maximumSize(0);
+    } else {
+      builder.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(cacheMaxSize);
+    }
+
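+    // build the loading cache with the system scheduler for timely expiration; the loader below populates entries on cache misses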
+    cache =
+        builder
+            .scheduler(Scheduler.systemScheduler())
+            .build(
+                new CacheLoader<Object, List<RowData>>() {
+                  @Override
+                  public @Nullable List<RowData> load(@NonNull Object keys) throws Exception {

Review Comment:
   @stevenzwu I have modified it; please help review it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

