Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/30 15:22:05 UTC

[GitHub] [pulsar] Denovo1998 opened a new issue, #19113: PIP-236: Upload AUTO_CONSUME SchemaType to Broker

Denovo1998 opened a new issue, #19113:
URL: https://github.com/apache/pulsar/issues/19113

   ### Motivation
   
   This proposal fixes a failure that occurs when a consumer with a concrete schema is created after an AUTO_CONSUME consumer has already subscribed to an empty topic. In that case the Broker rejects the second consumer with IncompatibleSchemaException("Topic does not have schema to check").
   https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1037
   https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1147-L1152
   https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3054-L3071
   https://github.com/apache/pulsar/blob/ed33fb399e661e4d47baeaaa8d0cdb3bfadc9546/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java#L1162-L1177
   To avoid this, the Broker should record whether any active consumer of the Topic has a SchemaType other than AUTO_CONSUME.
   
   ### Goal
   
   1. On the client side, the AUTO_CONSUME schema should be uploaded to the Broker.
   https://github.com/apache/pulsar/blob/36cff70575745f98c7fb4f3a345d21404f5504fc/pulsar-common/src/main/proto/PulsarApi.proto#L25-L55
   This requires adding `AutoConsume = -3;` to the `Schema.Type` enum.
   2. On the broker side, record SchemaType in `org.apache.pulsar.broker.service.Consumer`.
   
   ### API Changes
   
   Add a `SchemaType` field to `Consumer`:
   ```
           private final SchemaType schemaType;
   ```
   Add `AutoConsume = -3;` to the `Schema.Type` enum in PulsarApi.proto:
   ```
   message Schema {
       enum Type {
           None = 0;
           String = 1;
           Json = 2;
           Protobuf = 3;
           Avro = 4;
           Bool = 5;
           Int8 = 6;
           Int16 = 7;
           Int32 = 8;
           Int64 = 9;
           Float = 10;
           Double = 11;
           Date = 12;
           Time = 13;
           Timestamp = 14;
           KeyValue = 15;
           Instant = 16;
           LocalDate = 17;
           LocalTime = 18;
           LocalDateTime = 19;
           ProtobufNative = 20;
           AutoConsume = -3;
       }
   
       required string name = 1;
       required bytes schema_data = 3;
       required Type type = 4;
       repeated KeyValue properties = 5;
   }
   ```
   In `getSchemaType()`, AUTO_CONSUME is no longer mapped to NONE:
   ```
       private static Schema.Type getSchemaType(SchemaType type) {
           if (type.getValue() < 0 && type.getValue() != -3) {
               return Schema.Type.None;
           } else {
               return Schema.Type.valueOf(type.getValue());
           }
       }
   
       public static SchemaType getSchemaType(Schema.Type type) {
           if (type.getValue() < 0 && type.getValue() != -3) {
               // this is unexpected
               return SchemaType.NONE;
           } else {
               return SchemaType.valueOf(type.getValue());
           }
       }
   ```
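
   The effect of the `!= -3` exception can be sketched in isolation. The enums below are hypothetical stand-ins for `SchemaType` and `Schema.Type`, not the real Pulsar classes; they model only a few constants, and the negative values other than -3 are assumptions made for illustration.

   ```java
   import java.util.Arrays;

   // Hypothetical stand-in for the client-side SchemaType (illustration only).
   enum ClientType {
       NONE(0), JSON(2), BYTES(-1), AUTO_CONSUME(-3), AUTO_PUBLISH(-4);

       final int value;
       ClientType(int value) { this.value = value; }

       static ClientType forNumber(int value) {
           return Arrays.stream(values())
                   .filter(t -> t.value == value)
                   .findFirst()
                   .orElseThrow(() -> new IllegalArgumentException("unknown client type " + value));
       }
   }

   // Hypothetical stand-in for the wire-level Schema.Type, including the new AutoConsume = -3.
   enum WireType {
       None(0), Json(2), AutoConsume(-3);

       final int value;
       WireType(int value) { this.value = value; }

       static WireType forNumber(int value) {
           return Arrays.stream(values())
                   .filter(t -> t.value == value)
                   .findFirst()
                   .orElseThrow(() -> new IllegalArgumentException("unknown wire type " + value));
       }
   }

   final class SchemaTypeMappingSketch {
       // Negative client types collapse to None, except AUTO_CONSUME (-3), which is carried through.
       static WireType toWire(ClientType type) {
           if (type.value < 0 && type.value != -3) {
               return WireType.None;
           }
           return WireType.forNumber(type.value);
       }

       // Same rule in the other direction, as the broker would apply it.
       static ClientType fromWire(WireType type) {
           if (type.value < 0 && type.value != -3) {
               return ClientType.NONE;
           }
           return ClientType.forNumber(type.value);
       }
   }
   ```

   Under these assumptions, `toWire(ClientType.AUTO_CONSUME)` yields `WireType.AutoConsume`, while `toWire(ClientType.BYTES)` and `toWire(ClientType.AUTO_PUBLISH)` still yield `WireType.None`.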
   
   ### Implementation
   
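   In `org.apache.pulsar.broker.service.ServerCnx#handleSubscribe`, pass the schema type to the `SubscriptionOption` and skip `addSchemaIfIdleOrCheckCompatible` for AUTO_CONSUME consumers: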
   ```
       SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
               .subscriptionName(subscriptionName)
               .consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
               .consumerName(consumerName).isDurable(isDurable)
               .startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted)
               .initialPosition(initialPosition)
               .startMessageRollbackDurationSec(startMessageRollbackDurationSec)
               .replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
               .subscriptionProperties(subscriptionProperties)
               .consumerEpoch(consumerEpoch)
               .schemaType(schema == null ? null : schema.getType())
               .build();
       if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
           return topic.addSchemaIfIdleOrCheckCompatible(schema)
                   .thenCompose(v -> topic.subscribe(option));
       } else {
           return topic.subscribe(option);
       }
   ```
   In `org.apache.pulsar.broker.service.persistent.PersistentTopic` and `org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic`, record the SchemaType in `org.apache.pulsar.broker.service.Consumer`:
   ```
       @Override
       public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
           return internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(),
                   option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(),
                   option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
                   option.getInitialPosition(), option.getStartMessageRollbackDurationSec(),
                   option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
                   option.getSubscriptionProperties().orElse(Collections.emptyMap()),
                   option.getConsumerEpoch(), option.getSchemaType());
       }
   
       private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
                                                             long consumerId, SubType subType, int priorityLevel,
                                                             String consumerName, boolean isDurable,
                                                             MessageId startMessageId,
                                                             Map<String, String> metadata, boolean readCompacted,
                                                             InitialPosition initialPosition,
                                                             long startMessageRollbackDurationSec,
                                                             boolean replicatedSubscriptionStateArg,
                                                             KeySharedMeta keySharedMeta,
                                                             Map<String, String> subscriptionProperties,
                                                             long consumerEpoch,
                                                             SchemaType schemaType) {
       ......
           CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
               Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
                       consumerName, isDurable, cnx, cnx.getAuthRole(), metadata,
                       readCompacted, keySharedMeta, startMessageId, consumerEpoch,
                       schemaType == null ? SchemaType.BYTES : schemaType);
       }
   
       @Override
       public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
                                                    SubType subType, int priorityLevel, String consumerName,
                                                    boolean isDurable, MessageId startMessageId,
                                                    Map<String, String> metadata, boolean readCompacted,
                                                    InitialPosition initialPosition,
                                                    long startMessageRollbackDurationSec,
                                                    boolean replicatedSubscriptionStateArg,
                                                    KeySharedMeta keySharedMeta,
                                                    SchemaType schemaType) {
           return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
                   isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec,
                   replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, schemaType);
       }
   ```
   If one or more active consumers of the Topic have a SchemaType other than AUTO_CONSUME, call `checkSchemaCompatibleForConsumer`; AUTO_CONSUME consumers alone no longer make the Topic count as non-idle:
   ```
       @Override
       public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
           return hasSchema().thenCompose((hasSchema) -> {
               int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
                       .mapToInt(subscription -> subscription.getConsumers().stream()
                               .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
                               .toList().size())
                       .sum();
               if (hasSchema
                       || (!producers.isEmpty())
                       || (numActiveConsumersWithoutAutoSchema != 0)
                       || (ledger.getTotalSize() != 0)) {
                   return checkSchemaCompatibleForConsumer(schema);
               } else {
                   return addSchema(schema).thenCompose(schemaVersion ->
                           CompletableFuture.completedFuture(null));
               }
           });
       }
   ```
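
   The idle-topic decision above can be exercised without a broker. The following is a minimal sketch under stated assumptions: `ConsumerSchemaType`, `IdleTopicCheckSketch`, and both method names are hypothetical, subscriptions are modelled as plain lists of consumer schema types, and the producer and ledger state are passed in as simple values.

   ```java
   import java.util.List;

   // Hypothetical, reduced set of consumer schema types (illustration only).
   enum ConsumerSchemaType { BYTES, JSON, AVRO, AUTO_CONSUME }

   final class IdleTopicCheckSketch {
       // Counts consumers across all subscriptions whose schema type is not AUTO_CONSUME.
       static long countNonAutoConsume(List<List<ConsumerSchemaType>> consumersBySubscription) {
           return consumersBySubscription.stream()
                   .flatMap(List::stream)
                   .filter(type -> type != ConsumerSchemaType.AUTO_CONSUME)
                   .count();
       }

       // True when the topic is not idle, so the incoming schema must be checked for
       // compatibility; false when the schema can simply be added.
       static boolean mustCheckCompatibility(boolean hasSchema, boolean hasProducers,
                                             long nonAutoConsumeConsumers, long ledgerSize) {
           return hasSchema
                   || hasProducers
                   || nonAutoConsumeConsumers != 0
                   || ledgerSize != 0;
       }
   }
   ```

   With only AUTO_CONSUME consumers attached to an empty topic that has no schema and no producers, the count is 0 and `mustCheckCompatibility` returns false, so in this model the first consumer with a concrete schema would take the `addSchema` branch rather than the compatibility check.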
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   PR: #17449


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on issue #19113: PIP-236: Record schema in the request and carry to the broker when subscribing with AUTO_CONSUME schema.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #19113:
URL: https://github.com/apache/pulsar/issues/19113#issuecomment-1436194802

   The issue had no activity for 30 days, mark with Stale label.

