Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/16 15:45:29 UTC

[GitHub] [pulsar] eolivelli opened a new pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

eolivelli opened a new pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248


   ### Motivation
   I saw this problem while developing integration tests for GenericObject + KeyValue with SEPARATED KeyValueEncoding.
   
   I extracted this patch from #10211 to make the change clearer; in #10211 I dropped the tests with SEPARATED KeyValueEncoding because of this problem.
   
   ### Modifications
   
   Before executing MessageImpl#getValue, we now ensure that the Schema is actually loaded when we call getSchemaInfo.
   
   Initially I added eager loading of the Schema in getSchemaInfo, but this caused many deadlocks because getSchemaInfo is called in many places.
   Several places in the code also expect AutoConsumeSchema#getSchemaInfo to return null, and we have to keep this behaviour.
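   
   A minimal sketch of the idea, not the actual patch: the accessor stays cheap and may return null, while an explicit loading step runs only on the decode path. `LazySchemaSketch`, `ensureSchemaLoaded` and the `Supplier`-based fetcher are hypothetical names used for illustration and are not part of the Pulsar client API.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a lazily initialised schema; not Pulsar's AutoConsumeSchema.
final class LazySchemaSketch {
    private final Supplier<String> fetcher; // assumed: fetches the schema description remotely
    private volatile String schemaInfo;     // null until the schema has been fetched

    LazySchemaSketch(Supplier<String> fetcher) {
        this.fetcher = fetcher;
    }

    // Cheap accessor: never triggers a fetch, so callers may still observe null.
    String getSchemaInfo() {
        return schemaInfo;
    }

    // Explicit loading step, meant to be called only on the decode path.
    void ensureSchemaLoaded() {
        if (schemaInfo == null) {
            synchronized (this) {
                if (schemaInfo == null) {
                    schemaInfo = fetcher.get();
                }
            }
        }
    }

    // Decoding first makes sure the schema is present, then uses it.
    String decode(byte[] payload) {
        ensureSchemaLoaded();
        return schemaInfo + ":" + new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        AtomicInteger fetches = new AtomicInteger();
        LazySchemaSketch schema = new LazySchemaSketch(() -> {
            fetches.incrementAndGet();
            return "STRING";
        });
        System.out.println(schema.getSchemaInfo()); // nothing fetched yet
        System.out.println(schema.decode("hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(schema.decode("again".getBytes(StandardCharsets.UTF_8)));
        System.out.println(fetches.get()); // number of fetches performed
    }
}
```

   Keeping the fetch out of the accessor means that callers relying on a null getSchemaInfo keep working, and only the decode path pays for the download.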
   
   ### Verifying this change
   
   I am not adding test cases in this patch; I will add them in a follow-up patch once #10211 lands on master.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#issuecomment-822096567


   @congbobo184 Could you please also help review this PR?





[GitHub] [pulsar] eolivelli commented on pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#issuecomment-821800473


   @congbobo184 @codelipenghui the patch is now good to go. I added the test case.





[GitHub] [pulsar] eolivelli commented on pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#issuecomment-822395574


   This patch also fixes a problem in the integration tests that made CI more flaky.
   As soon as we commit this patch, CI will be in a better state.
   
   cc @congbobo184 @codelipenghui 





[GitHub] [pulsar] eolivelli commented on pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#issuecomment-822422238


   thank you @congbobo184 @codelipenghui @lhotari for your review





[GitHub] [pulsar] eolivelli commented on a change in pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#discussion_r615345856



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
##########
@@ -97,67 +101,61 @@ public void testGenericObjectSink() throws Exception {
                 new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
                 new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
                         Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
-                new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
-                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
+                new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())),
+                new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
         );
-        // submit all sinks
+
+        final int numRecords = 2;
+
         for (SinkSpec spec : specs) {
             submitSinkConnector(spec.sinkName, spec.outputTopicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
-        }
-        // check all sinks
-        for (SinkSpec spec : specs) {
+
             // get sink info
             getSinkInfoSuccess(spec.sinkName);
             getSinkStatus(spec.sinkName);
-        }
 
-        final int numRecords = 10;
-
-        for (SinkSpec spec : specs) {
             @Cleanup Producer<Object> producer = client.newProducer(spec.schema)
                     .topic(spec.outputTopicName)
                     .create();
             for (int i = 0; i < numRecords; i++) {
                 MessageId messageId = producer.newMessage()
                         .value(spec.testValue)
                         .property("expectedType", spec.schema.getSchemaInfo().getType().toString())
+                        .property("recordNumber", i + "")
                         .send();
                 log.info("sent message {} {}  with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
             }
-        }
 
-        // wait that all sinks processed all records without errors
-        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
-            for (SinkSpec spec : specs) {
-                try {
-                    log.info("waiting for sink {}", spec.sinkName);
-                    for (int i = 0; i < 120; i++) {
-                        SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
-                        log.info("sink {} status {}", spec.sinkName, status);
-                        assertEquals(status.getInstances().size(), 1);
-                        SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
-                        if (instance.getStatus().numWrittenToSink >= numRecords) {
-                            break;
-                        }
-                        assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
-                        Thread.sleep(1000);
-                    }
 
+            // wait that all sinks processed all records without errors
+
+            try {
+                log.info("waiting for sink {}", spec.sinkName);
+                for (int i = 0; i < 120; i++) {
                     SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
                     log.info("sink {} status {}", spec.sinkName, status);
                     assertEquals(status.getInstances().size(), 1);
-                    assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
-                    assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
-                    assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
-                    log.info("sink {} is okay", spec.sinkName);
-                } finally {
-                    dumpSinkLogs(spec);
+                    SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
+                    if (instance.getStatus().numWrittenToSink >= numRecords) {
+                        break;
+                    }
+                    assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
+                    Thread.sleep(1000);

Review comment:
       As soon as the records are processed we exit the loop. Usually only one cycle runs, because the sink receives the records almost immediately.
   
   BTW I am thinking of reworking this test so that it submits the sink only once and connects it to multiple topics.
   
   WDYT?
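   
   The early-exit behaviour described above can be sketched as follows. This is only an illustration: `PollSketch` and `waitForCount` are hypothetical names, the `IntSupplier` stands in for reading `numWrittenToSink` from the sink status, and the restart check from the test is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical helper illustrating a bounded polling loop with early exit.
final class PollSketch {

    // Polls until the counter reaches the target; returns how many times it had to sleep.
    static int waitForCount(IntSupplier written, int target, int maxAttempts, long sleepMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (written.getAsInt() >= target) {
                return attempt; // condition met: leave the loop without sleeping again
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        throw new IllegalStateException("condition not met after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        // Records already processed: the loop exits on the first check.
        System.out.println(waitForCount(() -> 2, 2, 120, 1000L));

        // Records arrive one per poll: a single short sleep is needed.
        AtomicInteger counter = new AtomicInteger();
        System.out.println(waitForCount(counter::incrementAndGet, 2, 120, 10L));
    }
}
```

   The sleep is paid only while the condition is still false, so the upper bound of 120 attempts matters only when the sink never catches up.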







[GitHub] [pulsar] eolivelli commented on a change in pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#discussion_r615236889



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
##########
@@ -78,27 +80,7 @@ public boolean supportSchemaVersioning() {
 
     @Override
     public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
-        if (schema == null) {

Review comment:
       I have just moved this code into a separate method.







[GitHub] [pulsar] eolivelli edited a comment on pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli edited a comment on pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#issuecomment-821830625


   @linlinnn 
   This is the fix for the IllegalRefCount problem:
   https://github.com/apache/pulsar/pull/10215
   
   It is a problem related to the switch to LightProto





[GitHub] [pulsar] eolivelli merged pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248


   





[GitHub] [pulsar] linlinnn commented on a change in pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#discussion_r615339228



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
##########
@@ -97,67 +101,61 @@ public void testGenericObjectSink() throws Exception {
                 new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
                 new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
                         Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
-                new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
-                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
+                new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())),
+                new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
         );
-        // submit all sinks
+
+        final int numRecords = 2;
+
         for (SinkSpec spec : specs) {
             submitSinkConnector(spec.sinkName, spec.outputTopicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
-        }
-        // check all sinks
-        for (SinkSpec spec : specs) {
+
             // get sink info
             getSinkInfoSuccess(spec.sinkName);
             getSinkStatus(spec.sinkName);
-        }
 
-        final int numRecords = 10;
-
-        for (SinkSpec spec : specs) {
             @Cleanup Producer<Object> producer = client.newProducer(spec.schema)
                     .topic(spec.outputTopicName)
                     .create();
             for (int i = 0; i < numRecords; i++) {
                 MessageId messageId = producer.newMessage()
                         .value(spec.testValue)
                         .property("expectedType", spec.schema.getSchemaInfo().getType().toString())
+                        .property("recordNumber", i + "")
                         .send();
                 log.info("sent message {} {}  with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
             }
-        }
 
-        // wait that all sinks processed all records without errors
-        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
-            for (SinkSpec spec : specs) {
-                try {
-                    log.info("waiting for sink {}", spec.sinkName);
-                    for (int i = 0; i < 120; i++) {
-                        SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
-                        log.info("sink {} status {}", spec.sinkName, status);
-                        assertEquals(status.getInstances().size(), 1);
-                        SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
-                        if (instance.getStatus().numWrittenToSink >= numRecords) {
-                            break;
-                        }
-                        assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
-                        Thread.sleep(1000);
-                    }
 
+            // wait that all sinks processed all records without errors
+
+            try {
+                log.info("waiting for sink {}", spec.sinkName);
+                for (int i = 0; i < 120; i++) {
                     SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
                     log.info("sink {} status {}", spec.sinkName, status);
                     assertEquals(status.getInstances().size(), 1);
-                    assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
-                    assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
-                    assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
-                    log.info("sink {} is okay", spec.sinkName);
-                } finally {
-                    dumpSinkLogs(spec);
+                    SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
+                    if (instance.getStatus().numWrittenToSink >= numRecords) {
+                        break;
+                    }
+                    assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
+                    Thread.sleep(1000);

Review comment:
       Why do we need to sleep 1 second here? It will make this test take at least 10 minutes.







[GitHub] [pulsar] eolivelli commented on pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#issuecomment-822002509


   @linlinnn @codelipenghui CI passed, please take a look.





[GitHub] [pulsar] eolivelli commented on a change in pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#discussion_r615236917



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
##########
@@ -97,8 +98,10 @@ public void testGenericObjectSink() throws Exception {
                 new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
                 new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
                         Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
-                new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
-                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
+                new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())),
+                new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))

Review comment:
       this is the new test case that covers the change







[GitHub] [pulsar] linlinnn commented on pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
linlinnn commented on pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#issuecomment-821828414


   The integration test `PulsarGenericObjectSinkTest#testGenericObjectSink` failed frequently. Could you take a look?





[GitHub] [pulsar] eolivelli commented on pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#issuecomment-821843561


   Also @linlinnn, I have just pushed a commit that changes that test.
   Before this last commit we submitted all of the sinks at once; now we run only one sink at a time, hoping that this will require fewer CI resources.





[GitHub] [pulsar] eolivelli commented on a change in pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#discussion_r614942131



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1707,9 +1707,15 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
     @Override
     protected void handleGetSchema(CommandGetSchema commandGetSchema) {
         if (log.isDebugEnabled()) {
-            log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
-                    remoteAddress, new String(commandGetSchema.getSchemaVersion()),
-                    commandGetSchema.getTopic(), commandGetSchema.getRequestId());
+            if (commandGetSchema.hasSchemaVersion()) {
+                log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",

Review comment:
       When you enable debug logging in the broker, getSchema() does not work, because calling getSchemaVersion() raises an error when hasSchemaVersion() returns false.
   I added the fix here.
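   
   A self-contained sketch of the guard, under stated assumptions: `GetSchemaCommandSketch` is a hypothetical stand-in for the generated command class, and the getter is assumed to throw when the optional field is unset, as the comment above describes. It is not the broker code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a generated command with an optional schemaVersion field.
final class GetSchemaCommandSketch {
    private final String topic;
    private final byte[] schemaVersion; // null models "field not set"

    GetSchemaCommandSketch(String topic, byte[] schemaVersion) {
        this.topic = topic;
        this.schemaVersion = schemaVersion;
    }

    boolean hasSchemaVersion() {
        return schemaVersion != null;
    }

    // Assumed behaviour: reading an unset optional field is an error.
    byte[] getSchemaVersion() {
        if (schemaVersion == null) {
            throw new IllegalStateException("schemaVersion is not set");
        }
        return schemaVersion;
    }

    String getTopic() {
        return topic;
    }

    // Builds the debug text, reading the optional field only when it is present.
    static String describe(GetSchemaCommandSketch cmd) {
        if (cmd.hasSchemaVersion()) {
            String version = new String(cmd.getSchemaVersion(), StandardCharsets.UTF_8);
            return "schemaVersion: " + version + ", topic: " + cmd.getTopic();
        }
        return "schemaVersion: (not set), topic: " + cmd.getTopic();
    }

    public static void main(String[] args) {
        byte[] v1 = "v1".getBytes(StandardCharsets.UTF_8);
        System.out.println(describe(new GetSchemaCommandSketch("my-topic", v1)));
        System.out.println(describe(new GetSchemaCommandSketch("my-topic", null)));
    }
}
```

   Checking the has-method first keeps the debug path from failing a request that would otherwise succeed.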







[GitHub] [pulsar] eolivelli commented on pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#issuecomment-821830625


   This is the fix for the problem
   https://github.com/apache/pulsar/pull/10215
   
   It is a problem related to the switch to LightProto





[GitHub] [pulsar] linlinnn commented on a change in pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
linlinnn commented on a change in pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#discussion_r615369165



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
##########
@@ -97,67 +101,61 @@ public void testGenericObjectSink() throws Exception {
                 new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
                 new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
                         Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
-                new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
-                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
+                new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())),
+                new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
         );
-        // submit all sinks
+
+        final int numRecords = 2;
+
         for (SinkSpec spec : specs) {
             submitSinkConnector(spec.sinkName, spec.outputTopicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
-        }
-        // check all sinks
-        for (SinkSpec spec : specs) {
+
             // get sink info
             getSinkInfoSuccess(spec.sinkName);
             getSinkStatus(spec.sinkName);
-        }
 
-        final int numRecords = 10;
-
-        for (SinkSpec spec : specs) {
             @Cleanup Producer<Object> producer = client.newProducer(spec.schema)
                     .topic(spec.outputTopicName)
                     .create();
             for (int i = 0; i < numRecords; i++) {
                 MessageId messageId = producer.newMessage()
                         .value(spec.testValue)
                         .property("expectedType", spec.schema.getSchemaInfo().getType().toString())
+                        .property("recordNumber", i + "")
                         .send();
                 log.info("sent message {} {}  with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
             }
-        }
 
-        // wait that all sinks processed all records without errors
-        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
-            for (SinkSpec spec : specs) {
-                try {
-                    log.info("waiting for sink {}", spec.sinkName);
-                    for (int i = 0; i < 120; i++) {
-                        SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
-                        log.info("sink {} status {}", spec.sinkName, status);
-                        assertEquals(status.getInstances().size(), 1);
-                        SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
-                        if (instance.getStatus().numWrittenToSink >= numRecords) {
-                            break;
-                        }
-                        assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
-                        Thread.sleep(1000);
-                    }
 
+            // wait that all sinks processed all records without errors
+
+            try {
+                log.info("waiting for sink {}", spec.sinkName);
+                for (int i = 0; i < 120; i++) {
                     SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
                     log.info("sink {} status {}", spec.sinkName, status);
                     assertEquals(status.getInstances().size(), 1);
-                    assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
-                    assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
-                    assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
-                    log.info("sink {} is okay", spec.sinkName);
-                } finally {
-                    dumpSinkLogs(spec);
+                    SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
+                    if (instance.getStatus().numWrittenToSink >= numRecords) {
+                        break;
+                    }
+                    assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
+                    Thread.sleep(1000);

Review comment:
       Thanks, I see.
   What about moving the check below `// wait that all sinks processed all records without errors` out of the loop?







-        }
-        // check all sinks
-        for (SinkSpec spec : specs) {
+
             // get sink info
             getSinkInfoSuccess(spec.sinkName);
             getSinkStatus(spec.sinkName);
-        }
 
-        final int numRecords = 10;
-
-        for (SinkSpec spec : specs) {
             @Cleanup Producer<Object> producer = client.newProducer(spec.schema)
                     .topic(spec.outputTopicName)
                     .create();
             for (int i = 0; i < numRecords; i++) {
                 MessageId messageId = producer.newMessage()
                         .value(spec.testValue)
                         .property("expectedType", spec.schema.getSchemaInfo().getType().toString())
+                        .property("recordNumber", i + "")
                         .send();
                 log.info("sent message {} {}  with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
             }
-        }
 
-        // wait that all sinks processed all records without errors
-        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
-            for (SinkSpec spec : specs) {
-                try {
-                    log.info("waiting for sink {}", spec.sinkName);
-                    for (int i = 0; i < 120; i++) {
-                        SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
-                        log.info("sink {} status {}", spec.sinkName, status);
-                        assertEquals(status.getInstances().size(), 1);
-                        SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
-                        if (instance.getStatus().numWrittenToSink >= numRecords) {
-                            break;
-                        }
-                        assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
-                        Thread.sleep(1000);
-                    }
 
+            // wait that all sinks processed all records without errors
+
+            try {
+                log.info("waiting for sink {}", spec.sinkName);
+                for (int i = 0; i < 120; i++) {
                     SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
                     log.info("sink {} status {}", spec.sinkName, status);
                     assertEquals(status.getInstances().size(), 1);
-                    assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
-                    assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
-                    assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
-                    log.info("sink {} is okay", spec.sinkName);
-                } finally {
-                    dumpSinkLogs(spec);
+                    SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
+                    if (instance.getStatus().numWrittenToSink >= numRecords) {
+                        break;
+                    }
+                    assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
+                    Thread.sleep(1000);

Review comment:
       Thanks, I see.
   What about moving the check that is below `// wait that all sinks processed all records without errors` out of the loop?
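
   For illustration only, the suggestion above (poll inside the loop, run the final checks once after it) could look roughly like the sketch below. It is not code from this PR: `PollThenAssert`, `Stats`, `waitForRecords` and `assertSinkOk` are hypothetical names, and `Stats` merely stands in for the counters read from the sink status so that the example runs without a Pulsar cluster.

```java
import java.util.function.Supplier;

class PollThenAssert {

    // Hypothetical stand-in for the sink status counters; not the Pulsar class.
    static final class Stats {
        final long numWrittenToSink;
        final long numSinkExceptions;
        final long numSystemExceptions;

        Stats(long numWrittenToSink, long numSinkExceptions, long numSystemExceptions) {
            this.numWrittenToSink = numWrittenToSink;
            this.numSinkExceptions = numSinkExceptions;
            this.numSystemExceptions = numSystemExceptions;
        }
    }

    // Polls until enough records were written or the attempts run out,
    // and returns the last snapshot that was fetched.
    static Stats waitForRecords(Supplier<Stats> fetch, int numRecords, int maxAttempts, long sleepMillis) {
        Stats last = fetch.get();
        for (int i = 1; i < maxAttempts && last.numWrittenToSink < numRecords; i++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the sink", e);
            }
            last = fetch.get();
        }
        return last;
    }

    // Final checks, executed once after the polling loop has finished.
    static void assertSinkOk(Stats stats, int numRecords) {
        if (stats.numWrittenToSink < numRecords) {
            throw new AssertionError("sink wrote only " + stats.numWrittenToSink + " records");
        }
        if (stats.numSinkExceptions != 0) {
            throw new AssertionError("sink exceptions: " + stats.numSinkExceptions);
        }
        if (stats.numSystemExceptions != 0) {
            throw new AssertionError("system exceptions: " + stats.numSystemExceptions);
        }
    }

    public static void main(String[] args) {
        // Fake status source: one more record is reported as written on every poll.
        int[] polls = {0};
        Supplier<Stats> fake = () -> new Stats(++polls[0], 0, 0);

        Stats result = waitForRecords(fake, 2, 120, 10);
        assertSinkOk(result, 2);
        System.out.println("sink is okay after " + polls[0] + " polls");
    }
}
```

   With this shape the loop only decides when to stop waiting, and a failed final check is reported once with the last observed counters.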







[GitHub] [pulsar] eolivelli commented on a change in pull request #10248: Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10248:
URL: https://github.com/apache/pulsar/pull/10248#discussion_r615408696



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
##########
@@ -97,67 +101,61 @@ public void testGenericObjectSink() throws Exception {
                 new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
                 new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
                         Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
-                new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
-                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
+                new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())),
+                new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
+                        Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
         );
-        // submit all sinks
+
+        final int numRecords = 2;
+
         for (SinkSpec spec : specs) {
             submitSinkConnector(spec.sinkName, spec.outputTopicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
-        }
-        // check all sinks
-        for (SinkSpec spec : specs) {
+
             // get sink info
             getSinkInfoSuccess(spec.sinkName);
             getSinkStatus(spec.sinkName);
-        }
 
-        final int numRecords = 10;
-
-        for (SinkSpec spec : specs) {
             @Cleanup Producer<Object> producer = client.newProducer(spec.schema)
                     .topic(spec.outputTopicName)
                     .create();
             for (int i = 0; i < numRecords; i++) {
                 MessageId messageId = producer.newMessage()
                         .value(spec.testValue)
                         .property("expectedType", spec.schema.getSchemaInfo().getType().toString())
+                        .property("recordNumber", i + "")
                         .send();
                 log.info("sent message {} {}  with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
             }
-        }
 
-        // wait that all sinks processed all records without errors
-        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
-            for (SinkSpec spec : specs) {
-                try {
-                    log.info("waiting for sink {}", spec.sinkName);
-                    for (int i = 0; i < 120; i++) {
-                        SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
-                        log.info("sink {} status {}", spec.sinkName, status);
-                        assertEquals(status.getInstances().size(), 1);
-                        SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
-                        if (instance.getStatus().numWrittenToSink >= numRecords) {
-                            break;
-                        }
-                        assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
-                        Thread.sleep(1000);
-                    }
 
+            // wait that all sinks processed all records without errors
+
+            try {
+                log.info("waiting for sink {}", spec.sinkName);
+                for (int i = 0; i < 120; i++) {
                     SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
                     log.info("sink {} status {}", spec.sinkName, status);
                     assertEquals(status.getInstances().size(), 1);
-                    assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
-                    assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
-                    assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
-                    log.info("sink {} is okay", spec.sinkName);
-                } finally {
-                    dumpSinkLogs(spec);
+                    SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
+                    if (instance.getStatus().numWrittenToSink >= numRecords) {
+                        break;
+                    }
+                    assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
+                    Thread.sleep(1000);

Review comment:
       I have reworked the test



