You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) was not preserved in this plain-text copy.
Posted to commits@bahir.apache.org by es...@apache.org on 2021/04/07 11:35:15 UTC

[bahir-flink] branch master updated: [BAHIR-260] Add kudu table writer config (#109)

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 47d6dc7  [BAHIR-260] Add kudu table writer config  (#109)
47d6dc7 is described below

commit 47d6dc7b14bf2e9fd861555d014882220f340e3c
Author: hackergin <ji...@gmail.com>
AuthorDate: Wed Apr 7 06:35:07 2021 -0500

    [BAHIR-260] Add kudu table writer config  (#109)
---
 .../connectors/kudu/connector/KuduTableInfo.java   |  18 ++++
 .../kudu/connector/writer/KuduWriter.java          |   5 +
 .../kudu/connector/writer/KuduWriterConfig.java    | 113 ++++++++++++++++++++-
 .../connectors/kudu/table/KuduTableFactory.java    |  49 ++++++++-
 .../flink/connectors/kudu/table/KuduTableSink.java |  21 ++++
 .../kudu/table/KuduTableFactoryTest.java           |  44 +++++++-
 6 files changed, 243 insertions(+), 7 deletions(-)

diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
index 83c7dde..baae8a0 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
@@ -23,6 +23,7 @@ import org.apache.kudu.Schema;
 import org.apache.kudu.client.CreateTableOptions;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * Describes which table should be used in sources and sinks along with specifications
@@ -103,4 +104,21 @@ public class KuduTableInfo implements Serializable {
         }
         return createTableOptionsFactory.getCreateTableOptions();
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KuduTableInfo that = (KuduTableInfo) o;
+        return Objects.equals(this.name, that.name);
+    }
 }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index 03c37ea..59ad196 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -72,6 +72,11 @@ public class KuduWriter<T> implements AutoCloseable {
     private KuduSession obtainSession() {
         KuduSession session = client.newSession();
         session.setFlushMode(writerConfig.getFlushMode());
+        session.setTimeoutMillis(writerConfig.getOperationTimeout());
+        session.setMutationBufferSpace(writerConfig.getMaxBufferSize());
+        session.setFlushInterval(writerConfig.getFlushInterval());
+        session.setIgnoreAllDuplicateRows(writerConfig.isIgnoreDuplicate());
+        session.setIgnoreAllNotFoundRows(writerConfig.isIgnoreNotFound());
         return session;
     }
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
index 598f8d0..ff93921 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
@@ -19,8 +19,10 @@ package org.apache.flink.connectors.kudu.connector.writer;
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.kudu.client.AsyncKuduClient;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.kudu.client.SessionConfiguration.FlushMode;
@@ -34,13 +36,28 @@ public class KuduWriterConfig implements Serializable {
 
     private final String masters;
     private final FlushMode flushMode;
+    private final long operationTimeout;
+    private int maxBufferSize;
+    private int flushInterval;
+    private boolean ignoreNotFound;
+    private boolean ignoreDuplicate;
 
     private KuduWriterConfig(
             String masters,
-            FlushMode flushMode) {
+            FlushMode flushMode,
+            long operationTimeout,
+            int maxBufferSize,
+            int flushInterval,
+            boolean ignoreNotFound,
+            boolean ignoreDuplicate) {
 
         this.masters = checkNotNull(masters, "Kudu masters cannot be null");
         this.flushMode = checkNotNull(flushMode, "Kudu flush mode cannot be null");
+        this.operationTimeout = operationTimeout;
+        this.maxBufferSize = maxBufferSize;
+        this.flushInterval = flushInterval;
+        this.ignoreNotFound = ignoreNotFound;
+        this.ignoreDuplicate = ignoreDuplicate;
     }
 
     public String getMasters() {
@@ -51,6 +68,26 @@ public class KuduWriterConfig implements Serializable {
         return flushMode;
     }
 
+    public long getOperationTimeout() {
+        return operationTimeout;
+    }
+
+    public int getMaxBufferSize() {
+        return maxBufferSize;
+    }
+
+    public int getFlushInterval() {
+        return flushInterval;
+    }
+
+    public boolean isIgnoreNotFound() {
+        return ignoreNotFound;
+    }
+
+    public boolean isIgnoreDuplicate() {
+        return ignoreDuplicate;
+    }
+
     @Override
     public String toString() {
         return new ToStringBuilder(this)
@@ -65,6 +102,16 @@ public class KuduWriterConfig implements Serializable {
     public static class Builder {
         private String masters;
         private FlushMode flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
+        // Reference from AsyncKuduClientBuilder defaultOperationTimeoutMs.
+        private long timeout = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+        // Reference from AsyncKuduSession mutationBufferMaxOps 1000.
+        private int maxBufferSize = 1000;
+        // Reference from AsyncKuduSession flushIntervalMillis 1000.
+        private int flushInterval = 1000;
+        // Reference from AsyncKuduSession ignoreAllNotFoundRows false.
+        private boolean ignoreNotFound = false;
+        // Reference from AsyncKuduSession ignoreAllDuplicateRows false.
+        private boolean ignoreDuplicate = false;
 
         private Builder(String masters) {
             this.masters = masters;
@@ -87,10 +134,72 @@ public class KuduWriterConfig implements Serializable {
             return setConsistency(FlushMode.AUTO_FLUSH_SYNC);
         }
 
+        public Builder setMaxBufferSize(int maxBufferSize) {
+            this.maxBufferSize = maxBufferSize;
+            return this;
+        }
+
+        public Builder setFlushInterval(int flushInterval) {
+            this.flushInterval = flushInterval;
+            return this;
+        }
+
+        public Builder setOperationTimeout(long timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        public Builder setIgnoreNotFound(boolean ignoreNotFound) {
+            this.ignoreNotFound = ignoreNotFound;
+            return this;
+        }
+
+        public Builder setIgnoreDuplicate(boolean ignoreDuplicate) {
+            this.ignoreDuplicate = ignoreDuplicate;
+            return this;
+        }
+
         public KuduWriterConfig build() {
             return new KuduWriterConfig(
                     masters,
-                    flushMode);
+                    flushMode,
+                    timeout,
+                    maxBufferSize,
+                    flushInterval,
+                    ignoreNotFound,
+                    ignoreDuplicate);
+        }
+
+        @Override
+        public int hashCode() {
+            int result =
+                    Objects.hash(
+                            masters,
+                            flushMode,
+                            timeout,
+                            maxBufferSize,
+                            flushInterval,
+                            ignoreNotFound,
+                            ignoreDuplicate);
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Builder that = (Builder) o;
+            return Objects.equals(masters, that.masters)
+                    && Objects.equals(flushMode, that.flushMode)
+                    && Objects.equals(timeout, that.timeout)
+                    && Objects.equals(maxBufferSize, that.maxBufferSize)
+                    && Objects.equals(flushInterval, that.flushInterval)
+                    && Objects.equals(ignoreNotFound, that.ignoreNotFound)
+                    && Objects.equals(ignoreDuplicate, that.ignoreDuplicate);
         }
     }
 }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index 1961aad..524f521 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -35,11 +35,28 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.*;
-import static org.apache.flink.table.descriptors.Rowtime.*;
-import static org.apache.flink.table.descriptors.Schema.*;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> {
@@ -49,6 +66,11 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
     public static final String KUDU_HASH_COLS = "kudu.hash-columns";
     public static final String KUDU_PRIMARY_KEY_COLS = "kudu.primary-key-columns";
     public static final String KUDU_REPLICAS = "kudu.replicas";
+    public static final String KUDU_MAX_BUFFER_SIZE = "kudu.max-buffer-size";
+    public static final String KUDU_FLUSH_INTERVAL = "kudu.flush-interval";
+    public static final String KUDU_OPERATION_TIMEOUT = "kudu.operation-timeout";
+    public static final String KUDU_IGNORE_NOT_FOUND = "kudu.ignore-not-found";
+    public static final String KUDU_IGNORE_DUPLICATE = "kudu.ignore-duplicate";
     public static final String KUDU = "kudu";
 
     @Override
@@ -65,6 +87,11 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
         properties.add(KUDU_MASTERS);
         properties.add(KUDU_HASH_COLS);
         properties.add(KUDU_PRIMARY_KEY_COLS);
+        properties.add(KUDU_MAX_BUFFER_SIZE);
+        properties.add(KUDU_FLUSH_INTERVAL);
+        properties.add(KUDU_OPERATION_TIMEOUT);
+        properties.add(KUDU_IGNORE_NOT_FOUND);
+        properties.add(KUDU_IGNORE_DUPLICATE);
         // schema
         properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
         properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
@@ -123,10 +150,12 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
     public KuduTableSink createTableSink(ObjectPath tablePath, CatalogTable table) {
         validateTable(table);
         String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName());
-        return createTableSink(tableName, table.getSchema(), table.getProperties());
+        return createTableSink(tableName, table.getSchema(), table.toProperties());
     }
 
     private KuduTableSink createTableSink(String tableName, TableSchema schema, Map<String, String> props) {
+        DescriptorProperties properties = new DescriptorProperties();
+        properties.putProperties(props);
         String masterAddresses = props.get(KUDU_MASTERS);
         TableSchema physicalSchema = KuduTableUtils.getSchemaWithSqlTimestamp(schema);
         KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, schema, props);
@@ -134,6 +163,18 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
         KuduWriterConfig.Builder configBuilder = KuduWriterConfig.Builder
                 .setMasters(masterAddresses);
 
+        Optional<Long> operationTimeout = properties.getOptionalLong(KUDU_OPERATION_TIMEOUT);
+        Optional<Integer> flushInterval = properties.getOptionalInt(KUDU_FLUSH_INTERVAL);
+        Optional<Integer> bufferSize = properties.getOptionalInt(KUDU_MAX_BUFFER_SIZE);
+        Optional<Boolean> ignoreNotFound = properties.getOptionalBoolean(KUDU_IGNORE_NOT_FOUND);
+        Optional<Boolean> ignoreDuplicate = properties.getOptionalBoolean(KUDU_IGNORE_DUPLICATE);
+
+        operationTimeout.ifPresent(time -> configBuilder.setOperationTimeout(time));
+        flushInterval.ifPresent(interval -> configBuilder.setFlushInterval(interval));
+        bufferSize.ifPresent(size -> configBuilder.setMaxBufferSize(size));
+        ignoreNotFound.ifPresent(i -> configBuilder.setIgnoreNotFound(i));
+        ignoreDuplicate.ifPresent(i -> configBuilder.setIgnoreDuplicate(i));
+
         return new KuduTableSink(configBuilder, tableInfo, physicalSchema);
     }
 }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
index 99325c0..5ada84e 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.sinks.UpsertStreamTableSink;
 import org.apache.flink.table.utils.TableConnectorUtils;
 import org.apache.flink.types.Row;
 
+import java.util.Objects;
+
 public class KuduTableSink implements UpsertStreamTableSink<Row> {
 
     private final KuduWriterConfig.Builder writerConfigBuilder;
@@ -68,4 +70,23 @@ public class KuduTableSink implements UpsertStreamTableSink<Row> {
 
     @Override
     public TableSchema getTableSchema() { return flinkSchema; }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != this.getClass()) {
+            return false;
+        }
+        KuduTableSink that = (KuduTableSink) o;
+        return this.writerConfigBuilder.equals(that.writerConfigBuilder) &&
+                this.flinkSchema.equals(that.flinkSchema) &&
+                this.tableInfo.equals(that.tableInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(writerConfigBuilder, flinkSchema, tableInfo);
+    }
 }
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
index d4de7f6..a8ec686 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -16,12 +16,21 @@
  */
 package org.apache.flink.connectors.kudu.table;
 
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.KuduTable;
@@ -32,12 +41,17 @@ import org.junit.jupiter.api.Test;
 
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class KuduTableFactoryTest extends KuduTestBase {
     private StreamTableEnvironment tableEnv;
@@ -153,4 +167,32 @@ public class KuduTableFactoryTest extends KuduTestBase {
         assertEquals("f2", rows.get(1).getString("first"));
         assertEquals("s2", rows.get(1).getString("second"));
     }
+
+    @Test
+    public void testTableSink() {
+        final TableSchema schema = TableSchema.builder()
+                .field("first", DataTypes.STRING())
+                .field("second", DataTypes.STRING())
+                .build();
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("connector.type", "kudu");
+        properties.put("kudu.masters", kuduMasters);
+        properties.put("kudu.table", "TestTable12");
+        properties.put("kudu.ignore-not-found", "true");
+        properties.put("kudu.ignore-duplicate", "true");
+        properties.put("kudu.flush-interval", "10000");
+        properties.put("kudu.max-buffer-size", "10000");
+
+        KuduWriterConfig.Builder builder = KuduWriterConfig.Builder.setMasters(kuduMasters)
+                .setFlushInterval(10000)
+                .setMaxBufferSize(10000)
+                .setIgnoreDuplicate(true)
+                .setIgnoreNotFound(true);
+        KuduTableInfo kuduTableInfo = KuduTableInfo.forTable("TestTable12");
+        KuduTableSink expected = new KuduTableSink(builder, kuduTableInfo, schema);
+        final TableSink<?> actualSink = TableFactoryService.find(TableSinkFactory.class, properties)
+                .createTableSink(ObjectPath.fromString("default.TestTable12"), new CatalogTableImpl(schema, properties, null));
+
+        assertEquals(expected, actualSink);
+    }
 }