You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/07/17 12:29:31 UTC

[bahir-flink] 10/16: fix: fix ExtractionUtils.extractionError,update lookup function

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git

commit a0102dfeec4bc345a78e3454dc9da7e43fa2dbeb
Author: Shimin Huang <40...@users.noreply.github.com>
AuthorDate: Thu Jun 2 14:28:05 2022 +0800

    fix: fix ExtractionUtils.extractionError,update lookup function
---
 .../lookup/AbstractKuduLookupFunction.java         | 207 ---------------------
 .../function/lookup/KuduRowDataLookupFunction.java | 161 +++++++++++++++-
 2 files changed, 156 insertions(+), 212 deletions(-)

diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/AbstractKuduLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/AbstractKuduLookupFunction.java
deleted file mode 100644
index ccb9687..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/AbstractKuduLookupFunction.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.table.function.lookup;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.compress.utils.Lists;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
-import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
-import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
-import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
-import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A {@link TableFunction} to query fields from Kudu by keys.
- * The query template like:
- * <PRE>
- * SELECT c, d, e, f from T where a = ? and b = ?
- * </PRE>
- *
- * <p>Support cache the result to avoid frequent accessing to remote databases.
- * 1.The cacheMaxSize is -1 means not use cache.
- * 2.For real-time data, you need to set the TTL of cache.
- *
- * @param <IN> Type of the input records
- */
-public abstract class AbstractKuduLookupFunction<IN> extends TableFunction<IN> {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduLookupFunction.class);
-    private static final long serialVersionUID = 1L;
-
-    private final KuduTableInfo tableInfo;
-    private final KuduReaderConfig kuduReaderConfig;
-    private final String[] keyNames;
-    private final String[] projectedFields;
-    private final long cacheMaxSize;
-    private final long cacheExpireMs;
-    private final int maxRetryTimes;
-    private final RowResultConvertor<IN> convertor;
-
-    private transient Cache<IN, List<IN>> cache;
-    private transient KuduReader<IN> kuduReader;
-    private transient Integer keyCount = 0;
-
-    public AbstractKuduLookupFunction(String[] keyNames, RowResultConvertor<IN> convertor, KuduTableInfo tableInfo,
-                                      KuduReaderConfig kuduReaderConfig, String[] projectedFields,
-                                      KuduLookupOptions kuduLookupOptions) {
-        this.tableInfo = tableInfo;
-        this.convertor = convertor;
-        this.projectedFields = projectedFields;
-        this.keyNames = keyNames;
-        this.kuduReaderConfig = kuduReaderConfig;
-        this.cacheMaxSize = kuduLookupOptions.getCacheMaxSize();
-        this.cacheExpireMs = kuduLookupOptions.getCacheExpireMs();
-        this.maxRetryTimes = kuduLookupOptions.getMaxRetryTimes();
-    }
-
-    /**
-     * Template method to build cache key
-     *
-     * @param keys join keys
-     * @return cache key
-     */
-    public abstract IN buildCacheKey(Object... keys);
-
-    /**
-     * invoke entry point of lookup function.
-     *
-     * @param keys join keys
-     */
-    public void eval(Object... keys) {
-        if (keys.length != keyNames.length) {
-            throw new RuntimeException("The join keys are of unequal lengths");
-        }
-        // cache key
-        IN keyRow = buildCacheKey(keys);
-        if (this.cache != null) {
-            ConcurrentMap<IN, List<IN>> cacheMap = this.cache.asMap();
-            this.keyCount = cacheMap.size();
-            List<IN> cacheRows = this.cache.getIfPresent(keyRow);
-            if (CollectionUtils.isNotEmpty(cacheRows)) {
-                for (IN cacheRow : cacheRows) {
-                    collect(cacheRow);
-                }
-                return;
-            }
-        }
-
-        for (int retry = 1; retry <= maxRetryTimes; retry++) {
-            try {
-                List<KuduFilterInfo> kuduFilterInfos = buildKuduFilterInfo(keys);
-                this.kuduReader.setTableFilters(kuduFilterInfos);
-                KuduInputSplit[] inputSplits = kuduReader.createInputSplits(1);
-                ArrayList<IN> rows = new ArrayList<>();
-                for (KuduInputSplit inputSplit : inputSplits) {
-                    KuduReaderIterator<IN> scanner = kuduReader.scanner(inputSplit.getScanToken());
-                    // 没有启用cache
-                    if (cache == null) {
-                        while (scanner.hasNext()) {
-                            collect(scanner.next());
-                        }
-                    } else {
-                        while (scanner.hasNext()) {
-                            IN row = scanner.next();
-                            rows.add(row);
-                            collect(row);
-                        }
-                        rows.trimToSize();
-                    }
-                }
-                if (cache != null) {
-                    cache.put(keyRow, rows);
-                }
-                break;
-            } catch (Exception e) {
-                LOG.error(String.format("Kudu scan error, retry times = %d", retry), e);
-                if (retry >= maxRetryTimes) {
-                    throw new RuntimeException("Execution of Kudu scan failed.", e);
-                }
-                try {
-                    Thread.sleep(1000L * retry);
-                } catch (InterruptedException e1) {
-                    throw new RuntimeException(e1);
-                }
-            }
-        }
-    }
-
-    /**
-     * build kuduFilterInfo
-     *
-     * @return kudu filters
-     */
-    private List<KuduFilterInfo> buildKuduFilterInfo(Object... keyValS) {
-        List<KuduFilterInfo> kuduFilterInfos = Lists.newArrayList();
-        for (int i = 0; i < keyNames.length; i++) {
-            KuduFilterInfo kuduFilterInfo = KuduFilterInfo.Builder.create(keyNames[i])
-                    .equalTo(keyValS[i]).build();
-            kuduFilterInfos.add(kuduFilterInfo);
-        }
-        return kuduFilterInfos;
-    }
-
-
-    @Override
-    public void open(FunctionContext context) {
-        try {
-            super.open(context);
-            this.kuduReader = new KuduReader<>(this.tableInfo, this.kuduReaderConfig, this.convertor);
-            // build kudu cache
-            this.kuduReader.setTableProjections(ArrayUtils.isNotEmpty(projectedFields) ?
-                    Arrays.asList(projectedFields) : null);
-            this.cache = this.cacheMaxSize == -1 || this.cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
-                    .expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS)
-                    .maximumSize(this.cacheMaxSize)
-                    .build();
-        } catch (Exception ioe) {
-            LOG.error("Exception while creating connection to Kudu.", ioe);
-            throw new RuntimeException("Cannot create connection to Kudu.", ioe);
-        }
-    }
-
-    @Override
-    public void close() {
-        if (null != this.kuduReader) {
-            try {
-                this.kuduReader.close();
-                this.cache.cleanUp();
-                // help gc
-                this.cache = null;
-                this.kuduReader = null;
-            } catch (IOException e) {
-                // ignore exception when close.
-                LOG.warn("exception when close table", e);
-            }
-        }
-    }
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
index a1bf7d7..4a4a952 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
@@ -17,28 +17,178 @@
  */
 package org.apache.flink.connectors.kudu.table.function.lookup;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
 import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowDataConvertor;
+import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * LookupFunction based on the RowData object type
  */
-public class KuduRowDataLookupFunction extends AbstractKuduLookupFunction<RowData> {
+public class KuduRowDataLookupFunction extends TableFunction<RowData> {
     private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(KuduRowDataLookupFunction.class);
+
+    private final KuduTableInfo tableInfo;
+    private final KuduReaderConfig kuduReaderConfig;
+    private final String[] keyNames;
+    private final String[] projectedFields;
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private final RowResultConvertor<RowData> convertor;
 
+    private transient Cache<RowData, List<RowData>> cache;
+    private transient KuduReader<RowData> kuduReader;
 
-    private KuduRowDataLookupFunction(String[] keyNames, KuduTableInfo tableInfo, KuduReaderConfig kuduReaderConfig, String[] projectedFields, KuduLookupOptions kuduLookupOptions) {
-        super(keyNames, new RowResultRowDataConvertor(), tableInfo, kuduReaderConfig, projectedFields, kuduLookupOptions);
+    private KuduRowDataLookupFunction(String[] keyNames, KuduTableInfo tableInfo, KuduReaderConfig kuduReaderConfig,
+                                      String[] projectedFields, KuduLookupOptions kuduLookupOptions) {
+        this.tableInfo = tableInfo;
+        this.convertor = new RowResultRowDataConvertor();
+        this.projectedFields = projectedFields;
+        this.keyNames = keyNames;
+        this.kuduReaderConfig = kuduReaderConfig;
+        this.cacheMaxSize = kuduLookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = kuduLookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = kuduLookupOptions.getMaxRetryTimes();
     }
 
-    @Override
     public RowData buildCacheKey(Object... keys) {
         return GenericRowData.of(keys);
     }
 
+    /**
+     * invoke entry point of lookup function.
+     *
+     * @param keys join keys
+     */
+    public void eval(Object... keys) {
+        if (keys.length != keyNames.length) {
+            throw new RuntimeException("The join keys are of unequal lengths");
+        }
+        // cache key
+        RowData keyRow = buildCacheKey(keys);
+        if (this.cache != null) {
+            List<RowData> cacheRows = this.cache.getIfPresent(keyRow);
+            if (CollectionUtils.isNotEmpty(cacheRows)) {
+                for (RowData cacheRow : cacheRows) {
+                    collect(cacheRow);
+                }
+                return;
+            }
+        }
+
+        for (int retry = 1; retry <= maxRetryTimes; retry++) {
+            try {
+                List<KuduFilterInfo> kuduFilterInfos = buildKuduFilterInfo(keys);
+                this.kuduReader.setTableFilters(kuduFilterInfos);
+                KuduInputSplit[] inputSplits = kuduReader.createInputSplits(1);
+                ArrayList<RowData> rows = new ArrayList<>();
+                for (KuduInputSplit inputSplit : inputSplits) {
+                    KuduReaderIterator<RowData> scanner = kuduReader.scanner(inputSplit.getScanToken());
+                    // 没有启用cache
+                    if (cache == null) {
+                        while (scanner.hasNext()) {
+                            collect(scanner.next());
+                        }
+                    } else {
+                        while (scanner.hasNext()) {
+                            RowData row = scanner.next();
+                            rows.add(row);
+                            collect(row);
+                        }
+                        rows.trimToSize();
+                    }
+                }
+                if (cache != null) {
+                    cache.put(keyRow, rows);
+                }
+                break;
+            } catch (Exception e) {
+                LOG.error(String.format("Kudu scan error, retry times = %d", retry), e);
+                if (retry >= maxRetryTimes) {
+                    throw new RuntimeException("Execution of Kudu scan failed.", e);
+                }
+                try {
+                    Thread.sleep(1000L * retry);
+                } catch (InterruptedException e1) {
+                    throw new RuntimeException(e1);
+                }
+            }
+        }
+    }
+
+    /**
+     * build kuduFilterInfo
+     *
+     * @return kudu filters
+     */
+    private List<KuduFilterInfo> buildKuduFilterInfo(Object... keyValS) {
+        List<KuduFilterInfo> kuduFilterInfos = Lists.newArrayList();
+        for (int i = 0; i < keyNames.length; i++) {
+            KuduFilterInfo kuduFilterInfo = KuduFilterInfo.Builder.create(keyNames[i])
+                    .equalTo(keyValS[i]).build();
+            kuduFilterInfos.add(kuduFilterInfo);
+        }
+        return kuduFilterInfos;
+    }
+
+
+    @Override
+    public void open(FunctionContext context) {
+        try {
+            super.open(context);
+            this.kuduReader = new KuduReader<>(this.tableInfo, this.kuduReaderConfig, this.convertor);
+            // build kudu cache
+            this.kuduReader.setTableProjections(ArrayUtils.isNotEmpty(projectedFields) ?
+                    Arrays.asList(projectedFields) : null);
+            this.cache = this.cacheMaxSize == -1 || this.cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
+                    .expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS)
+                    .maximumSize(this.cacheMaxSize)
+                    .build();
+        } catch (Exception ioe) {
+            LOG.error("Exception while creating connection to Kudu.", ioe);
+            throw new RuntimeException("Cannot create connection to Kudu.", ioe);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (null != this.kuduReader) {
+            try {
+                this.kuduReader.close();
+                this.cache.cleanUp();
+                // help gc
+                this.cache = null;
+                this.kuduReader = null;
+            } catch (IOException e) {
+                // ignore exception when close.
+                LOG.warn("exception when close table", e);
+            }
+        }
+    }
+
     public static class Builder {
         private KuduTableInfo tableInfo;
         private KuduReaderConfig kuduReaderConfig;
@@ -76,7 +226,8 @@ public class KuduRowDataLookupFunction extends AbstractKuduLookupFunction<RowDat
         }
 
         public KuduRowDataLookupFunction build() {
-            return new KuduRowDataLookupFunction(keyNames, tableInfo, kuduReaderConfig, projectedFields, kuduLookupOptions);
+            return new KuduRowDataLookupFunction(keyNames, tableInfo, kuduReaderConfig, projectedFields,
+                    kuduLookupOptions);
         }
     }
 }