You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2019/05/17 22:07:22 UTC

[bahir-flink] branch master updated: [BAHIR-202] Improve KuduSink throughput using async FlushMode

This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dc8e7e  [BAHIR-202] Improve KuduSink throughput using async FlushMode
8dc8e7e is described below

commit 8dc8e7e9d98e102a3e71465aa2c2e81d52732193
Author: SuXingLee <91...@qq.com>
AuthorDate: Fri Mar 22 20:08:33 2019 +0800

    [BAHIR-202] Improve KuduSink throughput using async FlushMode
    
    By default, when checkpointing is disabled, KuduSink writes
    messages one by one (FlushMode.AUTO_FLUSH_SYNC). When
    checkpointing is enabled, throughput is improved by using
    FlushMode.AUTO_FLUSH_BACKGROUND, and pending writes are
    flushed on each checkpoint to ensure at-least-once delivery.
    
    Closes #50
---
 .../connectors/kudu/KuduOutputFormat.java          |  5 +-
 .../flink/streaming/connectors/kudu/KuduSink.java  | 59 ++++++++++++++++++++--
 .../connectors/kudu/connector/KuduConnector.java   | 18 +++++--
 3 files changed, 73 insertions(+), 9 deletions(-)

diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
index 9d12710..c1301da 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
 import org.apache.flink.util.Preconditions;
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +31,8 @@ import java.io.IOException;
 
 public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> {
 
+    private static final long serialVersionUID = 1L;
+
     private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
 
     private String kuduMasters;
@@ -87,7 +90,7 @@ public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> {
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
         if (connector != null) return;
-        connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
+        connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode, FlushMode.AUTO_FLUSH_SYNC);
         serializer = serializer.withSchema(tableInfo.getSchema());
     }
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
index 53cf249..b6dd9c8 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
@@ -17,18 +17,25 @@
 package org.apache.flink.streaming.connectors.kudu;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
 import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
 import org.apache.flink.util.Preconditions;
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class KuduSink<OUT> extends RichSinkFunction<OUT> {
+public class KuduSink<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
+
+    private static final long serialVersionUID = 1L;
 
     private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
 
@@ -36,6 +43,7 @@ public class KuduSink<OUT> extends RichSinkFunction<OUT> {
     private KuduTableInfo tableInfo;
     private KuduConnector.Consistency consistency;
     private KuduConnector.WriteMode writeMode;
+    private FlushMode flushMode;
 
     private KuduSerialization<OUT> serializer;
 
@@ -77,11 +85,42 @@ public class KuduSink<OUT> extends RichSinkFunction<OUT> {
         return this;
     }
 
+    public KuduSink<OUT> withSyncFlushMode() {
+        this.flushMode = FlushMode.AUTO_FLUSH_SYNC;
+        return this;
+    }
+
+    public KuduSink<OUT> withAsyncFlushMode() {
+        this.flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
+        return this;
+    }
+
     @Override
     public void open(Configuration parameters) throws IOException {
-        if (connector != null) return;
-        connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
-        serializer.withSchema(tableInfo.getSchema());
+        if (this.connector != null) return;
+        this.connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode, getflushMode());
+        this.serializer = this.serializer.withSchema(tableInfo.getSchema()); // keep the returned instance, as KuduOutputFormat#open does
+    }
+
+    /**
+     * Resolves the Kudu flush mode used by this sink.
+     * <p>If Flink checkpointing is disabled, data is always written synchronously
+     * ({@code AUTO_FLUSH_SYNC}), regardless of the mode configured on the sink.
+     * <p>If Flink checkpointing is enabled, the explicitly configured mode is used,
+     * defaulting to asynchronous writes ({@code AUTO_FLUSH_BACKGROUND}).
+     *
+     * <p>(Note: async writes may reach Kudu out of order. To force synchronous writes,
+     * call {@link KuduSink#withSyncFlushMode()} when initializing the KuduSink.)
+     *
+     * @return the flush mode to configure on the Kudu session
+     */
+    private FlushMode getflushMode() {
+        boolean enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
+        if (!enableCheckpoint) {
+            // Background flushing is only used when checkpoints can flush pending writes.
+            return FlushMode.AUTO_FLUSH_SYNC;
+        }
+        return this.flushMode != null ? this.flushMode : FlushMode.AUTO_FLUSH_BACKGROUND;
     }
 
     @Override
@@ -103,4 +142,16 @@ public class KuduSink<OUT> extends RichSinkFunction<OUT> {
             throw new IOException(e.getLocalizedMessage(), e);
         }
     }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Snapshotting state {} ...", context.getCheckpointId());
+        }
+        this.connector.flush();
+    }
 }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
index a3851c4..d45886c 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
@@ -18,9 +18,11 @@ package org.apache.flink.streaming.connectors.kudu.connector;
 
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.api.common.time.Time;
 import org.apache.kudu.client.*;
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +42,7 @@ public class KuduConnector implements AutoCloseable {
 
     private AsyncKuduClient client;
     private KuduTable table;
+    private AsyncKuduSession session;
 
     private Consistency consistency;
     private WriteMode writeMode;
@@ -48,15 +51,17 @@ public class KuduConnector implements AutoCloseable {
     private static AtomicBoolean errorTransactions = new AtomicBoolean(false);
 
     public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException {
-        this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT);
+        this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT, FlushMode.AUTO_FLUSH_SYNC);
     }
 
-    public KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode) throws IOException {
+    public KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode, FlushMode flushMode) throws IOException {
         this.client = client(kuduMasters);
         this.table = table(tableInfo);
+        this.session = client.newSession();
         this.consistency = consistency;
         this.writeMode = writeMode;
         this.defaultCB = new ResponseCallback();
+        this.session.setFlushMode(flushMode);
     }
 
     private AsyncKuduClient client(String kuduMasters) {
@@ -109,7 +114,6 @@ public class KuduConnector implements AutoCloseable {
     public boolean writeRow(KuduRow row) throws Exception {
         final Operation operation = KuduMapper.toOperation(table, writeMode, row);
 
-        AsyncKuduSession session = client.newSession();
         Deferred<OperationResponse> response = session.apply(operation);
 
         if (KuduConnector.Consistency.EVENTUAL.equals(consistency)) {
@@ -119,7 +123,6 @@ public class KuduConnector implements AutoCloseable {
             processResponse(response.join());
         }
 
-        session.close();
         return !errorTransactions.get();
 
     }
@@ -131,10 +134,17 @@ public class KuduConnector implements AutoCloseable {
             Thread.sleep(Time.seconds(pendingTransactions.get()).toMilliseconds());
         }
 
+        if (session == null) return;
+        session.close();
+
         if (client == null) return;
         client.close();
     }
 
+    public void flush() throws Exception {
+        this.session.flush().join(); // wait until buffered operations are sent, so a checkpoint covers them
+    }
+
     private class ResponseCallback implements Callback<Boolean, OperationResponse> {
         @Override
         public Boolean call(OperationResponse operationResponse) {