You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/21 11:08:36 UTC

[pulsar] branch master updated: [pulsar-io] hbase sink use BooleanSchema encode Boolean data (#4258)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b868bcd  [pulsar-io] hbase sink use BooleanSchema encode Boolean data (#4258)
b868bcd is described below

commit b868bcd55642099cd64cacd17af01902ff212ce0
Author: wpl <12...@qq.com>
AuthorDate: Tue May 21 19:08:29 2019 +0800

    [pulsar-io] hbase sink use BooleanSchema encode Boolean data (#4258)
    
    ### Motivation
    
    1. Use the Pulsar client's BooleanSchema to encode Boolean data.
    2. Remove unused code.
    
    ### Modifications
    
    1. Use BooleanSchema.encode() instead of Bytes.toBytes() for Boolean values.
    2. Remove unused code, e.g. the redundant `required = false` attribute on @FieldDoc annotations.
---
 .../java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java | 12 ++++--------
 .../apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java  |  4 ++--
 .../org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java     |  5 +----
 3 files changed, 7 insertions(+), 14 deletions(-)

diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java
index a9f362d..39b72ea 100644
--- a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/HbaseAbstractConfig.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.io.hbase;
 
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
@@ -27,8 +28,6 @@ import lombok.ToString;
 import lombok.experimental.Accessors;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
-import java.io.Serializable;
-
 /**
  * Configuration object for all Hbase Sink components.
  */
@@ -40,13 +39,12 @@ import java.io.Serializable;
 @Accessors(chain = true)
 public class HbaseAbstractConfig implements Serializable {
 
-    private static final long serialVersionUID = 6783394446906640112L;
+    private static final long serialVersionUID = -8945930873383593712L;
 
     @FieldDoc(
-        required = false,
-        defaultValue = "",
+        defaultValue = "hbase-site.xml",
         help = "hbase system configuration 'hbase-site.xml' file")
-    private String hbaseConfigResources;
+    private String hbaseConfigResources = "hbase-site.xml";
 
     @FieldDoc(
         required = true,
@@ -55,13 +53,11 @@ public class HbaseAbstractConfig implements Serializable {
     private String zookeeperQuorum;
 
     @FieldDoc(
-        required = false,
         defaultValue = "2181",
         help = "hbase system configuration about hbase.zookeeper.property.clientPort value")
     private String zookeeperClientPort = "2181";
 
     @FieldDoc(
-        required = false,
         defaultValue = "/hbase",
         help = "hbase system configuration about zookeeper.znode.parent value")
     private String zookeeperZnodeParent = "/hbase";
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java
index 3027e34..378ac4e 100644
--- a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSink.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.io.hbase.sink;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.BooleanSchema;
 import org.apache.pulsar.client.impl.schema.DoubleSchema;
 import org.apache.pulsar.client.impl.schema.FloatSchema;
 import org.apache.pulsar.client.impl.schema.IntSchema;
@@ -77,7 +77,7 @@ public class HbaseGenericRecordSink extends HbaseAbstractSink<GenericRecord> {
         } else if (value instanceof Float) {
             return FloatSchema.of().encode((Float) value);
         } else if (value instanceof Boolean) {
-            return Bytes.toBytes((Boolean) value);
+            return BooleanSchema.of().encode((Boolean) value);
         } else if (value instanceof String) {
             return StringSchema.utf8().encode((String) value);
         } else if (value instanceof Short) {
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java
index b2e2534..248028b 100644
--- a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseSinkConfig.java
@@ -32,7 +32,6 @@ import org.apache.pulsar.io.hbase.HbaseAbstractConfig;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
@@ -42,7 +41,7 @@ import java.util.Map;
 @EqualsAndHashCode(callSuper = false)
 @ToString
 @Accessors(chain = true)
-public class HbaseSinkConfig extends HbaseAbstractConfig implements Serializable {
+public class HbaseSinkConfig extends HbaseAbstractConfig {
 
     private static final long serialVersionUID = 1245636479605735555L;
 
@@ -65,13 +64,11 @@ public class HbaseSinkConfig extends HbaseAbstractConfig implements Serializable
     private List<String> qualifierNames;
 
     @FieldDoc(
-       required = false,
        defaultValue = "1000l",
        help = "The hbase operation time in milliseconds")
     private long batchTimeMs = 1000l;
 
     @FieldDoc(
-        required = false,
         defaultValue = "200",
         help = "The batch size of write to the hbase table"
     )