You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/21 11:08:36 UTC
[pulsar] branch master updated: [pulsar-io] hbase sink use BooleanSchema encode Boolean data (#4258)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b868bcd [pulsar-io] hbase sink use BooleanSchema encode Boolean data (#4258)
b868bcd is described below
commit b868bcd55642099cd64cacd17af01902ff212ce0
Author: wpl <12...@qq.com>
AuthorDate: Tue May 21 19:08:29 2019 +0800
[pulsar-io] hbase sink use BooleanSchema encode Boolean data (#4258)
### Motivation
1. Use the Pulsar client's BooleanSchema to encode Boolean data.
2. Remove unused code.
### Modifications
1. Use BooleanSchema.encode() instead of Bytes.toBytes().
2. Remove unused code, e.g. `required = false`.
---
.../java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java | 12 ++++--------
.../apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java | 4 ++--
.../org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java | 5 +----
3 files changed, 7 insertions(+), 14 deletions(-)
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java
index a9f362d..39b72ea 100644
--- a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.hbase;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -27,8 +28,6 @@ import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.core.annotations.FieldDoc;
-import java.io.Serializable;
-
/**
* Configuration object for all Hbase Sink components.
*/
@@ -40,13 +39,12 @@ import java.io.Serializable;
@Accessors(chain = true)
public class HbaseAbstractConfig implements Serializable {
- private static final long serialVersionUID = 6783394446906640112L;
+ private static final long serialVersionUID = -8945930873383593712L;
@FieldDoc(
- required = false,
- defaultValue = "",
+ defaultValue = "hbase-site.xml",
help = "hbase system configuration 'hbase-site.xml' file")
- private String hbaseConfigResources;
+ private String hbaseConfigResources = "hbase-site.xml";
@FieldDoc(
required = true,
@@ -55,13 +53,11 @@ public class HbaseAbstractConfig implements Serializable {
private String zookeeperQuorum;
@FieldDoc(
- required = false,
defaultValue = "2181",
help = "hbase system configuration about hbase.zookeeper.property.clientPort value")
private String zookeeperClientPort = "2181";
@FieldDoc(
- required = false,
defaultValue = "/hbase",
help = "hbase system configuration about zookeeper.znode.parent value")
private String zookeeperZnodeParent = "/hbase";
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java
index 3027e34..378ac4e 100644
--- a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.io.hbase.sink;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.BooleanSchema;
import org.apache.pulsar.client.impl.schema.DoubleSchema;
import org.apache.pulsar.client.impl.schema.FloatSchema;
import org.apache.pulsar.client.impl.schema.IntSchema;
@@ -77,7 +77,7 @@ public class HbaseGenericRecordSink extends HbaseAbstractSink<GenericRecord> {
} else if (value instanceof Float) {
return FloatSchema.of().encode((Float) value);
} else if (value instanceof Boolean) {
- return Bytes.toBytes((Boolean) value);
+ return BooleanSchema.of().encode((Boolean) value);
} else if (value instanceof String) {
return StringSchema.utf8().encode((String) value);
} else if (value instanceof Short) {
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java
index b2e2534..248028b 100644
--- a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java
@@ -32,7 +32,6 @@ import org.apache.pulsar.io.hbase.HbaseAbstractConfig;
import java.io.File;
import java.io.IOException;
-import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -42,7 +41,7 @@ import java.util.Map;
@EqualsAndHashCode(callSuper = false)
@ToString
@Accessors(chain = true)
-public class HbaseSinkConfig extends HbaseAbstractConfig implements Serializable {
+public class HbaseSinkConfig extends HbaseAbstractConfig {
private static final long serialVersionUID = 1245636479605735555L;
@@ -65,13 +64,11 @@ public class HbaseSinkConfig extends HbaseAbstractConfig implements Serializable
private List<String> qualifierNames;
@FieldDoc(
- required = false,
defaultValue = "1000l",
help = "The hbase operation time in milliseconds")
private long batchTimeMs = 1000l;
@FieldDoc(
- required = false,
defaultValue = "200",
help = "The batch size of write to the hbase table"
)