You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/03/12 06:15:50 UTC

[GitHub] [pulsar] ambition119 commented on a change in pull request #3695: Hbase Connector Integration Test

ambition119 commented on a change in pull request #3695: Hbase Connector Integration Test
URL: https://github.com/apache/pulsar/pull/3695#discussion_r264533766
 
 

 ##########
 File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 ##########
 @@ -332,41 +347,73 @@ protected void getSinkStatus(String tenant, String namespace, String sinkName) t
 
             kvs.put(key, value);
             producer.newMessage()
-                .key(key)
-                .value(obj)
-                .send();
+                  .key(key)
+                  .value(obj)
+                  .send();
         }
         return kvs;
     }
 
+    // This for HbaseSinkTester
+    protected Map<String, String> produceSchemaMessagesToInputTopicForHbaseSink(String inputTopicName,
+                                                                                Schema<HbaseSinkTester.Foo> schema) throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+              .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+              .build();
+        @Cleanup
+        Producer<HbaseSinkTester.Foo> producer = client.newProducer(schema)
+              .topic(inputTopicName)
+              .create();
+        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
+
+
+        String key = "row_key";
+        HbaseSinkTester.Foo obj = new HbaseSinkTester.Foo();
+        obj.setRowKey("rowKey_value");
+        obj.setName("name_value");
+        obj.setAddress("address_value");
+        obj.setAge(30);
+        obj.setFlag(true);
+        String value = new String(schema.encode(obj));
+
+        kvs.put(key, value);
+        producer.newMessage()
+              .key(key)
+              .value(obj)
+              .send();
+
+        return kvs;
+    }
+
     protected void deleteSink(String tenant, String namespace, String sinkName) throws Exception {
         String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "sink",
-            "delete",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sinkName
+              PulsarCluster.ADMIN_SCRIPT,
+              "sink",
+              "delete",
+              "--tenant", tenant,
+              "--namespace", namespace,
+              "--name", sinkName
         };
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         assertTrue(
-            result.getStdout().contains("Deleted successfully"),
-            result.getStdout()
+              result.getStdout().contains("Deleted successfully"),
+              result.getStdout()
         );
         assertTrue(
-            result.getStderr().isEmpty(),
-            result.getStderr()
+              result.getStderr().isEmpty(),
+              result.getStderr()
         );
     }
 
     protected void getSinkInfoNotFound(String tenant, String namespace, String sinkName) throws Exception {
         String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "sink",
-            "get",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sinkName
+              PulsarCluster.ADMIN_SCRIPT,
 
 Review comment:
   > Seems the original code is 4 bytes aligned, but now it is not :)
   > there are also several other places with this code alignment issue.
   
   My code formatting led to this misalignment; I will fix it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services