You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/11 09:04:13 UTC

[flink] 03/04: [FLINK-16496][hbase][table] Improve default flush strategy for new HBase sink for better out-of-box experience

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37ba881049b795ef3c3af668477885b6f03b4243
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon Jun 8 20:43:43 2020 +0800

    [FLINK-16496][hbase][table] Improve default flush strategy for new HBase sink for better out-of-box experience
    
    The default flush strategy of the old HBase sink uses no flush interval and a 2MB buffer size.
    The new default flush strategy of the new HBase sink uses a '1s' flush interval, '1000' buffered rows, and a '2mb' buffer size.
    
    This closes #12536
---
 docs/dev/table/connectors/hbase.md                 | 13 +++---
 docs/dev/table/connectors/hbase.zh.md              | 13 +++---
 .../connector/hbase/HBaseDynamicTableFactory.java  | 39 +++++++++--------
 .../connector/hbase/options/HBaseWriteOptions.java | 16 +------
 .../connector/hbase/sink/HBaseSinkFunction.java    |  6 ++-
 .../hbase/HBaseDynamicTableFactoryTest.java        | 49 +++++++++++++++++++---
 6 files changed, 85 insertions(+), 51 deletions(-)

diff --git a/docs/dev/table/connectors/hbase.md b/docs/dev/table/connectors/hbase.md
index 503e11e..60f6eb8 100644
--- a/docs/dev/table/connectors/hbase.md
+++ b/docs/dev/table/connectors/hbase.md
@@ -110,7 +110,7 @@ Connector Options
       <td>optional</td>
       <td style="word-wrap: break-word;">/hbase</td>
       <td>String</td>
-      <td>The root dir in Zookeeper for HBase cluster</td>
+      <td>The root dir in Zookeeper for HBase cluster.</td>
     </tr>
     <tr>
       <td><h5>null-string-literal</h5></td>
@@ -126,6 +126,7 @@ Connector Options
       <td>MemorySize</td>
       <td>Writing option, maximum size in memory of buffered rows for each writing request.
       This can improve performance for writing data to HBase database, but may increase the latency.
+      Can be set to '0' to disable it.
       </td>
     </tr>
     <tr>
@@ -135,16 +136,18 @@ Connector Options
       <td>Integer</td>
       <td>Writing option, maximum number of rows to buffer for each writing request.
       This can improve performance for writing data to HBase database, but may increase the latency.
-      No default value, which means the default flushing is not depends on the number of buffered rows
+      Can be set to '0' to disable it.
       </td>
     </tr>
     <tr>
       <td><h5>sink.buffer-flush.interval</h5></td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td style="word-wrap: break-word;">1s</td>
       <td>Duration</td>
-      <td>Writing option, the interval to flush buffered rows.
-      No default value, which means no asynchronous flush thread will be scheduled. Examples: '1s', '5 s'.
+      <td>Writing option, the interval to flush any buffered rows.
+      This can improve performance for writing data to HBase database, but may increase the latency.
+      Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows'
+      can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.
       </td>
     </tr>
     </tbody>
diff --git a/docs/dev/table/connectors/hbase.zh.md b/docs/dev/table/connectors/hbase.zh.md
index 503e11e..60f6eb8 100644
--- a/docs/dev/table/connectors/hbase.zh.md
+++ b/docs/dev/table/connectors/hbase.zh.md
@@ -110,7 +110,7 @@ Connector Options
       <td>optional</td>
       <td style="word-wrap: break-word;">/hbase</td>
       <td>String</td>
-      <td>The root dir in Zookeeper for HBase cluster</td>
+      <td>The root dir in Zookeeper for HBase cluster.</td>
     </tr>
     <tr>
       <td><h5>null-string-literal</h5></td>
@@ -126,6 +126,7 @@ Connector Options
       <td>MemorySize</td>
       <td>Writing option, maximum size in memory of buffered rows for each writing request.
       This can improve performance for writing data to HBase database, but may increase the latency.
+      Can be set to '0' to disable it.
       </td>
     </tr>
     <tr>
@@ -135,16 +136,18 @@ Connector Options
       <td>Integer</td>
       <td>Writing option, maximum number of rows to buffer for each writing request.
       This can improve performance for writing data to HBase database, but may increase the latency.
-      No default value, which means the default flushing is not depends on the number of buffered rows
+      Can be set to '0' to disable it.
       </td>
     </tr>
     <tr>
       <td><h5>sink.buffer-flush.interval</h5></td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td style="word-wrap: break-word;">1s</td>
       <td>Duration</td>
-      <td>Writing option, the interval to flush buffered rows.
-      No default value, which means no asynchronous flush thread will be scheduled. Examples: '1s', '5 s'.
+      <td>Writing option, the interval to flush any buffered rows.
+      This can improve performance for writing data to HBase database, but may increase the latency.
+      Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows'
+      can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.
       </td>
     </tr>
     </tbody>
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
index 48b70eb..6e58aba 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
@@ -54,50 +54,51 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 		.key("table-name")
 		.stringType()
 		.noDefaultValue()
-		.withDescription("Required. It defines the HBase table name.");
+		.withDescription("The name of HBase table to connect.");
 
 	private static final ConfigOption<String> ZOOKEEPER_QUORUM = ConfigOptions
 		.key("zookeeper.quorum")
 		.stringType()
 		.noDefaultValue()
-		.withDescription("Required. It defines HBase Zookeeper quorum.");
+		.withDescription("The HBase Zookeeper quorum.");
 
 	private static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT = ConfigOptions
 		.key("zookeeper.znode.parent")
 		.stringType()
 		.defaultValue("/hbase")
-		.withDescription("Optional. The root dir in Zookeeper for HBase cluster, default value is '/hbase'");
+		.withDescription("The root dir in Zookeeper for HBase cluster.");
 
 	private static final ConfigOption<String> NULL_STRING_LITERAL = ConfigOptions
 		.key("null-string-literal")
 		.stringType()
 		.defaultValue("null")
-		.withDescription("Optional. Representation for null values for string fields. (\"null\" by default). " +
-			"HBase connector encode/decode empty bytes as null values except string types.");
+		.withDescription("Representation for null values for string fields. HBase source and " +
+			"sink encodes/decodes empty bytes as null values for all types except string type.");
 
 	private static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE = ConfigOptions
 		.key("sink.buffer-flush.max-size")
 		.memoryType()
 		.defaultValue(MemorySize.parse("2mb"))
-		.withDescription("Optional. Writing option, determines how many size in memory of " +
-			"buffered rows to insert per round trip. This can help performance on writing " +
-			"to JDBC database. The default value is '2mb'.");
+		.withDescription("Writing option, maximum size in memory of buffered rows for each " +
+			"writing request. This can improve performance for writing data to HBase database, " +
+			"but may increase the latency. Can be set to '0' to disable it. ");
 
 	private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
 		.key("sink.buffer-flush.max-rows")
 		.intType()
-		.noDefaultValue()
-		.withDescription("Optional. Writing option, determines how many rows to insert " +
-			"per round trip. This can help performance on writing to JDBC database. " +
-			"No default value, i.e. the default flushing is not depends on the number of buffered rows.");
+		.defaultValue(1000)
+		.withDescription("Writing option, maximum number of rows to buffer for each writing request. " +
+			"This can improve performance for writing data to HBase database, but may increase the latency. " +
+			"Can be set to '0' to disable it.");
 
 	private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
 		.key("sink.buffer-flush.interval")
 		.durationType()
-		.noDefaultValue()
-		.withDescription("Optional. Writing option, sets a flush interval flushing " +
-			"buffered requesting if the interval passes, in milliseconds. Default value is '0s', " +
-			"which means no asynchronous flush thread will be scheduled.");
+		.defaultValue(Duration.ofSeconds(1))
+		.withDescription("Writing option, the interval to flush any buffered rows. " +
+			"This can improve performance for writing data to HBase database, but may increase the latency. " +
+			"Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' " +
+			"can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");
 
 	@Override
 	public DynamicTableSource createDynamicTableSource(Context context) {
@@ -135,10 +136,8 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 
 		HBaseWriteOptions.Builder writeBuilder = HBaseWriteOptions.builder();
 		writeBuilder.setBufferFlushMaxSizeInBytes(helper.getOptions().get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes());
-		helper.getOptions().getOptional(SINK_BUFFER_FLUSH_INTERVAL)
-			.ifPresent(v -> writeBuilder.setBufferFlushIntervalMillis(v.toMillis()));
-		helper.getOptions().getOptional(SINK_BUFFER_FLUSH_MAX_ROWS)
-			.ifPresent(writeBuilder::setBufferFlushMaxRows);
+		writeBuilder.setBufferFlushIntervalMillis(helper.getOptions().get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+		writeBuilder.setBufferFlushMaxRows(helper.getOptions().get(SINK_BUFFER_FLUSH_MAX_ROWS));
 		String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
 		HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
 
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
index f4143d4..3f7c0a9 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java
@@ -25,8 +25,6 @@ import org.apache.hadoop.hbase.client.ConnectionConfiguration;
 import java.io.Serializable;
 import java.util.Objects;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
  * Options for HBase writing.
  */
@@ -100,19 +98,15 @@ public class HBaseWriteOptions implements Serializable {
 	 */
 	public static class Builder {
 
-		// default is 2mb which is defined in hbase
 		private long bufferFlushMaxSizeInBytes = ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
-		private long bufferFlushMaxRows = -1;
-		private long bufferFlushIntervalMillis = -1;
+		private long bufferFlushMaxRows = 0;
+		private long bufferFlushIntervalMillis = 0;
 
 		/**
 		 * Optional. Sets when to flush a buffered request based on the memory size of rows currently added.
 		 * Default to <code>2mb</code>.
 		 */
 		public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes) {
-			checkArgument(
-				bufferFlushMaxSizeInBytes > 0,
-				"Max byte size of buffered rows must be larger than 0.");
 			this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
 			return this;
 		}
@@ -122,9 +116,6 @@ public class HBaseWriteOptions implements Serializable {
 		 * Defaults to not set, i.e. won't flush based on the number of buffered rows.
 		 */
 		public Builder setBufferFlushMaxRows(long bufferFlushMaxRows) {
-			checkArgument(
-				bufferFlushMaxRows > 0,
-				"Max number of buffered rows must be larger than 0.");
 			this.bufferFlushMaxRows = bufferFlushMaxRows;
 			return this;
 		}
@@ -134,9 +125,6 @@ public class HBaseWriteOptions implements Serializable {
 		 * Defaults to not set, i.e. won't flush based on flush interval.
 		 */
 		public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis) {
-			checkArgument(
-				bufferFlushIntervalMillis > 0,
-				"Interval (in milliseconds) between each flush must be larger than 0.");
 			this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
 			return this;
 		}
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index e9d16bc..38ecc59 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -117,8 +117,10 @@ public class HBaseSinkFunction<T>
 			}
 			// create a parameter instance, set the table name and custom listener reference.
 			BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(hTableName))
-				.listener(this)
-				.writeBufferSize(bufferFlushMaxSizeInBytes);
+				.listener(this);
+			if (bufferFlushMaxSizeInBytes > 0) {
+				params.writeBufferSize(bufferFlushMaxSizeInBytes);
+			}
 			this.mutator = connection.getBufferedMutator(params);
 
 			if (bufferFlushIntervalMillis > 0) {
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
index 32e104a..d51f11b 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
@@ -182,14 +182,56 @@ public class HBaseDynamicTableFactoryTest {
 
 		HBaseWriteOptions expectedWriteOptions = HBaseWriteOptions.builder()
 			.setBufferFlushMaxRows(1000)
-			.setBufferFlushIntervalMillis(10 * 1000)
-			.setBufferFlushMaxSizeInBytes(10 * 1024 * 1024)
+			.setBufferFlushIntervalMillis(1000)
+			.setBufferFlushMaxSizeInBytes(2 * 1024 * 1024)
 			.build();
 		HBaseWriteOptions actualWriteOptions = hbaseSink.getWriteOptions();
 		assertEquals(expectedWriteOptions, actualWriteOptions);
 	}
 
 	@Test
+	public void testBufferFlushOptions() {
+		Map<String, String> options = getAllOptions();
+		options.put("sink.buffer-flush.max-size", "10mb");
+		options.put("sink.buffer-flush.max-rows", "100");
+		options.put("sink.buffer-flush.interval", "10s");
+
+		TableSchema schema = TableSchema.builder()
+			.field(ROWKEY, STRING())
+			.build();
+
+		DynamicTableSink sink = createTableSink(schema, options);
+		HBaseWriteOptions expected = HBaseWriteOptions.builder()
+			.setBufferFlushMaxRows(100)
+			.setBufferFlushIntervalMillis(10 * 1000)
+			.setBufferFlushMaxSizeInBytes(10 * 1024 * 1024)
+			.build();
+		HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions();
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testDisabledBufferFlushOptions() {
+		Map<String, String> options = getAllOptions();
+		options.put("sink.buffer-flush.max-size", "0");
+		options.put("sink.buffer-flush.max-rows", "0");
+		options.put("sink.buffer-flush.interval", "0");
+
+		TableSchema schema = TableSchema.builder()
+			.field(ROWKEY, STRING())
+			.build();
+
+		DynamicTableSink sink = createTableSink(schema, options);
+		HBaseWriteOptions expected = HBaseWriteOptions.builder()
+			.setBufferFlushMaxRows(0)
+			.setBufferFlushIntervalMillis(0)
+			.setBufferFlushMaxSizeInBytes(0)
+			.build();
+		HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions();
+		assertEquals(expected, actual);
+	}
+
+	@Test
 	public void testUnknownOption() {
 		Map<String, String> options = getAllOptions();
 		options.put("sink.unknown.key", "unknown-value");
@@ -225,9 +267,6 @@ public class HBaseDynamicTableFactoryTest {
 		options.put("table-name", "testHBastTable");
 		options.put("zookeeper.quorum", "localhost:2181");
 		options.put("zookeeper.znode.parent", "/flink");
-		options.put("sink.buffer-flush.max-size", "10mb");
-		options.put("sink.buffer-flush.max-rows", "1000");
-		options.put("sink.buffer-flush.interval", "10s");
 		return options;
 	}