You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2019/05/17 22:07:22 UTC
[bahir-flink] branch master updated: [BAHIR-202] Improve KuduSink
throughput using async FlushMode
This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8dc8e7e [BAHIR-202] Improve KuduSink throughput using async FlushMode
8dc8e7e is described below
commit 8dc8e7e9d98e102a3e71465aa2c2e81d52732193
Author: SuXingLee <91...@qq.com>
AuthorDate: Fri Mar 22 20:08:33 2019 +0800
[BAHIR-202] Improve KuduSink throughput using async FlushMode
By default, KuduSink processes messages one by one
when checkpointing is disabled. When checkpointing is enabled, throughput
is improved by using FlushMode.AUTO_FLUSH_BACKGROUND,
and checkpoints are used to ensure at-least-once delivery.
Closes #50
---
.../connectors/kudu/KuduOutputFormat.java | 5 +-
.../flink/streaming/connectors/kudu/KuduSink.java | 59 ++++++++++++++++++++--
.../connectors/kudu/connector/KuduConnector.java | 18 +++++--
3 files changed, 73 insertions(+), 9 deletions(-)
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
index 9d12710..c1301da 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
import org.apache.flink.util.Preconditions;
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +31,8 @@ import java.io.IOException;
public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> {
+ private static final long serialVersionUID = 1L;
+
private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
private String kuduMasters;
@@ -87,7 +90,7 @@ public class KuduOutputFormat<OUT> extends RichOutputFormat<OUT> {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
if (connector != null) return;
- connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
+ connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode,FlushMode.AUTO_FLUSH_SYNC); // this OutputFormat does not implement CheckpointedFunction, so it always flushes synchronously
serializer = serializer.withSchema(tableInfo.getSchema());
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
index 53cf249..b6dd9c8 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
@@ -17,18 +17,25 @@
package org.apache.flink.streaming.connectors.kudu;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector;
import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.streaming.connectors.kudu.serde.KuduSerialization;
import org.apache.flink.util.Preconditions;
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-public class KuduSink<OUT> extends RichSinkFunction<OUT> {
+public class KuduSink<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class);
@@ -36,6 +43,7 @@ public class KuduSink<OUT> extends RichSinkFunction<OUT> {
private KuduTableInfo tableInfo;
private KuduConnector.Consistency consistency;
private KuduConnector.WriteMode writeMode;
+ private FlushMode flushMode;
private KuduSerialization<OUT> serializer;
@@ -77,11 +85,42 @@ public class KuduSink<OUT> extends RichSinkFunction<OUT> {
return this;
}
+ public KuduSink<OUT> withSyncFlushMode() { // builder-style: request AUTO_FLUSH_SYNC (only consulted when checkpointing is enabled)
+ this.flushMode = FlushMode.AUTO_FLUSH_SYNC;
+ return this;
+ }
+
+ public KuduSink<OUT> withAsyncFlushMode() { // builder-style: request AUTO_FLUSH_BACKGROUND (only consulted when checkpointing is enabled)
+ this.flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
+ return this;
+ }
+
@Override
public void open(Configuration parameters) throws IOException {
- if (connector != null) return;
- connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
- serializer.withSchema(tableInfo.getSchema());
+ if (this.connector != null) return;
+ this.connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode, getflushMode());
+ this.serializer = this.serializer.withSchema(tableInfo.getSchema()); // keep the schema-bound serializer (result was previously discarded)
+ }
+
+ /**
+ * Resolves the flush mode: if Flink checkpointing is disabled, data is always written synchronously (AUTO_FLUSH_SYNC).
+ * <p>If checkpointing is enabled, the mode chosen via withSyncFlushMode()/withAsyncFlushMode() is used, defaulting to AUTO_FLUSH_BACKGROUND.
+ *
+ * <p>(Note: async mode may result in out-of-order writes to Kudu;
+ * call {@link KuduSink#withSyncFlushMode()} when building the KuduSink to keep synchronous writes.)
+ *
+ * @return the flush mode to configure on the Kudu session
+ */
+ private FlushMode getflushMode() {
+ FlushMode flushMode = FlushMode.AUTO_FLUSH_SYNC;
+ boolean enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled(); // cast assumes the sink runs with a streaming runtime context
+ if(enableCheckpoint && this.flushMode == null) {
+ flushMode = FlushMode.AUTO_FLUSH_BACKGROUND;
+ }
+ if(enableCheckpoint && this.flushMode != null) {
+ flushMode = this.flushMode;
+ }
+ return flushMode;
}
@Override
@@ -103,4 +142,16 @@ public class KuduSink<OUT> extends RichSinkFunction<OUT> {
throw new IOException(e.getLocalizedMessage(), e);
}
}
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ } // intentionally empty: no operator state is kept or restored; buffered rows are flushed in snapshotState
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ final long checkpointId = context.getCheckpointId();
+ LOG.debug("Snapshotting state {} ...", checkpointId);
+ // push out rows buffered by the Kudu session whenever a checkpoint is taken
+ connector.flush();
+ }
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
index a3851c4..d45886c 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java
@@ -18,9 +18,11 @@ package org.apache.flink.streaming.connectors.kudu.connector;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.time.Time;
import org.apache.kudu.client.*;
+import org.apache.kudu.client.SessionConfiguration.FlushMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +42,7 @@ public class KuduConnector implements AutoCloseable {
private AsyncKuduClient client;
private KuduTable table;
+ private AsyncKuduSession session;
private Consistency consistency;
private WriteMode writeMode;
@@ -48,15 +51,17 @@ public class KuduConnector implements AutoCloseable {
private static AtomicBoolean errorTransactions = new AtomicBoolean(false);
public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException {
- this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT);
+ this(kuduMasters, tableInfo, KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT,FlushMode.AUTO_FLUSH_SYNC); // defaults: STRONG consistency, UPSERT writes, synchronous flushing
}
- public KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode) throws IOException {
+ public KuduConnector(String kuduMasters, KuduTableInfo tableInfo, Consistency consistency, WriteMode writeMode,FlushMode flushMode) throws IOException {
this.client = client(kuduMasters);
this.table = table(tableInfo);
+ this.session = client.newSession(); // one session shared by all writeRow calls (replaces the per-row session)
this.consistency = consistency;
this.writeMode = writeMode;
this.defaultCB = new ResponseCallback();
+ this.session.setFlushMode(flushMode); // the chosen flush mode applies to every write made through this session
}
private AsyncKuduClient client(String kuduMasters) {
@@ -109,7 +114,6 @@ public class KuduConnector implements AutoCloseable {
public boolean writeRow(KuduRow row) throws Exception {
final Operation operation = KuduMapper.toOperation(table, writeMode, row);
- AsyncKuduSession session = client.newSession();
Deferred<OperationResponse> response = session.apply(operation);
if (KuduConnector.Consistency.EVENTUAL.equals(consistency)) {
@@ -119,7 +123,6 @@ public class KuduConnector implements AutoCloseable {
processResponse(response.join());
}
- session.close();
return !errorTransactions.get();
}
@@ -131,10 +134,17 @@ public class KuduConnector implements AutoCloseable {
Thread.sleep(Time.seconds(pendingTransactions.get()).toMilliseconds());
}
+ if (session == null) return;
+ session.close();
+
if (client == null) return;
client.close();
}
+ public void flush() throws Exception { // NOTE(review): per-row errors in the returned responses are not inspected here
+ this.session.flush().join(); // block until buffered operations reach Kudu, so a checkpoint cannot complete ahead of them
+ }
+
private class ResponseCallback implements Callback<Boolean, OperationResponse> {
@Override
public Boolean call(OperationResponse operationResponse) {