You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2021/02/26 12:47:07 UTC

[GitHub] [skywalking] yaojingguo opened a new issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

yaojingguo opened a new issue #6456:
URL: https://github.com/apache/skywalking/issues/6456


   Please answer these questions before submitting your issue.
   
   - Why do you submit this issue?
   - [ ] Question or discussion
   - [x] Bug
   - [ ] Requirement
   - [ ] Feature or performance improvement
   ___
   ### Bug
   - Which version of SkyWalking, OS, and JRE?
   SkyWalking:v8.4.0, OS: CentOS Linux release 7.4.1708 (Core), JRE: 1.8.0_191, spring-kafka 2.2.6.RELEASE
   
   - Which company or project?
   
   - What happened?
   My code uses KafkaTemplate to send messages to Kafka. The following usages of KafkaTemplate  are used to send messages.
   
   ```java
   // Without callback
   template.send(topic, msg);
   
   // With callback
   template.execute(
       (Producer<String, String> producer) -> {
         producer.send(
             new ProducerRecord<>(topic, msg),
             (RecordMetadata metadata, Exception e) -> {
               if (e != null) {
                 System.out.println("message sending failed");
               } else {
                 System.out.println("message sending succeeded");
               }
             });
         return 0;
       });
   ```
   
   agent/logs/skywalking-api.log has the following exception:
   
   ```
   ERROR 2021-02-25 20:36:23:117 kafka-producer-network-thread | producer-2 InstMethodsInter : class[class org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback] before method[onCompletion] intercept failure
   java.lang.ClassCastException: org.apache.skywalking.apm.plugin.kafka.CallbackAdapterInterceptor cannot be cast to org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
   at org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor.getSnapshot(CallbackInterceptor.java:83)
   at org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor.beforeMethod(CallbackInterceptor.java:44)
   at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:76)
   at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java)
   at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
   at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
   at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:599)
   at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:575)
   at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485)
   at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
   at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700)
   at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
   at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
   at java.lang.Thread.run(Thread.java:748)
   ```
   
   ___
   ### Possible fix
   
   My guess is that the following branch of `KafkaProducerInterceptor.beforeMethod`' executed:
   
   ```java
               } else if (shouldCallbackInstance instanceof Callback) {
                   Callback callback = (Callback) shouldCallbackInstance;
                   ContextSnapshot snapshot = ContextManager.capture();
                   if (null != snapshot) {
                       CallbackCache cache = new CallbackCache();
                       cache.setSnapshot(snapshot);
                       cache.setCallback(callback);
                       allArguments[1] = new CallbackAdapterInterceptor(cache);
                   }
               }
   ```
   
   So when `CallbackInterceptor.getSnapshot` tried to cast cache.getCallback() to EnhancedInstance, a ClassCastException was thrown. One possible fix is to add a type check to CallbackConstructorInterceptor.onConstruct.
   
   ```java
       @Override
       public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
           Callback callback = (Callback) allArguments[0];
           if (callback instanceof EnhancedInstance) {
               CallbackCache cache;
               if (null != objInst.getSkyWalkingDynamicField()) {
                   cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
               } else {
                   cache = new CallbackCache();
               }
               cache.setCallback(callback);
               objInst.setSkyWalkingDynamicField(cache);
           }
       }
   ```
   
   
   If some committers can give me some guidance, I will try to fix this issue.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] yaojingguo commented on issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

Posted by GitBox <gi...@apache.org>.
yaojingguo commented on issue #6456:
URL: https://github.com/apache/skywalking/issues/6456#issuecomment-787060246


   Could you please give me some help on my above question about `getSkyWalkingDynamicField `?
   
   @wu-sheng 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] zifeihan commented on issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

Posted by GitBox <gi...@apache.org>.
zifeihan commented on issue #6456:
URL: https://github.com/apache/skywalking/issues/6456#issuecomment-787394620


   > I have one question about the following code:
   > 
   > ```java
   > public class CallbackConstructorInterceptor implements InstanceConstructorInterceptor {
   > 
   >     @Override
   >     public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
   >         Callback callback = (Callback) allArguments[0];
   >         CallbackCache cache;
   >         if (null != objInst.getSkyWalkingDynamicField()) {
   >             cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
   >         } else {
   >             cache = new CallbackCache();
   >         }
   >         cache.setCallback(callback);
   >         objInst.setSkyWalkingDynamicField(cache);
   >     }
   > }
   > ```
   > 
   > In `KafkaTemplateCallbackInstrumentation`, `CallbackConstructorInterceptor` is registerred as constructor interceptor for `org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback`. Since `onConstruct` is called after `KafkaProducer$InterceptorCallback`'s origin constructor invocation, I think that `objInst.getSkyWalkingDynamicField()` always return `null`.
   > 
   > Under what conditions `objInst.getSkyWalkingDynamicField()` is not `null`?
   
   @yaojingguo Regarding this code, I agree with you. I think here, objInst.getSkyWalkingDynamicField() is always null. Do you have an environment to simulate it?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] yaojingguo commented on issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

Posted by GitBox <gi...@apache.org>.
yaojingguo commented on issue #6456:
URL: https://github.com/apache/skywalking/issues/6456#issuecomment-786999002


   I have one question about the following code:
   
   ```java
   public class CallbackConstructorInterceptor implements InstanceConstructorInterceptor {
   
       @Override
       public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
           Callback callback = (Callback) allArguments[0];
           CallbackCache cache;
           if (null != objInst.getSkyWalkingDynamicField()) {
               cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
           } else {
               cache = new CallbackCache();
           }
           cache.setCallback(callback);
           objInst.setSkyWalkingDynamicField(cache);
       }
   }
   ```
   In `KafkaTemplateCallbackInstrumentation`, `CallbackConstructorInterceptor` is registerred as constructor interceptor for `org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback`. Since `onConstruct` is called after `KafkaProducer$InterceptorCallback`'s origin constructor invocation, I think that `objInst.getSkyWalkingDynamicField()` always return `null`. 
   
   Under what conditions `objInst.getSkyWalkingDynamicField()` is not `null`?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] wu-sheng closed issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

Posted by GitBox <gi...@apache.org>.
wu-sheng closed issue #6456:
URL: https://github.com/apache/skywalking/issues/6456


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] yaojingguo commented on issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

Posted by GitBox <gi...@apache.org>.
yaojingguo commented on issue #6456:
URL: https://github.com/apache/skywalking/issues/6456#issuecomment-787457244


   > Do you have an environment to simulate it?
   
   I can set up an environment. If you can give me the details of the simulation, I am willing to do it.
   
   @zifeihan 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] wu-sheng commented on issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on issue #6456:
URL: https://github.com/apache/skywalking/issues/6456#issuecomment-786633218


   Hi, I don't read all of Kafka producer codes, but you should pay attention about why `CallbackAdapterInterceptor` has been used as a parameter of `KafkaProducerInterceptor`. 
   This adapter was added through #4826 due to the Lambda callback, but your callback fails again?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] zifeihan commented on issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

Posted by GitBox <gi...@apache.org>.
zifeihan commented on issue #6456:
URL: https://github.com/apache/skywalking/issues/6456#issuecomment-786747586


   @yaojingguo thanks for report this issue, I may not have noticed that org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback wraps the callback of the doSend method. Therefore, Lambda callback caused the problem, I suggest modifying CallbackAdapterInterceptor to implement EnhancedInstance, and return callbackCache.getSnapshot() in getSkyWalkingDynamicField. like this,
   ```
   /**
    * implements Callback and EnhancedInstance, for kafka callback in lambda expression
    */
   public class CallbackAdapterInterceptor implements Callback, EnhancedInstance {
   
       /**
        * user Callback object
        */
       private CallbackCache callbackCache;
   
       public CallbackAdapterInterceptor(CallbackCache callbackCache) {
           this.callbackCache = callbackCache;
       }
   
       @Override
       public void onCompletion(RecordMetadata metadata, Exception exception) {
           ContextSnapshot snapshot = callbackCache.getSnapshot();
           AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
           SpanLayer.asMQ(activeSpan);
           activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
           if (metadata != null) {
               Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
           }
           ContextManager.continued(snapshot);
   
           try {
               callbackCache.getCallback().onCompletion(metadata, exception);
           } catch (Throwable t) {
               ContextManager.activeSpan().log(t);
               throw t;
           } finally {
               if (exception != null) {
                   ContextManager.activeSpan().log(exception);
               }
               ContextManager.stopSpan();
           }
       }
   
       @Override
       public Object getSkyWalkingDynamicField() {
           return callbackCache.getSnapshot();
       }
   
       @Override
       public void setSkyWalkingDynamicField(final Object value) {
   
       }
   ```
   
   welcome to submit pr for fix this bug, if you don’t have time, I will fix this bug later, @wu-sheng Suggestions are welcome


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] yaojingguo commented on issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

Posted by GitBox <gi...@apache.org>.
yaojingguo commented on issue #6456:
URL: https://github.com/apache/skywalking/issues/6456#issuecomment-786979510


   @zifeihan I will have a try.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] wu-sheng commented on issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on issue #6456:
URL: https://github.com/apache/skywalking/issues/6456#issuecomment-787060717


   > Could you please give me some help on my above question about `getSkyWalkingDynamicField `?
   > 
   > @wu-sheng
   
   I could only guess, because this plugin is not written by me. Could this call have overloaded constructor? I think @zifeihan know more than I did.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] zifeihan commented on issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

Posted by GitBox <gi...@apache.org>.
zifeihan commented on issue #6456:
URL: https://github.com/apache/skywalking/issues/6456#issuecomment-787634053


   > ```
   > if (null != objInst.getSkyWalkingDynamicField()) {
   >             cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
   >         }
   > ```
   
   @stalary Sorry to bother you, I see the author of this code is that you. I think this code is redundant. @yaojingguo for test, You can delete the related problematic code and test kafka-scenario locally. If the test passes, please submit pr, and then perform github ci verification.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] wu-sheng commented on issue #6456: ClassCastException from CallbackAdapterInterceptor to EnhancedInstance with spring-kafka

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on issue #6456:
URL: https://github.com/apache/skywalking/issues/6456#issuecomment-786633408


   @zifeihan Could you take a look here?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org