You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/03 16:30:06 UTC

[GitHub] [pulsar] Denovo1998 opened a new pull request, #17449: Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Denovo1998 opened a new pull request, #17449:
URL: https://github.com/apache/pulsar/pull/17449

    Fixes #17354
   
   ### Motivation
   
   *Fixed the failure to use schema to create consumer after using AUTO-CONSUME consumer to subscribe an empty topic, and Broker returned the error message as  `IncompatibleSchemaException("Topic does not have schema to check")`.*
   
   ### Modifications
   
   *In PersistentTopic::addSchemaIfIdleOrCheckCompatible, when there is an active consumer, but the consumer is using the AUTO_CONSUME schema to subscribe to the topic. Continuing to create a schema consumer to subscribe to the topic will fail.*
   
   -  When `numActiveConsumers != 0`, and check the schema of the currently existing consumers is AUTO_CONSUME schema.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1264603066

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1052963366


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java:
##########
@@ -140,11 +142,13 @@ public class Consumer {
 
     private long negtiveUnackedMsgsTimestamp;
 
+    private SchemaData schemaData;

Review Comment:
   `Schemadata` contains `SchemaType`. I think `Schemadata` is better. HAHA



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1298098288

   @codelipenghui @gaoran10 @liangyepianzhou PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092739823


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -801,6 +805,20 @@ private static void convertSchema(SchemaInfo schemaInfo, Schema schema) {
         });
     }
 
+    private static void convertAutoConsumeSchema(SchemaInfo schemaInfo, Schema schema) {
+        schema.setName(schemaInfo.getName())
+                .setSchemaData(schemaInfo.getSchema())
+                .setType(Schema.Type.AutoConsume);

Review Comment:
   Here, `setType` is set to `Schema.type.AutoConsume` because the value of AutoConsume in `Schema.Type` is 21 and the value of AutoConsume in `SchemaType` is -3. It's an act of convert the AutoConsume schema, so when you say "without SchemaType", it doesn't seem to mean AutoConsume schema alone. What do you think of my thoughts here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "eolivelli (via GitHub)" <gi...@apache.org>.
eolivelli commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092109987


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java:
##########
@@ -153,6 +163,10 @@ public SchemaInfo getSchemaInfo(byte[] schemaVersion) {
         return null;
     }
 
+    public SchemaInfo getAutoConsumeSchemaInfo() {

Review Comment:
   why do we need this instance method that returns a static field ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092803388


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java:
##########
@@ -153,6 +163,10 @@ public SchemaInfo getSchemaInfo(byte[] schemaVersion) {
         return null;
     }
 
+    public SchemaInfo getAutoConsumeSchemaInfo() {

Review Comment:
   It's not good to use a non-static method to return a static variable, like the following code:
   
   ```java
   class Foo {
       private static String name = "Foo";
       public String getName() { return name; }
   }
   // Users have to call `foo.getName()` for a Foo instance `foo`, but
   // it's better to add a static method like `Foo.getName()`.
   ```
   
   Back to the code here, it's redundant to add a method to get the schema info. Why not just make `SCHEMA_INFO` public and reference `AutoConsumerSchema.SCHEMA_INFO`?
   
   ```java
       public static final SchemaInfo SCHEMA_INFO = SchemaInfoImpl.builder()
                   .name("AutoConsume")
                   .type(SchemaType.AUTO_CONSUME)
                   .schema(new byte[0])
                   .build();
   ```
   
   ```java
               if (schema instanceof AutoConsumeSchema
                       && Commands.peerSupportsCarryAutoConsumeSchemaToBroker(cnx.getRemoteEndpointProtocolVersion())) {
                   si = AutoConsumeSchema.SCHEMA_INFO;
               }
   ```
   
   I don't think it would be less clean than the code in your PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092936216


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -801,6 +805,20 @@ private static void convertSchema(SchemaInfo schemaInfo, Schema schema) {
         });
     }
 
+    private static void convertAutoConsumeSchema(SchemaInfo schemaInfo, Schema schema) {
+        schema.setName(schemaInfo.getName())
+                .setSchemaData(schemaInfo.getSchema())
+                .setType(Schema.Type.AutoConsume);

Review Comment:
   I mean, you can just modify this method:
   
   https://github.com/apache/pulsar/blob/fa6af432ef3d015b371121afd9324ba7f393994d/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L773
   
   There is no need to add a new method to do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1364019421

   But when AUTO_CONSUME consumer subscribes to an empty topic first. The schemaInfo is null. There seems to be no way to fix this bug.
   https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java#L270-L280
   https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L813-L817
   @congbobo184 @codelipenghui 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1359706982

   We only need to upload `AUTO_CONSUME` and don't change the behavior of `BYTES` schema(don't unload `NONE` AND BYTES schema. in broker schema = null represents the `BYTES` schema). how about this? @Denovo1998 @codelipenghui 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1047162934


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -1119,14 +1128,14 @@ public CompletableFuture<MessageId> getLastMessageId() {
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
 
     @Override
-    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
+    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema, String subscriptionName) {
         return hasSchema().thenCompose((hasSchema) -> {
             int numActiveConsumers = subscriptions.values().stream()
                     .mapToInt(subscription -> subscription.getConsumers().size())
                     .sum();
             if (hasSchema
                     || (!producers.isEmpty())
-                    || (numActiveConsumers != 0)
+                    || (numActiveConsumers != 0 && !subscriptionsOnlyIncludedAutoSchema.containsKey(subscriptionName))

Review Comment:
       In addConsumerSchemaTypeForSubscriptions method, `subscriptionsOnlyIncludedAutoSchema`  saves only the subscription that contains only AUTO Schema Consumers. So we check whether subscriptionsOnlyIncludedAutoSchema contains the current subscription.subscriptionsOnlyIncludedAutoSchema saves only the subscription that contains only AUTO Schema Consumers



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1452018150

   That's weird. I ran this test a few times and sometimes it worked and sometimes it failed.
   `org.apache.pulsar.client.api.TopicReaderTest#testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic`
   ```
   
   Expected :true
   Actual   :false
   <Click to see difference>
   
   java.lang.AssertionError: Received duplicate message msg num 6
   	at org.testng.Assert.fail(Assert.java:110)
   	at org.testng.Assert.failNotEquals(Assert.java:1413)
   	at org.testng.Assert.assertTrue(Assert.java:56)
   	at org.apache.pulsar.client.api.TopicReaderTest.testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic(TopicReaderTest.java:1401)
   	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
   	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
   	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
   	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
   	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
   	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
   	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
   	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
   	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
   	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
   	at org.testng.TestRunner.privateRun(TestRunner.java:829)
   	at org.testng.TestRunner.run(TestRunner.java:602)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:330)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
   	at org.testng.TestNG.runSuites(TestNG.java:1099)
   	at org.testng.TestNG.run(TestNG.java:1067)
   	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
   	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
   ```
               
   I found some issues and pr that might be related.
   https://github.com/apache/pulsar/issues/17407
   https://github.com/apache/pulsar/pull/17443
   https://github.com/apache/pulsar/pull/18202
   
   **Do I need to continue running `run-fail-checks`` for the test to succeed?**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1359016716

   @congbobo184 The conflict has been resolved. What else should i do?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1058030273


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -771,20 +771,11 @@ public static ByteBuf newProducer(String topic, long producerId, long requestId,
     }
 
     private static Schema.Type getSchemaType(SchemaType type) {
-        if (type.getValue() < 0) {
-            return Schema.Type.None;
-        } else {
-            return Schema.Type.valueOf(type.getValue());
-        }
+        return Schema.Type.valueOf(type.getValue());

Review Comment:
   `BYTES` `AUTO` `AUTO_PUBLISH` still need to return `Schema.Type.None`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -249,13 +250,13 @@ public CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean
     }
 
     @Override
-    public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
+    public CompletableFuture<Consumer> subscribe(SubscriptionOption option, SchemaData schemaData) {

Review Comment:
   `SchemaType` add in `SubscriptionOption`, don't change the method parameter



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -811,6 +812,13 @@ public void connectionOpened(final ClientCnx cnx) {
         }
 
         SchemaInfo si = schema.getSchemaInfo();
+        if (si == null && schema instanceof AutoConsumeSchema) {

Review Comment:
   check null is repeated and line-822



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -811,6 +812,13 @@ public void connectionOpened(final ClientCnx cnx) {
         }
 
         SchemaInfo si = schema.getSchemaInfo();
+        if (si == null && schema instanceof AutoConsumeSchema) {
+            si = SchemaInfo.builder()

Review Comment:
   add a singleton in `AutoConsumeSchema`



##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -45,6 +45,9 @@ message Schema {
         LocalTime = 18;
         LocalDateTime = 19;
         ProtobufNative = 20;
+        Auto = -2;
+        AutoConsume = -3;
+        AutoPublish = -4;

Review Comment:
   only add `AutoConsume` is enough



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1371135700

   There is still a serious problem with the new design here. If the code [here](https://github.com/apache/pulsar/pull/19128#issuecomment-1371077588) was run against a old version Pulsar standalone, the same error would happen.
   
   ```
   Exception in thread "main" org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Incompatible schema: exists schema type NONE, new schema type AVRO caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Incompatible schema: exists schema type NONE, new schema type AVRO","reqId":2185687386566836163, "remote":"/172.23.174.107:6650", "local":"/172.23.160.1:6033"}
   	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1049)
   	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:88)
   	at org.apache.pulsar.client.Main.main(Main.java:55)
   ```
   
   The root cause is here:
   
   ![image](https://user-images.githubusercontent.com/18204803/210598140-a16c731e-e5ee-43b4-ba88-e1100584b01b.png)
   
   The schema info is:
   
   ```java
       static {
           SCHEMA_INFO = SchemaInfoImpl.builder()
                   .name("AutoConsume")
                   .type(SchemaType.AUTO_CONSUME)
                   .schema(new byte[0])
                   .build();
       }
   ```
   
   However, the schema type would be converted into `Schema.Type.None` in https://github.com/apache/pulsar/blob/a6516a8d19896316907c9904d7ed823e9282aef2/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L773-L775
   
   If you modified this method to
   
   ```java
       private static Schema.Type getSchemaType(SchemaType type) {
           return Schema.Type.valueOf(type.getValue());
       }
   ```
   
   A worse case would happen:
   
   > [my-topic] [sub] Error connecting to broker: java.lang.NullPointerException: Cannot invoke "org.apache.pulsar.common.api.proto.Schema$Type.getValue()" because "this.type" is null {}
   
   Because -3 (`SchemaType.AUTO_CONSUME`) cannot be converted to any `Schema.Type` enum defined in PulsarApi.proto: https://github.com/apache/pulsar/blob/a6516a8d19896316907c9904d7ed823e9282aef2/pulsar-common/src/main/proto/PulsarApi.proto#L26
   
   I agree with the comment [here](https://github.com/apache/pulsar/pull/19128#pullrequestreview-1235703083) that we should add an integration test to verify that new clients can connect to old brokers.
   
   Additional, instead of pushing new commits directly without any discussion on the mail list, we should describe the high level design in the PIP issue. /cc @eolivelli 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1358825384

   @labuladong When did I say I needed your help?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 merged pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "congbobo184 (via GitHub)" <gi...@apache.org>.
congbobo184 merged PR #17449:
URL: https://github.com/apache/pulsar/pull/17449


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1237766279

   Hi @Denovo1998 , we'd better add test for this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1010186051


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -1126,7 +1127,7 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
                     .sum();
             if (hasSchema
                     || (!producers.isEmpty())
-                    || (numActiveConsumers != 0)
+                    || (numActiveConsumers != 0 && schema.getType() == SchemaType.AUTO_CONSUME)

Review Comment:
   ```suggestion
                    
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -1126,7 +1127,7 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
                     .sum();
             if (hasSchema
                     || (!producers.isEmpty())

Review Comment:
   ```suggestion
             
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codecov-commenter commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1298461536

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/17449?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > :exclamation: No coverage uploaded for pull request base (`master@f1df382`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#section-missing-base-commit).
   > The diff coverage is `n/a`.
   
   > :exclamation: Current head fe19639 differs from pull request most recent head 350bef1. Consider uploading reports for the commit 350bef1 to get more accurate results
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/17449/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/17449?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master   #17449   +/-   ##
   =========================================
     Coverage          ?   25.55%           
     Complexity        ?     3345           
   =========================================
     Files             ?      400           
     Lines             ?    43607           
     Branches          ?     4476           
   =========================================
     Hits              ?    11145           
     Misses            ?    30731           
     Partials          ?     1731           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `25.55% <0.00%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1298506639

   > The problem is not caused by the check of the AUTO_CONSUME.
   It is that there are some activeConsumer with AUTO_CONSUME.
   
   Oh, got it.
   
   > IMO, The check is confusing. If there is no hasSchema, then we should addSchema without other conditions.
   
   No, if we have active producers or consumers. The new schema might break the existing producer and consumers because they are using the byte[] schema in this case. The main reason is the client side will not upload the byte[] schema.
   
   I think the current fix will break the existing behavior (If there are active consumers with byte[] schema, the new schema should not be added to the schema registry)
   
   The client side will not upload the AUTO_CONSUME. That should be a challenge for us to know if all the active consumers are using the AUTO_CONSUME schema or byte[] schema.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1298563345

   > No, if we have active producers or consumers. The new schema might break the existing producer and consumers because they are using the byte[] schema in this case. The main reason is the client side will not upload the byte[] schema.
   
   @codelipenghui Yes, I have taken this into consideration as well and have given a suggestion:
   If there is a byte[] schema for a topic, then we do not hope it has another schema.
   That is to say, byte[] schema is a valid schema that is exclusive. So we need to store it and mark this topic as `hasSchema`. 
   
   > And store the schema of the byte[]
   > >    if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
   >             // don't set schema for Schema.BYTES
   >             si = null;
   >         }
   > 
   > change it to 
   > >   if (si != null && SchemaType.NONE == si.getType()) {
   >             // don't set schema for Schema.BYTES
   >             si = null;
   >         }
   >  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1298588030

   @liangyepianzhou 
   
   I think the code snippet is from the client-side right? 
   Here is a related discussion https://lists.apache.org/thread/3js51tq2p3c3oldfrhprn4kcohx7h1wv


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1059014300


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java:
##########
@@ -140,7 +150,7 @@ public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
     @Override
     public SchemaInfo getSchemaInfo() {
         if (!schemaMap.containsKey(SchemaVersion.Latest)) {
-            return null;
+            return SCHEMA_INFO;

Review Comment:
   1.In AutoConsumeSchema, we don't change getSchemaInfo:
   ```
       @Override
       public SchemaInfo getSchemaInfo() {
           if (!schemaMap.containsKey(SchemaVersion.Latest)) {
               return null;
           }
           return schemaMap.get(SchemaVersion.Latest).getSchemaInfo();
       }
   ```
   2.Add a new method in AutoConsumeSchema:
   ```
       public SchemaInfo getAutoConsumeSchemaInfo() {
           return SCHEMA_INFO;
       }
   ```
   3.In ConsumerImpl:
   ```
       SchemaInfo si = schema.getSchemaInfo();
       if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
           // don't set schema for Schema.BYTES
           si = null;
       } else {
           if (schema instanceof AutoConsumeSchema) {
               si = ((AutoConsumeSchema) schema).getAutoConsumeSchemaInfo();
           }
       }
   ```
   Is that OK?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1367158739

   @Denovo1998 hi, because we change the proto, we need to create a proposal to vote. https://github.com/apache/pulsar/blob/master/wiki/proposals/PIP.md


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1367982364

   @congbobo184 @eolivelli @codelipenghui PTAL!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092809538


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -801,6 +805,20 @@ private static void convertSchema(SchemaInfo schemaInfo, Schema schema) {
         });
     }
 
+    private static void convertAutoConsumeSchema(SchemaInfo schemaInfo, Schema schema) {
+        schema.setName(schemaInfo.getName())
+                .setSchemaData(schemaInfo.getSchema())
+                .setType(Schema.Type.AutoConsume);

Review Comment:
   Please reduce duplicated code between `convertSchema` and `convertAutoConsumeSchema`. Don't just copy and paste. We should reuse the existing logic rather than copy a method to another method and just modify one line.
   
   A simple solution is to modify the `getSchemaType` method like:
   
   ```java
       private static Schema.Type getSchemaType(SchemaType type) {
           if (type.equals(SchemaType.AUTO_CONSUME)) {
               return Schema.Type.AutoConsume;
           } else if (type.getValue() < 0) {
               return Schema.Type.None;
   ```
   
   A more simple solution is just changing the `SchemaType.AUTO_CONSUME` to 21 so that the value could be consistent with what's defined in the `PulsarApi.proto`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092873312


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -801,6 +805,20 @@ private static void convertSchema(SchemaInfo schemaInfo, Schema schema) {
         });
     }
 
+    private static void convertAutoConsumeSchema(SchemaInfo schemaInfo, Schema schema) {
+        schema.setName(schemaInfo.getName())
+                .setSchemaData(schemaInfo.getSchema())
+                .setType(Schema.Type.AutoConsume);

Review Comment:
   The one that accepts a `SchemaType` is private, not public.
   
   Regarding the public one, I agree it's better not to change and keep the code in `ServerCnx` not changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092837594


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1233,6 +1233,79 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {

Review Comment:
   What do you think of such test cases? I think that's a little more "detailed"? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1364643905

   https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-common/src/main/proto/PulsarApi.proto#L47
   
   we should add the AUTO_CONSUME schema type in shi proto, otherwise, the AUTO_CONSUME  can't upload AUTO_CONSUME to broker


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1058893613


##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -45,6 +45,7 @@ message Schema {
         LocalTime = 18;
         LocalDateTime = 19;
         ProtobufNative = 20;
+        AutoConsume = -3;

Review Comment:
   This looks like a wire protocol change (even if it isn't?)
   
   Probably we need further discussion on the mailing list or even a PIP



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1367411182

   @congbobo184 @eolivelli Sorry, I can't ask for leave from the company tomorrow, and the company is not allowed to bring my own computer to connect to the Internet. So tomorrow night I will look into how to create a proposal to vote.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1311870001

   > I think we should not check the schema compatibility for AUTO_CONSUME and AUTO_PUBLISH schema
   > 
   > https://github.com/apache/pulsar/blob/fe1963988fc6883f52826069a781b91aba0405bf/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1135
   > 
   > I think we should change to
   > 
   > ```java
   > if (schema != null && schema.getType() != SchemaType.AUTO_PUBLISH && schema.getType() != SchemaType.AUTO_CONSUME) {
   >       return topic.addSchemaIfIdleOrCheckCompatible(schema)
   >               .thenCompose(v -> topic.subscribe(option));
   >   } else {
   >       return topic.subscribe(option);
   >   }
   > ```
   > 
   > What do you think @congbobo184 @liangyepianzhou
   
   This doesn't fix the bug that pr is trying to fix. For example, if the schema is AVRO, but two AUTO_CONSUME consumers have subscribed to the topic, the schema compatibility will still be checked. Because numActiveConsumers ! = 0 is true, so the active consumers shouldn't only include the AUTO_CONSUME consumers. Of course we should add your code, too.
   So I made some changes.
   @codelipenghui @liangyepianzhou @congbobo184 PTAL
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1451720069

   @tisonkun Thanks for your help.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1457432875

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] tisonkun commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "tisonkun (via GitHub)" <gi...@apache.org>.
tisonkun commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1451710747

   @Denovo1998 I've triggered the CI workflow for you :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092815909


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1233,6 +1233,79 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {

Review Comment:
   Why did you create 4 consumers for each schema type? It's unnecessary and we only need 1 consumer for each schema type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092827614


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -801,6 +805,20 @@ private static void convertSchema(SchemaInfo schemaInfo, Schema schema) {
         });
     }
 
+    private static void convertAutoConsumeSchema(SchemaInfo schemaInfo, Schema schema) {
+        schema.setName(schemaInfo.getName())
+                .setSchemaData(schemaInfo.getSchema())
+                .setType(Schema.Type.AutoConsume);

Review Comment:
   How about we do it this way? 
   ```java
       private static void convertAutoConsumeSchema(SchemaInfo schemaInfo, Schema schema) {
           convertSchema(schemaInfo, schema);
           schema.setType(Schema.Type.AutoConsume);
       }
   ```
   `getSchemaType` is a public method. And `SchemaType` is also public. It's better not to change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092733328


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -811,6 +812,11 @@ public void connectionOpened(final ClientCnx cnx) {
         if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
             // don't set schema for Schema.BYTES
             si = null;
+        } else {
+            if (schema instanceof AutoConsumeSchema
+                    && Commands.peerSupportsCarryAutoConsumeSchemaToBroker(cnx.getRemoteEndpointProtocolVersion())) {

Review Comment:
   Yes, yes, but you can look at it.
   `SchemaInfo si = schema.getSchemaInfo();`
   And
   https://github.com/apache/pulsar/blob/06e4db5c821b2ba9241040f5db52f882e83e8cd8/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java#L140-L146
   So `getAutoConsumeSchemaInfo` is executed when si is null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092969708


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -801,6 +805,20 @@ private static void convertSchema(SchemaInfo schemaInfo, Schema schema) {
         });
     }
 
+    private static void convertAutoConsumeSchema(SchemaInfo schemaInfo, Schema schema) {
+        schema.setName(schemaInfo.getName())
+                .setSchemaData(schemaInfo.getSchema())
+                .setType(Schema.Type.AutoConsume);

Review Comment:
   I didn't catch your meaning just now, sorry. It's been fixed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1050324830


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -1119,14 +1128,14 @@ public CompletableFuture<MessageId> getLastMessageId() {
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
 
     @Override
-    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
+    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema, String subscriptionName) {
         return hasSchema().thenCompose((hasSchema) -> {
             int numActiveConsumers = subscriptions.values().stream()
                     .mapToInt(subscription -> subscription.getConsumers().size())
                     .sum();
             if (hasSchema
                     || (!producers.isEmpty())
-                    || (numActiveConsumers != 0)
+                    || (numActiveConsumers != 0 && !subscriptionsOnlyIncludedAutoSchema.containsKey(subscriptionName))

Review Comment:
   1. the root cause is, we can't distinguish no schema or AUTO_CONSUME schema, because the client sub command doesn't upload the AUT0_CONSUME type to broker
   2. use a map to check this is very memory consuming, and not necessary



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1298220857

   When a consumer with `Schema.AUTO_CONSUME` subscribes to the topic.
   The schema of `CommandSubscribe` is null, so the `schema.getType() == SchemaType.AUTO_CONSUME` always be false.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1298502140

   https://github.com/apache/pulsar/blob/adae4ae2b06304eae9ec001357e0bd6ee6ffb053/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3030-L3046
   I think this method only need to check whether have schema, if doesn't hava schema then add the schema, if have shema then check the compatible. so the code like this 
   ```
                   if (hasSchema) {
                       return checkSchemaCompatibleForConsumer(schema);
                   } else {
                       return addSchema(schema).thenCompose(schemaVersion ->
                               CompletableFuture.completedFuture(null));
                   }
   ```
   active producer or active consumer can't affect this method. @codelipenghui @liangyepianzhou 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1298181199

   You mean my test shouldn't be put inside SimpleSchemaTest?
   
   
   > Please check the test again. The test SimpleSchemaTest is a test that clean up after class. Your two tests will affect each other.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092813914


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1240,7 +1241,8 @@ private SchemaData getSchema(Schema protocolSchema) {
             .isDeleted(false)
             .timestamp(System.currentTimeMillis())
             .user(Strings.nullToEmpty(originalPrincipal))
-            .type(Commands.getSchemaType(protocolSchema.getType()))
+            .type(protocolSchema.getType() == Schema.Type.AutoConsume
+                    ? SchemaType.AUTO_CONSUME : Commands.getSchemaType(protocolSchema.getType()))

Review Comment:
   In addition to my comment before, we can avoid this change if you just modify the `Commands.getSchemaType` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092903518


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -801,6 +805,20 @@ private static void convertSchema(SchemaInfo schemaInfo, Schema schema) {
         });
     }
 
+    private static void convertAutoConsumeSchema(SchemaInfo schemaInfo, Schema schema) {
+        schema.setName(schemaInfo.getName())
+                .setSchemaData(schemaInfo.getSchema())
+                .setType(Schema.Type.AutoConsume);

Review Comment:
   https://github.com/apache/pulsar/blob/fa6af432ef3d015b371121afd9324ba7f393994d/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java#L178-L180
   If we change the value of `AUTO_CONSUME` in `SchemaType` to 21, the behavior may change in two places: 
   https://github.com/apache/pulsar/blob/fa6af432ef3d015b371121afd9324ba7f393994d/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L758-L761
   https://github.com/apache/pulsar/blob/fa6af432ef3d015b371121afd9324ba7f393994d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java#L613-L619
   
   So how about we make that change
   ```java
       private static void convertAutoConsumeSchema(SchemaInfo schemaInfo, Schema schema) {
           convertSchema(schemaInfo, schema);
           schema.setType(Schema.Type.AutoConsume);
       }
   ```
   And in `ServerCnx`.
   ```java
               .type(protocolSchema.getType() == Schema.Type.AutoConsume
                       ? SchemaType.AUTO_CONSUME : Commands.getSchemaType(protocolSchema.getType()))
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092836528


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1233,6 +1233,79 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {

Review Comment:
   And I want to test whether there is a problem with subscripting with a consumer whose schema is AutoConsume and then subscripting with a consumer whose schema is AutoConsume. Here's a test that looks like this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092868889


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1233,6 +1233,79 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {

Review Comment:
   It makes sense to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1371079934

   @eolivelli +1 to me. I just found the approach to carry the schema info will make old version brokers persist the schema info, which could trigger the schema compatibility check for the following producers or consumers, see my comments [here](https://github.com/apache/pulsar/pull/19128#issuecomment-1371077588).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1439366265

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1052966225


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java:
##########
@@ -140,11 +142,13 @@ public class Consumer {
 
     private long negtiveUnackedMsgsTimestamp;
 
+    private SchemaData schemaData;

Review Comment:
   If there are other needs we can use SchemaData? it seems that only `SchemaType` is enough for now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1364548475

   @congbobo184 There is nothing wrong with my test consumer, but will this affect the behavior of Producer?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1365919567

   @congbobo184 Is that all right? Is there anything else need to do?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1053215124


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -808,7 +811,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
             CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
                 Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
                         consumerName, isDurable, cnx, cnx.getAuthRole(), metadata,
-                        readCompacted, keySharedMeta, startMessageId, consumerEpoch);
+                        readCompacted, keySharedMeta, startMessageId, consumerEpoch,
+                        schemaData == null ? SchemaType.AUTO_CONSUME : schemaData.getType());

Review Comment:
   Null `schemaData` can also represent BYTES schema. After this change, it will not only fix the auto_consume schema issue. It also changed the behavior of the BYTES schema.
   
   Before, if there were consumers with BYTES schema, only the consumer with BYTES schema could subscribe to the topic. With this fix, the consumer with Avro schema can also subscribe to this topic, right?
   
   Looks like we are not able to fix the problem without client-side changes.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java:
##########
@@ -142,11 +143,13 @@ public class Consumer {
 
     private long negtiveUnackedMsgsTimestamp;
 
+    private SchemaType schemaType;

Review Comment:
   ```suggestion
       private final SchemaType schemaType;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1058864674


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java:
##########
@@ -140,7 +150,7 @@ public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
     @Override
     public SchemaInfo getSchemaInfo() {
         if (!schemaMap.containsKey(SchemaVersion.Latest)) {
-            return null;
+            return SCHEMA_INFO;

Review Comment:
   you can add a new method in AutoConsumeSchema `getAutoConsumeSchemaInfo()` use this method instead of getSchemaInfo when the schema type is AUTO_CONSUME



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1058809109


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1239,6 +1239,92 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {
+        final String topic = domain + "my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";
+
+        Consumer<GenericRecord> autoConsumer1 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer1")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer2")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer3 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("autoConsumer3")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer4 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("autoConsumer4")
+                .subscribe();
+        try {
+            log.info("The autoConsumer1 isConnected: " + autoConsumer1.isConnected());
+            log.info("The autoConsumer2 isConnected: " + autoConsumer2.isConnected());
+            log.info("The autoConsumer3 isConnected: " + autoConsumer3.isConnected());
+            log.info("The autoConsumer4 isConnected: " + autoConsumer4.isConnected());
+            admin.schemas().getSchemaInfo(topic);
+            fail("The schema of topic should not exist");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 404);
+        }
+
+        Consumer<V1Data> consumerWithSchema1 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("consumerWithSchema-1")
+                .subscribe();
+        Consumer<V1Data> consumerWithSchema2 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("consumerWithSchema-2")
+                .subscribe();
+        Consumer<V1Data> consumerWithSchema3 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("consumerWithSchema-3")
+                .subscribe();
+        Consumer<V1Data> consumerWithSchema4 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("consumerWithSchema-4")
+                .subscribe();
+        try {

Review Comment:
   don't need this try-catch, the schema has been uploaded to broker



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1239,6 +1239,92 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {
+        final String topic = domain + "my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";
+
+        Consumer<GenericRecord> autoConsumer1 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer1")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer2")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer3 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())

Review Comment:
   should close or delete 



##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -45,6 +45,7 @@ message Schema {
         LocalTime = 18;
         LocalDateTime = 19;
         ProtobufNative = 20;
+        AutoConsume = -3;

Review Comment:
   I am not sure whether we can define -3 as `AutoConsume`, @codelipenghui  could you please take a look that



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1239,6 +1239,92 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {
+        final String topic = domain + "my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";
+
+        Consumer<GenericRecord> autoConsumer1 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer1")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer2")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer3 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("autoConsumer3")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer4 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("autoConsumer4")
+                .subscribe();
+        try {
+            log.info("The autoConsumer1 isConnected: " + autoConsumer1.isConnected());
+            log.info("The autoConsumer2 isConnected: " + autoConsumer2.isConnected());
+            log.info("The autoConsumer3 isConnected: " + autoConsumer3.isConnected());
+            log.info("The autoConsumer4 isConnected: " + autoConsumer4.isConnected());
+            admin.schemas().getSchemaInfo(topic);
+            fail("The schema of topic should not exist");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 404);
+        }
+
+        Consumer<V1Data> consumerWithSchema1 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("consumerWithSchema-1")
+                .subscribe();
+        Consumer<V1Data> consumerWithSchema2 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("consumerWithSchema-2")
+                .subscribe();
+        Consumer<V1Data> consumerWithSchema3 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))

Review Comment:
   should close or delete 



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1239,6 +1239,92 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {
+        final String topic = domain + "my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";
+
+        Consumer<GenericRecord> autoConsumer1 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer1")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer2")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer3 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("autoConsumer3")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer4 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())

Review Comment:
   should close or delete 



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1239,6 +1239,92 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {
+        final String topic = domain + "my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";
+
+        Consumer<GenericRecord> autoConsumer1 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer1")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer2")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer3 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("autoConsumer3")
+                .subscribe();
+
+        Consumer<GenericRecord> autoConsumer4 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("autoConsumer4")
+                .subscribe();
+        try {
+            log.info("The autoConsumer1 isConnected: " + autoConsumer1.isConnected());
+            log.info("The autoConsumer2 isConnected: " + autoConsumer2.isConnected());
+            log.info("The autoConsumer3 isConnected: " + autoConsumer3.isConnected());
+            log.info("The autoConsumer4 isConnected: " + autoConsumer4.isConnected());
+            admin.schemas().getSchemaInfo(topic);
+            fail("The schema of topic should not exist");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 404);
+        }
+
+        Consumer<V1Data> consumerWithSchema1 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("consumerWithSchema-1")
+                .subscribe();
+        Consumer<V1Data> consumerWithSchema2 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("consumerWithSchema-2")
+                .subscribe();
+        Consumer<V1Data> consumerWithSchema3 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("consumerWithSchema-3")
+                .subscribe();
+        Consumer<V1Data> consumerWithSchema4 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))

Review Comment:
   should close or delete



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java:
##########
@@ -140,7 +150,7 @@ public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
     @Override
     public SchemaInfo getSchemaInfo() {
         if (!schemaMap.containsKey(SchemaVersion.Latest)) {
-            return null;
+            return SCHEMA_INFO;

Review Comment:
   this is a public method, so may we don't change this method is better



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092809337


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java:
##########
@@ -153,6 +163,10 @@ public SchemaInfo getSchemaInfo(byte[] schemaVersion) {
         return null;
     }
 
+    public SchemaInfo getAutoConsumeSchemaInfo() {

Review Comment:
   That is really a very good suggestion. I made a mistake here and I will change it to your design. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1422721748

   This is my first contribution. Maybe now I need more review and approval? Or add a ready-to-test label to the PR? In my own forked repository, all the tests are already passing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "congbobo184 (via GitHub)" <gi...@apache.org>.
congbobo184 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1423467264

   @liangyepianzhou @liangyepianzhou please help review this PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1457531410

   @congbobo184  Now we can merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] labuladong commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
labuladong commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1358785876

   I think the better way to resolve this bug is to record `schemaType` in `Consumer` class:
   
   https://github.com/apache/pulsar/blob/a95bb630d76c3234ef4660d722518e642341ad3b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L71-L81
   
   Then when we calculate the active consumer num, we can ignore the consumers without schema:
   
   https://github.com/apache/pulsar/blob/a95bb630d76c3234ef4660d722518e642341ad3b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java#L1162-L1173
   
   I can provide another PR to implement it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] labuladong commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
labuladong commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1362824456

   Hi @Denovo1998! I'm glad that you have this idea too, so I'll close my pr.
   
   For your discussion above, my solution is to only count the consumer which specifies a schema:
   
   ```java
   int numActiveConsumersWithSchema = subscriptions.values().stream()
     .mapToInt(subscription -> (int) subscription.getConsumers().stream()
           // filter out the consumers have specific schema
           .filter((c) -> c.getSchemaType() != null && c.getSchemaType().getValue() > 0).count())
     .sum();
   ```
   
   If so, the original schema won’t be changed, and this code can ignore the consumers with null, AUTO_CONSUME, NONE or BYTES schema, because these schemas are all non-positive enums:
   
   https://github.com/apache/pulsar/blob/90f67587e31cc0cfb27773f85f18d607c9c5c324/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java#L140-L175
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1358969000

   @congbobo184 The Commit of my PR is a bit dirty. Should I open a PR and close this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1047177308


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -1119,14 +1128,14 @@ public CompletableFuture<MessageId> getLastMessageId() {
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
 
     @Override
-    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
+    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema, String subscriptionName) {
         return hasSchema().thenCompose((hasSchema) -> {
             int numActiveConsumers = subscriptions.values().stream()
                     .mapToInt(subscription -> subscription.getConsumers().size())
                     .sum();
             if (hasSchema
                     || (!producers.isEmpty())
-                    || (numActiveConsumers != 0)
+                    || (numActiveConsumers != 0 && !subscriptionsOnlyIncludedAutoSchema.containsKey(subscriptionName))

Review Comment:
   In `addConsumerSchemaTypeForSubscriptions`, the subscriptionsOnlyIncludedAutoSchema saves only the subscription that contains only AUTO schema consumers. So we should check whether subscriptionsOnlyIncludedAutoSchema contains the current subscription and numActiveConsumers. If subscriptionsOnlyIncludedAutoSchema save all subscription and its corresponding all schematype consumer, then we can check the no schema consumer count. But I think it's a waste.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1366654165

   @congbobo184 PTAL! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1058815858


##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -45,6 +45,7 @@ message Schema {
         LocalTime = 18;
         LocalDateTime = 19;
         ProtobufNative = 20;
+        AutoConsume = -3;

Review Comment:
   In SchemaType, AUTO_CONSUME = -3. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1298212552

   What if the producer writes the data but hasn't uploaded the schema yet?
   https://github.com/apache/pulsar/pull/9853#issuecomment-794646589
   #2669


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1010175401


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1216,6 +1216,52 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {
+        final String topic = domain + "my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";
+
+        Consumer autoConsumer1 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())

Review Comment:
   ```suggestion
           Consumer<GenericRecord> autoConsumer1 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1216,6 +1216,52 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {
+        final String topic = domain + "my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";
+
+        Consumer autoConsumer1 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer1")
+                .subscribe();
+
+        Consumer autoConsumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())

Review Comment:
   ```suggestion
           Consumer<GenericRecord> autoConsumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1345761095

   The pr had no activity for 30 days, mark with Stale label.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1045934432


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1113,11 +1113,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                                     .subscriptionProperties(subscriptionProperties)
                                     .consumerEpoch(consumerEpoch)
                                     .build();
-                            if (schema != null) {
-                                return topic.addSchemaIfIdleOrCheckCompatible(schema)
-                                        .thenCompose(v -> topic.subscribe(option));
+                            if (schema != null && schema.getType() != SchemaType.AUTO_PUBLISH
+                                    && schema.getType() != SchemaType.AUTO_CONSUME) {

Review Comment:
   Already fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1281683981

   @congbobo184 @mattisonchao PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1238067029

   Already done @Technoboy- 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 closed pull request #17449: Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 closed pull request #17449: Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic
URL: https://github.com/apache/pulsar/pull/17449


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1046631457


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -1119,14 +1128,14 @@ public CompletableFuture<MessageId> getLastMessageId() {
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
 
     @Override
-    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
+    public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema, String subscriptionName) {
         return hasSchema().thenCompose((hasSchema) -> {
             int numActiveConsumers = subscriptions.values().stream()
                     .mapToInt(subscription -> subscription.getConsumers().size())
                     .sum();
             if (hasSchema
                     || (!producers.isEmpty())
-                    || (numActiveConsumers != 0)
+                    || (numActiveConsumers != 0 && !subscriptionsOnlyIncludedAutoSchema.containsKey(subscriptionName))

Review Comment:
   its better to check no schema consumer count and numActiveConsumers. if they have the same number, we add the schema directly, if not same then `checkSchemaCompatibleForConsumer`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1045395835


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -1113,11 +1113,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                                     .subscriptionProperties(subscriptionProperties)
                                     .consumerEpoch(consumerEpoch)
                                     .build();
-                            if (schema != null) {
-                                return topic.addSchemaIfIdleOrCheckCompatible(schema)
-                                        .thenCompose(v -> topic.subscribe(option));
+                            if (schema != null && schema.getType() != SchemaType.AUTO_PUBLISH
+                                    && schema.getType() != SchemaType.AUTO_CONSUME) {

Review Comment:
   if the client consumer sub with Schema.Type AUTO_CONSUME, the sub command arrived at the broker, the schema is null. we don't need to check this, because schema.getType() != SchemaType.AUTO_CONSUME always true



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092134058


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java:
##########
@@ -153,6 +163,10 @@ public SchemaInfo getSchemaInfo(byte[] schemaVersion) {
         return null;
     }
 
+    public SchemaInfo getAutoConsumeSchemaInfo() {

Review Comment:
   You can of course create a SchemaInfo here.
   https://github.com/apache/pulsar/pull/17449/files#r1061103612
   But I feel like giving `AutoConsumeSchema` a "default" schemaInfo. Using it in `ConsumerImpl` will make the code a little cleaner. What do you think of my thoughts here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1410572932

   @BewareMyPower @codelipenghui @congbobo184 I just saw the results of the vote. Is it time to review my code?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092834406


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java:
##########
@@ -1233,6 +1233,79 @@ public void testAutoCreatedSchema(String domain) throws Exception {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {

Review Comment:
   Previous old design I needed to test the case of multiple consumers with different subscriptions to subscribe a topic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1058865963


##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -45,6 +45,7 @@ message Schema {
         LocalTime = 18;
         LocalDateTime = 19;
         ProtobufNative = 20;
+        AutoConsume = -3;

Review Comment:
   yes, I know. I just don't know if it is possible to pass negative numbers, I communicated with penghui, you can do this in this way 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1358828947

   I have resolve this bug by record `SchemaData` in Consumer last night.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1358859323

   @congbobo184 PTAL. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1052967760


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java:
##########
@@ -140,11 +142,13 @@ public class Consumer {
 
     private long negtiveUnackedMsgsTimestamp;
 
+    private SchemaData schemaData;

Review Comment:
   OK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1061639462


##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -51,6 +51,7 @@ message Schema {
     required bytes schema_data = 3;
     required Type type = 4;
     repeated KeyValue properties = 5;
+    optional bool is_auto_consume_schema = 6 [default = false];

Review Comment:
   I think it's better to add the field to `CommandSubscribe` instead of the `Schema` itself.
   
   BTW, please modify your PIP first before pushing the commits.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1448392485

   @congbobo184 I'm done. But in this pr I see 2 workflows awaiting approval.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1451809132

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1451925013

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "congbobo184 (via GitHub)" <gi...@apache.org>.
congbobo184 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1092678157


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -811,6 +812,11 @@ public void connectionOpened(final ClientCnx cnx) {
         if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
             // don't set schema for Schema.BYTES
             si = null;
+        } else {
+            if (schema instanceof AutoConsumeSchema
+                    && Commands.peerSupportsCarryAutoConsumeSchemaToBroker(cnx.getRemoteEndpointProtocolVersion())) {

Review Comment:
   if the autoConsumeSchema has the `SchemaVersion.Latest` schema, maybe we can't set the `si = ((AutoConsumeSchema) schema).getAutoConsumeSchemaInfo()`,



##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java:
##########
@@ -801,6 +805,20 @@ private static void convertSchema(SchemaInfo schemaInfo, Schema schema) {
         });
     }
 
+    private static void convertAutoConsumeSchema(SchemaInfo schemaInfo, Schema schema) {
+        schema.setName(schemaInfo.getName())
+                .setSchemaData(schemaInfo.getSchema())
+                .setType(Schema.Type.AutoConsume);

Review Comment:
   repeat with 'convertSchema(SchemaInfo schemaInfo, Schema schema)' except this line, so can add a method `private convertSchemaWithoutSchemaType(SchemaInfo schemaInfo, Schema schema)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1052957906


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java:
##########
@@ -140,11 +142,13 @@ public class Consumer {
 
     private long negtiveUnackedMsgsTimestamp;
 
+    private SchemaData schemaData;

Review Comment:
   good way. but maybe we only use the SchemaType is enough



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1358983890

   > @congbobo184 The Commit of my PR is a bit dirty. Should I open a PR and close this?
   
   don't need push a new pr, please merge the apache/master to resolve the conflict


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1053311999


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -808,7 +811,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
             CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
                 Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
                         consumerName, isDurable, cnx, cnx.getAuthRole(), metadata,
-                        readCompacted, keySharedMeta, startMessageId, consumerEpoch);
+                        readCompacted, keySharedMeta, startMessageId, consumerEpoch,
+                        schemaData == null ? SchemaType.AUTO_CONSUME : schemaData.getType());

Review Comment:
   it seems so, if one consumer with BYTES schema sub the topic first, the second consumer with AVRO schema can also sub the topic. If the broker can't distinguish the BYTES schema and AUTO_CONSUME, this problem can't be solved. It can only be distinguished by sub with the Schema type from the client, there is no other way
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1359611281

   @codelipenghui @congbobo184 
   1. https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L813-L817
   change here to: 
   SchemaInfo si = schema.getSchemaInfo();
   if (si != null && SchemaType.NONE == si.getType()) {
       // don't set schema for Schema.BYTES
       si = null;
   }
   But here it will change BYTE to NONE: 
   https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L641-L651
   2. Then: 
   https://github.com/Denovo1998/pulsar/blob/33c838af4d4a70e40e6782d37579c67e3f338237/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1133-L1138
   in handleSubscribe(): 
   if (schema != null && schema.getType() != SchemaType.NONE) {
       return topic.addSchemaIfIdleOrCheckCompatible(schema)
               .thenCompose(v -> topic.subscribe(option, schema));
   } else {
       return topic.subscribe(option, schema);
   }
   
   **Is this OK?**
   
   3. And: 
   https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1023
   https://github.com/apache/pulsar/blob/22866bd19c231e85ddff4acee4dad1f895cbbc72/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1209-L1222
   Does the other two `handleProducer()` and `handleGetOrCreateSchema()` need to be changed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1364495703

   we need to send the AUTO_CONSUME type to the broker
   https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L773-L788
   
   https://github.com/apache/pulsar/blob/d8569cd4ec6da14f8b2b9338db1ed2f6a3eacf0a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L813-L817
   
   change AUTO_CONSUME type can be send to broker


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1058842583


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java:
##########
@@ -140,7 +150,7 @@ public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
     @Override
     public SchemaInfo getSchemaInfo() {
         if (!schemaMap.containsKey(SchemaVersion.Latest)) {
-            return null;
+            return SCHEMA_INFO;

Review Comment:
   So here it looks like this. If AUTO_CONSUME consumer is used to subscribe to a topic, and the topic has no schema, the schemaMap in AutoConsumeSchema will have no key for SchemaVersion.Latest. Then do getSchemaInfo, schemainfo is null. There is no way to upload the AUTO_CONSUNE to the broker. So in the previous commit, if (si == null && schema instanceof AutoConsumeSchema), I create a schemainfo with a schemaType of AUTO_CONSUME. What am I supposed to do here? Do not change getSchemaInfo, still use the previous code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1367983559

   Should I change the title of PR to `PIP-236: Upload AUTO_CONSUME SchemaType to Broker`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#discussion_r1061103612


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -814,6 +815,10 @@ public void connectionOpened(final ClientCnx cnx) {
         if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
             // don't set schema for Schema.BYTES
             si = null;
+        } else {
+            if (schema instanceof AutoConsumeSchema) {
+                si = ((AutoConsumeSchema) schema).getAutoConsumeSchemaInfo();

Review Comment:
   Instead of adding a `getAutoConsumerSchemaInfo` method, I think we can just create the `SchemaInfo` here.
   
   ```java
               si = SchemaInfo.builder()
                       .name("AutoConsume")
                       .type(SchemaType.AUTO_CONSUME)
                       .schema(new byte[0])
                       .build();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on pull request #17449: [fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1298425453

   The problem is not caused by the check of the AUTO_CONSUME.
   It is that there are some `activeConsumer` with AUTO_CONSUME.
   And then the new consumer with the new other schemas will fail at `addSchemaIfIdleOrCheckCompatible` with the exception `IncompatibleSchemaException("Topic does not have schema to check").
   `
   IMO, The check is confusing. If there is no hasSchema, then we should `addSchema` without other conditions.
   https://github.com/apache/pulsar/blob/fe1963988fc6883f52826069a781b91aba0405bf/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3037
   >   if (hasSchema
                           || (!producers.isEmpty())
                           || (numActiveConsumers != 0)
                           || (ledger.getTotalSize() != 0)) {
                       return checkSchemaCompatibleForConsumer(schema);
                   } else {
                       return addSchema(schema).thenCompose(schemaVersion ->
                               CompletableFuture.completedFuture(null));
                   }
   
    Why did not we change it to 
    >    if (hasSchema) {
                       return checkSchemaCompatibleForConsumer(schema);
                   } else {
                       return addSchema(schema).thenCompose(schemaVersion ->
                               CompletableFuture.completedFuture(null));
                   }          
        
   And store the schema of the byte[]
   >    if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
               // don't set schema for Schema.BYTES
               si = null;
           }
   
   change it to 
   >   if (si != null && SchemaType.NONE == si.getType()) {
               // don't set schema for Schema.BYTES
               si = null;
           }
    
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "congbobo184 (via GitHub)" <gi...@apache.org>.
congbobo184 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1448025641

   @Denovo1998 hi, can you merge `apache/master` and rerun the test?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Denovo1998 commented on pull request #17449: [PIP-236][fix][broker]Fix using schema to create consumer fails after using AUTO_CONSUME consumer to subscribe topic

Posted by "Denovo1998 (via GitHub)" <gi...@apache.org>.
Denovo1998 commented on PR #17449:
URL: https://github.com/apache/pulsar/pull/17449#issuecomment-1447983365

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org