You are viewing a plain-text version of this content. The link to the canonical version was lost in the plain-text conversion.
Posted to commits@bahir.apache.org by es...@apache.org on 2021/04/07 11:35:15 UTC
[bahir-flink] branch master updated: [BAHIR-260] Add kudu table writer config (#109)
This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new 47d6dc7 [BAHIR-260] Add kudu table writer config (#109)
47d6dc7 is described below
commit 47d6dc7b14bf2e9fd861555d014882220f340e3c
Author: hackergin <ji...@gmail.com>
AuthorDate: Wed Apr 7 06:35:07 2021 -0500
[BAHIR-260] Add kudu table writer config (#109)
---
.../connectors/kudu/connector/KuduTableInfo.java | 18 ++++
.../kudu/connector/writer/KuduWriter.java | 5 +
.../kudu/connector/writer/KuduWriterConfig.java | 113 ++++++++++++++++++++-
.../connectors/kudu/table/KuduTableFactory.java | 49 ++++++++-
.../flink/connectors/kudu/table/KuduTableSink.java | 21 ++++
.../kudu/table/KuduTableFactoryTest.java | 44 +++++++-
6 files changed, 243 insertions(+), 7 deletions(-)
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
index 83c7dde..baae8a0 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
@@ -23,6 +23,7 @@ import org.apache.kudu.Schema;
import org.apache.kudu.client.CreateTableOptions;
import java.io.Serializable;
+import java.util.Objects;
/**
* Describes which table should be used in sources and sinks along with specifications
@@ -103,4 +104,21 @@ public class KuduTableInfo implements Serializable {
}
return createTableOptionsFactory.getCreateTableOptions();
}
+
+ /**
+ * Hash code derived from the table name only, consistent with {@link #equals(Object)}.
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ /**
+ * Two {@code KuduTableInfo} instances are equal when they are of the same class and have
+ * equal table names. No other state of this class (e.g. the create-table options factory)
+ * takes part in the comparison.
+ * NOTE(review): name-only equality is deliberate-looking; confirm callers do not expect
+ * schema/options to be compared.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KuduTableInfo that = (KuduTableInfo) o;
+ return Objects.equals(this.name, that.name);
+ }
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index 03c37ea..59ad196 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -72,6 +72,11 @@ public class KuduWriter<T> implements AutoCloseable {
+ /**
+ * Opens a new Kudu session from the client and applies every session-level setting held by
+ * the writer config: flush mode, operation timeout, mutation buffer space (from
+ * maxBufferSize), flush interval, and whether duplicate-row / not-found-row errors are ignored.
+ *
+ * @return the configured session
+ */
private KuduSession obtainSession() {
KuduSession session = client.newSession();
session.setFlushMode(writerConfig.getFlushMode());
+ session.setTimeoutMillis(writerConfig.getOperationTimeout());
+ session.setMutationBufferSpace(writerConfig.getMaxBufferSize());
+ session.setFlushInterval(writerConfig.getFlushInterval());
+ session.setIgnoreAllDuplicateRows(writerConfig.isIgnoreDuplicate());
+ session.setIgnoreAllNotFoundRows(writerConfig.isIgnoreNotFound());
return session;
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
index 598f8d0..ff93921 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
@@ -19,8 +19,10 @@ package org.apache.flink.connectors.kudu.connector.writer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.kudu.client.AsyncKuduClient;
import java.io.Serializable;
+import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.kudu.client.SessionConfiguration.FlushMode;
@@ -34,13 +36,28 @@ public class KuduWriterConfig implements Serializable {
private final String masters;
private final FlushMode flushMode;
+ // Writer-session settings. Final like masters/flushMode: this Serializable config is
+ // built once by Builder and each field is assigned exactly once, in the constructor.
+ private final long operationTimeout;
+ private final int maxBufferSize;
+ private final int flushInterval;
+ private final boolean ignoreNotFound;
+ private final boolean ignoreDuplicate;
+ /**
+ * Private constructor; instances are created through {@link Builder#build()}.
+ * {@code masters} and {@code flushMode} are null-checked; the remaining session settings
+ * are stored as given, without validation.
+ */
private KuduWriterConfig(
String masters,
- FlushMode flushMode) {
+ FlushMode flushMode,
+ long operationTimeout,
+ int maxBufferSize,
+ int flushInterval,
+ boolean ignoreNotFound,
+ boolean ignoreDuplicate) {
this.masters = checkNotNull(masters, "Kudu masters cannot be null");
this.flushMode = checkNotNull(flushMode, "Kudu flush mode cannot be null");
+ this.operationTimeout = operationTimeout;
+ this.maxBufferSize = maxBufferSize;
+ this.flushInterval = flushInterval;
+ this.ignoreNotFound = ignoreNotFound;
+ this.ignoreDuplicate = ignoreDuplicate;
}
public String getMasters() {
@@ -51,6 +68,26 @@ public class KuduWriterConfig implements Serializable {
return flushMode;
}
+ /** Operation timeout in milliseconds; KuduWriter passes it to the session's setTimeoutMillis. */
+ public long getOperationTimeout() {
+ return operationTimeout;
+ }
+
+ /** Mutation buffer size; KuduWriter passes it to the session's setMutationBufferSpace. */
+ public int getMaxBufferSize() {
+ return maxBufferSize;
+ }
+
+ /** Session flush interval (presumably milliseconds, per the Builder default's comment). */
+ public int getFlushInterval() {
+ return flushInterval;
+ }
+
+ /** Whether the session ignores "row not found" errors. */
+ public boolean isIgnoreNotFound() {
+ return ignoreNotFound;
+ }
+
+ /** Whether the session ignores duplicate-row errors. */
+ public boolean isIgnoreDuplicate() {
+ return ignoreDuplicate;
+ }
+
@Override
public String toString() {
return new ToStringBuilder(this)
@@ -65,6 +102,16 @@ public class KuduWriterConfig implements Serializable {
public static class Builder {
private String masters;
private FlushMode flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
+ // Default: the Kudu client's default operation timeout (AsyncKuduClientBuilder defaultOperationTimeoutMs).
+ private long timeout = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+ // Default mirrors AsyncKuduSession's mutationBufferMaxOps (1000).
+ private int maxBufferSize = 1000;
+ // Default mirrors AsyncKuduSession's flushIntervalMillis (1000).
+ private int flushInterval = 1000;
+ // Default mirrors AsyncKuduSession's ignoreAllNotFoundRows (false).
+ private boolean ignoreNotFound = false;
+ // Default mirrors AsyncKuduSession's ignoreAllDuplicateRows (false).
+ private boolean ignoreDuplicate = false;
private Builder(String masters) {
this.masters = masters;
@@ -87,10 +134,72 @@ public class KuduWriterConfig implements Serializable {
return setConsistency(FlushMode.AUTO_FLUSH_SYNC);
}
+ /** Sets the session mutation buffer size. No validation is performed. @return this builder */
+ public Builder setMaxBufferSize(int maxBufferSize) {
+ this.maxBufferSize = maxBufferSize;
+ return this;
+ }
+
+ /** Sets the session flush interval. No validation is performed. @return this builder */
+ public Builder setFlushInterval(int flushInterval) {
+ this.flushInterval = flushInterval;
+ return this;
+ }
+
+ /** Sets the operation timeout in milliseconds. No validation is performed. @return this builder */
+ public Builder setOperationTimeout(long timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ /** Sets whether "row not found" errors are ignored by the session. @return this builder */
+ public Builder setIgnoreNotFound(boolean ignoreNotFound) {
+ this.ignoreNotFound = ignoreNotFound;
+ return this;
+ }
+
+ /** Sets whether duplicate-row errors are ignored by the session. @return this builder */
+ public Builder setIgnoreDuplicate(boolean ignoreDuplicate) {
+ this.ignoreDuplicate = ignoreDuplicate;
+ return this;
+ }
+
+ /**
+ * Builds an immutable {@link KuduWriterConfig} from the current builder state.
+ * Fails via checkNotNull if masters or flush mode is null.
+ */
public KuduWriterConfig build() {
return new KuduWriterConfig(
masters,
- flushMode);
+ flushMode,
+ timeout,
+ maxBufferSize,
+ flushInterval,
+ ignoreNotFound,
+ ignoreDuplicate);
+ }
+
+ /**
+ * Hash code over every setting this builder passes to {@link KuduWriterConfig};
+ * consistent with {@link #equals(Object)}.
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ masters,
+ flushMode,
+ timeout,
+ maxBufferSize,
+ flushInterval,
+ ignoreNotFound,
+ ignoreDuplicate);
+ }
+
+ /**
+ * Builders are equal when they would build configs with identical settings.
+ * KuduTableSink#equals compares its writer-config builders with this method.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Builder that = (Builder) o;
+ // Primitives and the enum are compared with == to avoid needless boxing.
+ return Objects.equals(masters, that.masters)
+ && flushMode == that.flushMode
+ && timeout == that.timeout
+ && maxBufferSize == that.maxBufferSize
+ && flushInterval == that.flushInterval
+ && ignoreNotFound == that.ignoreNotFound
+ && ignoreDuplicate == that.ignoreDuplicate;
}
}
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index 1961aad..524f521 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -35,11 +35,28 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.*;
-import static org.apache.flink.table.descriptors.Rowtime.*;
-import static org.apache.flink.table.descriptors.Schema.*;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
import static org.apache.flink.util.Preconditions.checkNotNull;
public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> {
@@ -49,6 +66,11 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
public static final String KUDU_HASH_COLS = "kudu.hash-columns";
public static final String KUDU_PRIMARY_KEY_COLS = "kudu.primary-key-columns";
public static final String KUDU_REPLICAS = "kudu.replicas";
+ // Optional sink (KuduWriterConfig) settings; when a key is absent the
+ // KuduWriterConfig.Builder default for that setting is kept.
+ public static final String KUDU_MAX_BUFFER_SIZE = "kudu.max-buffer-size";
+ public static final String KUDU_FLUSH_INTERVAL = "kudu.flush-interval";
+ public static final String KUDU_OPERATION_TIMEOUT = "kudu.operation-timeout";
+ public static final String KUDU_IGNORE_NOT_FOUND = "kudu.ignore-not-found";
+ public static final String KUDU_IGNORE_DUPLICATE = "kudu.ignore-duplicate";
public static final String KUDU = "kudu";
@Override
@@ -65,6 +87,11 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
properties.add(KUDU_MASTERS);
properties.add(KUDU_HASH_COLS);
properties.add(KUDU_PRIMARY_KEY_COLS);
+ properties.add(KUDU_MAX_BUFFER_SIZE);
+ properties.add(KUDU_FLUSH_INTERVAL);
+ properties.add(KUDU_OPERATION_TIMEOUT);
+ properties.add(KUDU_IGNORE_NOT_FOUND);
+ properties.add(KUDU_IGNORE_DUPLICATE);
// schema
properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
@@ -123,10 +150,12 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
+ /**
+ * Creates a Kudu sink for a catalog table. The Kudu table name is taken from
+ * {@code KUDU_TABLE} if set, otherwise from the catalog object name. The same
+ * {@code table.toProperties()} map used for that lookup is passed on, so the writer
+ * options are read from it as well.
+ */
public KuduTableSink createTableSink(ObjectPath tablePath, CatalogTable table) {
validateTable(table);
String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName());
- return createTableSink(tableName, table.getSchema(), table.getProperties());
+ return createTableSink(tableName, table.getSchema(), table.toProperties());
}
private KuduTableSink createTableSink(String tableName, TableSchema schema, Map<String, String> props) {
+ DescriptorProperties properties = new DescriptorProperties();
+ properties.putProperties(props);
String masterAddresses = props.get(KUDU_MASTERS);
TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, schema, props);
@@ -134,6 +163,18 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
KuduWriterConfig.Builder configBuilder = KuduWriterConfig.Builder
.setMasters(masterAddresses);
+ Optional<Long> operationTimeout = properties.getOptionalLong(KUDU_OPERATION_TIMEOUT);
+ Optional<Integer> flushInterval = properties.getOptionalInt(KUDU_FLUSH_INTERVAL);
+ Optional<Integer> bufferSize = properties.getOptionalInt(KUDU_MAX_BUFFER_SIZE);
+ Optional<Boolean> ignoreNotFound = properties.getOptionalBoolean(KUDU_IGNORE_NOT_FOUND);
+ Optional<Boolean> ignoreDuplicate = properties.getOptionalBoolean(KUDU_IGNORE_DUPLICATE);
+
+ operationTimeout.ifPresent(time -> configBuilder.setOperationTimeout(time));
+ flushInterval.ifPresent(interval -> configBuilder.setFlushInterval(interval));
+ bufferSize.ifPresent(size -> configBuilder.setMaxBufferSize(size));
+ ignoreNotFound.ifPresent(i -> configBuilder.setIgnoreNotFound(i));
+ ignoreDuplicate.ifPresent(i -> configBuilder.setIgnoreDuplicate(i));
+
return new KuduTableSink(configBuilder, tableInfo, physicalSchema);
}
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
index 99325c0..5ada84e 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.sinks.UpsertStreamTableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.types.Row;
+import java.util.Objects;
+
public class KuduTableSink implements UpsertStreamTableSink<Row> {
private final KuduWriterConfig.Builder writerConfigBuilder;
@@ -68,4 +70,23 @@ public class KuduTableSink implements UpsertStreamTableSink<Row> {
@Override
public TableSchema getTableSchema() { return flinkSchema; }
+
+ /**
+ * Two sinks are equal when their writer-config builders, Flink schemas and Kudu table infos
+ * are equal. Uses {@link Objects#equals} so the comparison is null-safe, matching the
+ * null-safe {@link Objects#hash} used by {@link #hashCode()}.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || o.getClass() != this.getClass()) {
+ return false;
+ }
+ KuduTableSink that = (KuduTableSink) o;
+ return Objects.equals(this.writerConfigBuilder, that.writerConfigBuilder)
+ && Objects.equals(this.flinkSchema, that.flinkSchema)
+ && Objects.equals(this.tableInfo, that.tableInfo);
+ }
+
+ /** Hash code over the same three fields compared by {@link #equals(Object)}. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(writerConfigBuilder, flinkSchema, tableInfo);
+ }
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
index d4de7f6..a8ec686 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -16,12 +16,21 @@
*/
package org.apache.flink.connectors.kudu.table;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.sinks.TableSink;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
@@ -32,12 +41,17 @@ import org.junit.jupiter.api.Test;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
public class KuduTableFactoryTest extends KuduTestBase {
private StreamTableEnvironment tableEnv;
@@ -153,4 +167,32 @@ public class KuduTableFactoryTest extends KuduTestBase {
assertEquals("f2", rows.get(1).getString("first"));
assertEquals("s2", rows.get(1).getString("second"));
}
+
+ /**
+ * Verifies that the factory maps every optional kudu.* writer property (including
+ * kudu.operation-timeout, parsed as a long) onto the matching KuduWriterConfig.Builder
+ * setting, by comparing the created sink with one built by hand.
+ */
+ @Test
+ public void testTableSink() {
+ final TableSchema schema = TableSchema.builder()
+ .field("first", DataTypes.STRING())
+ .field("second", DataTypes.STRING())
+ .build();
+ final Map<String, String> properties = new HashMap<>();
+ properties.put("connector.type", "kudu");
+ properties.put("kudu.masters", kuduMasters);
+ properties.put("kudu.table", "TestTable12");
+ properties.put("kudu.ignore-not-found", "true");
+ properties.put("kudu.ignore-duplicate", "true");
+ properties.put("kudu.flush-interval", "10000");
+ properties.put("kudu.max-buffer-size", "10000");
+ properties.put("kudu.operation-timeout", "5000");
+
+ KuduWriterConfig.Builder builder = KuduWriterConfig.Builder.setMasters(kuduMasters)
+ .setFlushInterval(10000)
+ .setMaxBufferSize(10000)
+ .setOperationTimeout(5000)
+ .setIgnoreDuplicate(true)
+ .setIgnoreNotFound(true);
+ KuduTableInfo kuduTableInfo = KuduTableInfo.forTable("TestTable12");
+ KuduTableSink expected = new KuduTableSink(builder, kuduTableInfo, schema);
+ final TableSink<?> actualSink = TableFactoryService.find(TableSinkFactory.class, properties)
+ .createTableSink(ObjectPath.fromString("default.TestTable12"), new CatalogTableImpl(schema, properties, null));
+
+ assertEquals(expected, actualSink);
+ }
}