You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/07/30 23:42:00 UTC

[GitHub] [pulsar] devinbost opened a new issue #4857: Schemas

devinbost opened a new issue #4857: Schemas 
URL: https://github.com/apache/pulsar/issues/4857
 
 
   **Describe the bug**
   
   We are getting the error: `Trying to subscribe with incompatible schema`
   even though:
   `isSchemaValidationEnforced=false `
   is set on the broker after deploying a new Pulsar Source and simple passthrough Pulsar Function. Here is the more detailed exception:
   
   > java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Trying to subscribe with incompatible schema
   > 	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_212]
   > 	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_212]
   > 	at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:700) ~[?:1.8.0_212]
   > 	at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687) ~[?:1.8.0_212]
   > 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_212]
   > 	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_212]
   > 	at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:582) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.common.api.PulsarDecoder.channelRead(PulsarDecoder.java:159) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at org.apache.pulsar.functions.runtime.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
   > Caused by: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: Trying to subscribe with incompatible schema
   > 	at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:890) ~[java-instance.jar:2.4.0-streamlio-24]
   > 	... 22 more
   
   
   **To Reproduce**
   
   1. Ensure that the schemas from the input topics and output topics have been deleted.
   2. Ensure that the source and function with the information below have been deleted.
   3. Deploy the Pulsar Source with the below configs:
   ```
   {  "type": "source",
       "sourceType": "kafka",
       "runtimeFlags":"-Djava.security.krb5.conf=/pulsar/conf/krb5-dev.conf",
       "destinationTopicName": "persistent://example-tenant/ingest/example-kafka-topic",
       "configs": {
   		"bootstrapServers": "kafka06.example.domain.com:9092,kafka07.example.domain.com:9092,kafka08.example.domain.com:9092",
   		"consumerConfigProperties": {
   			"auto.offset.reset": "latest",
   			"sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required doNotPrompt=true\nuseTicketCache=false serviceName=\"kafka\" principal=\"pulsar_runtime@EXAMPLE.DOMAIN.COM\"\nuseKeyTab=true keyTab=\"/pulsar/conf/auth/pulsar_runtime_dev.keytab\" client=true;",
   			"sasl.kerberos.service.name": "kafka",
   			"security.protocol": "SASL_PLAINTEXT"
   		},
   		"groupId": "pulsar-example-tenant-example-kafka-topic-group",
   		"topic": "example-kafka-topic"
   	},
       "namespace": "ingest",
       "name": "kafka-example-kafka-topic-source",
       "tenant": "example-tenant"
   }
   ```
   4. Deploy the Pulsar function with the configs below:
   ```
   {
       "type": "function",
       "tenant": "example-tenant",
       "namespace": "ingest",
       "name": "example-kafka-topic-SimplePassthroughFunction",
       "artifactPathOrUrl": "http://repository.domain.com/path/to/jar-build-129875312.jar",
       "className": "com.path.to.pulsar.functions.SimplePassthroughFunction",
       "inputs": [
           "persistent://example-tenant/ingest/example-kafka-topic"
       ],
       "output": "persistent://feeds/example-namespace/example-kafka-topic",
       "logTopic": "persistent://example-tenant/ingest/function-logs"
   }
   ```
   
   The same problem occurs if the tenant and namespace are all set to public/default.
   
   All of our functions and sources just take a string and return a string, so there's nothing complex in terms of the schemas.
   
   **Expected behavior**
   We expect the Pulsar Source to collect data from our Kafka topic without error. We also expect the function to process data from our Pulsar Source without error.
   
   **Screenshots**
   
   We've noticed that when we deploy the passthrough function and start querying its status, we initially notice an error that states "UNAVAILABLE: io exception," like this:
   
   ```
   $ pulsar-admin functions status --fqfn public/default/example-kafka-topic-SimplePassthroughFunction
   {
     "numInstances" : 1,
     "numRunning" : 1,
     "instances" : [ {
       "instanceId" : 0,
       "status" : {
         "running" : true,
         "error" : "",
         "numRestarts" : 0,
         "numReceived" : 0,
         "numSuccessfullyProcessed" : 0,
         "numUserExceptions" : 0,
         "latestUserExceptions" : [ ],
         "numSystemExceptions" : 0,
         "latestSystemExceptions" : [ ],
         "averageLatency" : 0.0,
         "lastInvocationTime" : 0,
         "workerId" : "c-pulsar-example.cluster.com-8080"
       }
     } ]
   }
   $ pulsar-admin functions status --fqfn public/default/example-kafka-topic-SimplePassthroughFunction
   {
     "numInstances" : 1,
     "numRunning" : 0,
     "instances" : [ {
       "instanceId" : 0,
       "status" : {
         "running" : false,
         "error" : "UNAVAILABLE: io exception",
         "numRestarts" : 0,
         "numReceived" : 0,
         "numSuccessfullyProcessed" : 0,
         "numUserExceptions" : 0,
         "latestUserExceptions" : [ ],
         "numSystemExceptions" : 0,
         "latestSystemExceptions" : [ ],
         "averageLatency" : 0.0,
         "lastInvocationTime" : 0,
         "workerId" : "c-pulsar-example.cluster.com-8080"
       }
     } ]
   }
   $ pulsar-admin functions status --fqfn public/default/example-kafka-topic-SimplePassthroughFunction
   {
     "numInstances" : 1,
     "numRunning" : 0,
     "instances" : [ {
       "instanceId" : 0,
       "status" : {
         "running" : false,
         "error" : "",
         "numRestarts" : 1,
         "numReceived" : 0,
         "numSuccessfullyProcessed" : 0,
         "numUserExceptions" : 0,
         "latestUserExceptions" : [ ],
         "numSystemExceptions" : 0,
         "latestSystemExceptions" : [ ],
         "averageLatency" : 0.0,
         "lastInvocationTime" : 0,
         "workerId" : "c-pulsar-example.cluster.com-8080"
       }
     } ]
   }
   ```
   
   **Environment:**
    - OS: `docker standalone run -it --net=host -v /etc/pulsar:/pulsar/conf -v /data/provisioning:/pulsar/data  apachepulsar/pulsar-all:2.3.2 /bin/bash`
   - Pulsar version: Using Pulsar 2.3.2.
   
   **Additional context**
   We experienced this error also with a sink. 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services