You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2022/07/17 12:29:31 UTC
[bahir-flink] 10/16: fix: fix ExtractionUtils.extractionError,update lookup function
This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
commit a0102dfeec4bc345a78e3454dc9da7e43fa2dbeb
Author: Shimin Huang <40...@users.noreply.github.com>
AuthorDate: Thu Jun 2 14:28:05 2022 +0800
fix: fix ExtractionUtils.extractionError,update lookup function
---
.../lookup/AbstractKuduLookupFunction.java | 207 ---------------------
.../function/lookup/KuduRowDataLookupFunction.java | 161 +++++++++++++++-
2 files changed, 156 insertions(+), 212 deletions(-)
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/AbstractKuduLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/AbstractKuduLookupFunction.java
deleted file mode 100644
index ccb9687..0000000
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/AbstractKuduLookupFunction.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.kudu.table.function.lookup;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.compress.utils.Lists;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
-import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
-import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
-import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
-import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
-import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A {@link TableFunction} to query fields from Kudu by keys.
- * The query template like:
- * <PRE>
- * SELECT c, d, e, f from T where a = ? and b = ?
- * </PRE>
- *
- * <p>Support cache the result to avoid frequent accessing to remote databases.
- * 1.The cacheMaxSize is -1 means not use cache.
- * 2.For real-time data, you need to set the TTL of cache.
- *
- * @param <IN> Type of the input records
- */
-public abstract class AbstractKuduLookupFunction<IN> extends TableFunction<IN> {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractKuduLookupFunction.class);
- private static final long serialVersionUID = 1L;
-
- private final KuduTableInfo tableInfo;
- private final KuduReaderConfig kuduReaderConfig;
- private final String[] keyNames;
- private final String[] projectedFields;
- private final long cacheMaxSize;
- private final long cacheExpireMs;
- private final int maxRetryTimes;
- private final RowResultConvertor<IN> convertor;
-
- private transient Cache<IN, List<IN>> cache;
- private transient KuduReader<IN> kuduReader;
- private transient Integer keyCount = 0;
-
- public AbstractKuduLookupFunction(String[] keyNames, RowResultConvertor<IN> convertor, KuduTableInfo tableInfo,
- KuduReaderConfig kuduReaderConfig, String[] projectedFields,
- KuduLookupOptions kuduLookupOptions) {
- this.tableInfo = tableInfo;
- this.convertor = convertor;
- this.projectedFields = projectedFields;
- this.keyNames = keyNames;
- this.kuduReaderConfig = kuduReaderConfig;
- this.cacheMaxSize = kuduLookupOptions.getCacheMaxSize();
- this.cacheExpireMs = kuduLookupOptions.getCacheExpireMs();
- this.maxRetryTimes = kuduLookupOptions.getMaxRetryTimes();
- }
-
- /**
- * Template method to build cache key
- *
- * @param keys join keys
- * @return cache key
- */
- public abstract IN buildCacheKey(Object... keys);
-
- /**
- * invoke entry point of lookup function.
- *
- * @param keys join keys
- */
- public void eval(Object... keys) {
- if (keys.length != keyNames.length) {
- throw new RuntimeException("The join keys are of unequal lengths");
- }
- // cache key
- IN keyRow = buildCacheKey(keys);
- if (this.cache != null) {
- ConcurrentMap<IN, List<IN>> cacheMap = this.cache.asMap();
- this.keyCount = cacheMap.size();
- List<IN> cacheRows = this.cache.getIfPresent(keyRow);
- if (CollectionUtils.isNotEmpty(cacheRows)) {
- for (IN cacheRow : cacheRows) {
- collect(cacheRow);
- }
- return;
- }
- }
-
- for (int retry = 1; retry <= maxRetryTimes; retry++) {
- try {
- List<KuduFilterInfo> kuduFilterInfos = buildKuduFilterInfo(keys);
- this.kuduReader.setTableFilters(kuduFilterInfos);
- KuduInputSplit[] inputSplits = kuduReader.createInputSplits(1);
- ArrayList<IN> rows = new ArrayList<>();
- for (KuduInputSplit inputSplit : inputSplits) {
- KuduReaderIterator<IN> scanner = kuduReader.scanner(inputSplit.getScanToken());
- // 没有启用cache
- if (cache == null) {
- while (scanner.hasNext()) {
- collect(scanner.next());
- }
- } else {
- while (scanner.hasNext()) {
- IN row = scanner.next();
- rows.add(row);
- collect(row);
- }
- rows.trimToSize();
- }
- }
- if (cache != null) {
- cache.put(keyRow, rows);
- }
- break;
- } catch (Exception e) {
- LOG.error(String.format("Kudu scan error, retry times = %d", retry), e);
- if (retry >= maxRetryTimes) {
- throw new RuntimeException("Execution of Kudu scan failed.", e);
- }
- try {
- Thread.sleep(1000L * retry);
- } catch (InterruptedException e1) {
- throw new RuntimeException(e1);
- }
- }
- }
- }
-
- /**
- * build kuduFilterInfo
- *
- * @return kudu filters
- */
- private List<KuduFilterInfo> buildKuduFilterInfo(Object... keyValS) {
- List<KuduFilterInfo> kuduFilterInfos = Lists.newArrayList();
- for (int i = 0; i < keyNames.length; i++) {
- KuduFilterInfo kuduFilterInfo = KuduFilterInfo.Builder.create(keyNames[i])
- .equalTo(keyValS[i]).build();
- kuduFilterInfos.add(kuduFilterInfo);
- }
- return kuduFilterInfos;
- }
-
-
- @Override
- public void open(FunctionContext context) {
- try {
- super.open(context);
- this.kuduReader = new KuduReader<>(this.tableInfo, this.kuduReaderConfig, this.convertor);
- // build kudu cache
- this.kuduReader.setTableProjections(ArrayUtils.isNotEmpty(projectedFields) ?
- Arrays.asList(projectedFields) : null);
- this.cache = this.cacheMaxSize == -1 || this.cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
- .expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS)
- .maximumSize(this.cacheMaxSize)
- .build();
- } catch (Exception ioe) {
- LOG.error("Exception while creating connection to Kudu.", ioe);
- throw new RuntimeException("Cannot create connection to Kudu.", ioe);
- }
- }
-
- @Override
- public void close() {
- if (null != this.kuduReader) {
- try {
- this.kuduReader.close();
- this.cache.cleanUp();
- // help gc
- this.cache = null;
- this.kuduReader = null;
- } catch (IOException e) {
- // ignore exception when close.
- LOG.warn("exception when close table", e);
- }
- }
- }
-}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
index a1bf7d7..4a4a952 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
@@ -17,28 +17,178 @@
*/
package org.apache.flink.connectors.kudu.table.function.lookup;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowDataConvertor;
+import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReader;
import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* LookupFunction based on the RowData object type
*/
-public class KuduRowDataLookupFunction extends AbstractKuduLookupFunction<RowData> {
+public class KuduRowDataLookupFunction extends TableFunction<RowData> {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(KuduRowDataLookupFunction.class);
+
+ private final KuduTableInfo tableInfo;
+ private final KuduReaderConfig kuduReaderConfig;
+ private final String[] keyNames;
+ private final String[] projectedFields;
+ private final long cacheMaxSize;
+ private final long cacheExpireMs;
+ private final int maxRetryTimes;
+ private final RowResultConvertor<RowData> convertor;
+ private transient Cache<RowData, List<RowData>> cache;
+ private transient KuduReader<RowData> kuduReader;
- private KuduRowDataLookupFunction(String[] keyNames, KuduTableInfo tableInfo, KuduReaderConfig kuduReaderConfig, String[] projectedFields, KuduLookupOptions kuduLookupOptions) {
- super(keyNames, new RowResultRowDataConvertor(), tableInfo, kuduReaderConfig, projectedFields, kuduLookupOptions);
+ /**
+ * Stores the lookup configuration; the constructor is private and is called from
+ * {@code Builder.build()}. Nothing is connected here - the reader and the cache are created
+ * in {@code open}.
+ *
+ * @param keyNames names of the columns used as lookup keys
+ * @param tableInfo table description handed to the Kudu reader
+ * @param kuduReaderConfig configuration handed to the Kudu reader
+ * @param projectedFields fields to project; may be null or empty
+ * @param kuduLookupOptions source of the cache size, cache expiry and retry count
+ */
+ private KuduRowDataLookupFunction(String[] keyNames, KuduTableInfo tableInfo, KuduReaderConfig kuduReaderConfig,
+ String[] projectedFields, KuduLookupOptions kuduLookupOptions) {
+ this.tableInfo = tableInfo;
+ this.convertor = new RowResultRowDataConvertor();
+ this.projectedFields = projectedFields;
+ this.keyNames = keyNames;
+ this.kuduReaderConfig = kuduReaderConfig;
+ this.cacheMaxSize = kuduLookupOptions.getCacheMaxSize();
+ this.cacheExpireMs = kuduLookupOptions.getCacheExpireMs();
+ this.maxRetryTimes = kuduLookupOptions.getMaxRetryTimes();
 }
- @Override
+ /**
+ * Builds the cache key for a lookup by wrapping the join keys in a {@link GenericRowData}.
+ *
+ * @param keys join keys
+ * @return row holding the given keys, used as the cache key
+ */
 public RowData buildCacheKey(Object... keys) {
 return GenericRowData.of(keys);
 }
+ /**
+ * Entry point of the lookup function: emits every row matching the join keys through
+ * {@code collect}.
+ *
+ * <p>When the cache is enabled and holds a non-empty entry for the keys, the cached rows are
+ * emitted and Kudu is not queried. Otherwise Kudu is scanned with one equality filter per key,
+ * making at most {@code maxRetryTimes} attempts and sleeping {@code 1000 ms * attempt} after
+ * each failed attempt except the last.
+ *
+ * <p>NOTE(review): an empty cached list is treated as a miss, so keys without matching rows are
+ * scanned again on every call even though the empty list was put into the cache.
+ * NOTE(review): rows passed to {@code collect} before a scan failure are not withdrawn here, so
+ * a retry looks like it can emit them a second time - confirm.
+ * NOTE(review): if {@code maxRetryTimes} is below 1 the loop never runs and nothing is looked up.
+ *
+ * @param keys join keys, one value per entry of {@code keyNames} and in the same order
+ * @throws RuntimeException if the number of keys differs from {@code keyNames}, if the last
+ * attempt fails, or if the thread is interrupted while sleeping between attempts
+ */
+ public void eval(Object... keys) {
+ if (keys.length != keyNames.length) {
+ throw new RuntimeException("The join keys are of unequal lengths");
+ }
+ // the cache key is a row built from the join keys
+ RowData keyRow = buildCacheKey(keys);
+ if (this.cache != null) {
+ List<RowData> cacheRows = this.cache.getIfPresent(keyRow);
+ if (CollectionUtils.isNotEmpty(cacheRows)) {
+ for (RowData cacheRow : cacheRows) {
+ collect(cacheRow);
+ }
+ return;
+ }
+ }
+
+ for (int retry = 1; retry <= maxRetryTimes; retry++) {
+ try {
+ List<KuduFilterInfo> kuduFilterInfos = buildKuduFilterInfo(keys);
+ this.kuduReader.setTableFilters(kuduFilterInfos);
+ KuduInputSplit[] inputSplits = kuduReader.createInputSplits(1);
+ ArrayList<RowData> rows = new ArrayList<>();
+ for (KuduInputSplit inputSplit : inputSplits) {
+ KuduReaderIterator<RowData> scanner = kuduReader.scanner(inputSplit.getScanToken());
+ // cache is not enabled: emit rows without keeping them
+ if (cache == null) {
+ while (scanner.hasNext()) {
+ collect(scanner.next());
+ }
+ } else {
+ while (scanner.hasNext()) {
+ RowData row = scanner.next();
+ rows.add(row);
+ collect(row);
+ }
+ rows.trimToSize();
+ }
+ }
+ if (cache != null) {
+ cache.put(keyRow, rows);
+ }
+ break;
+ } catch (Exception e) {
+ LOG.error(String.format("Kudu scan error, retry times = %d", retry), e);
+ if (retry >= maxRetryTimes) {
+ throw new RuntimeException("Execution of Kudu scan failed.", e);
+ }
+ try {
+ Thread.sleep(1000L * retry);
+ } catch (InterruptedException e1) {
+ // NOTE(review): the interrupt flag is not restored before rethrowing
+ throw new RuntimeException(e1);
+ }
+ }
+ }
+ }
+
+ /**
+ * Builds one equality filter per lookup key column.
+ *
+ * @param keyValS key values; {@code keyValS[i]} is matched against column {@code keyNames[i]}
+ * @return kudu filters, in the order of {@code keyNames}
+ */
+ private List<KuduFilterInfo> buildKuduFilterInfo(Object... keyValS) {
+ List<KuduFilterInfo> kuduFilterInfos = Lists.newArrayList();
+ for (int i = 0; i < keyNames.length; i++) {
+ KuduFilterInfo kuduFilterInfo = KuduFilterInfo.Builder.create(keyNames[i])
+ .equalTo(keyValS[i]).build();
+ kuduFilterInfos.add(kuduFilterInfo);
+ }
+ return kuduFilterInfos;
+ }
+
+
+ /**
+ * Creates the Kudu reader, applies the field projection and builds the lookup cache.
+ *
+ * <p>The cache is left null (disabled) when either {@code cacheMaxSize} or
+ * {@code cacheExpireMs} is -1; otherwise entries expire {@code cacheExpireMs} milliseconds
+ * after being written and the cache holds at most {@code cacheMaxSize} entries.
+ *
+ * @param context function context passed on to {@code super.open}
+ * @throws RuntimeException wrapping any exception raised during setup
+ */
+ @Override
+ public void open(FunctionContext context) {
+ try {
+ super.open(context);
+ this.kuduReader = new KuduReader<>(this.tableInfo, this.kuduReaderConfig, this.convertor);
+ // set the reader projection: the projected fields, or null when none are given
+ this.kuduReader.setTableProjections(ArrayUtils.isNotEmpty(projectedFields) ?
+ Arrays.asList(projectedFields) : null);
+ // build the lookup cache, or leave it null when caching is disabled
+ this.cache = this.cacheMaxSize == -1 || this.cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
+ .expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS)
+ .maximumSize(this.cacheMaxSize)
+ .build();
+ } catch (Exception ioe) {
+ LOG.error("Exception while creating connection to Kudu.", ioe);
+ throw new RuntimeException("Cannot create connection to Kudu.", ioe);
+ }
+ }
+
+ /**
+ * Closes the Kudu reader and drops the cache; does nothing if the reader was never created.
+ *
+ * <p>An {@link IOException} raised while closing is logged and not rethrown.
+ *
+ * <p>NOTE(review): {@code open} leaves {@code cache} null when caching is disabled, yet
+ * {@code cleanUp()} is called on it below without a null check - this looks like a
+ * NullPointerException on close for cache-less lookups, and it would not be caught by the
+ * IOException handler; confirm and guard.
+ */
+ @Override
+ public void close() {
+ if (null != this.kuduReader) {
+ try {
+ this.kuduReader.close();
+ this.cache.cleanUp();
+ // clear the references so the reader and cache can be garbage collected
+ this.cache = null;
+ this.kuduReader = null;
+ } catch (IOException e) {
+ // best-effort close: log and continue; the fields above stay set in this case
+ LOG.warn("exception when close table", e);
+ }
+ }
+ }
+
public static class Builder {
private KuduTableInfo tableInfo;
private KuduReaderConfig kuduReaderConfig;
@@ -76,7 +226,8 @@ public class KuduRowDataLookupFunction extends AbstractKuduLookupFunction<RowDat
}
+ /** Creates the lookup function from the keyNames, tableInfo, kuduReaderConfig, projectedFields and kuduLookupOptions of this builder. */
 public KuduRowDataLookupFunction build() {
- return new KuduRowDataLookupFunction(keyNames, tableInfo, kuduReaderConfig, projectedFields, kuduLookupOptions);
+ return new KuduRowDataLookupFunction(keyNames, tableInfo, kuduReaderConfig, projectedFields,
+ kuduLookupOptions);
 }
}
}