Posted to user@ignite.apache.org by Iv...@tdameritrade.com on 2021/03/04 15:20:54 UTC

Re: Ignite 2.8.1 ContinuousQueryWithTransformer: ClassCastException on PME

Ilya, unfortunately, I am unable to reproduce this issue in a pet project.

I ran into this issue again on Ignite 2.9.1 when I brought down one of the two nodes of a cluster:

org.apache.ignite.IgniteCheckedException: com.devexperts.tos.riskmonitor.domain.RmAccount cannot be cast to com.devexperts.tos.riskmonitor.cluster.cache.cq.CacheKeyWithEventType
                at org.apache.ignite.internal.util.IgniteUtils.cast(IgniteUtils.java:7563) [ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.util.future.GridFutureAdapter.resolve(GridFutureAdapter.java:260) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:209) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:160) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body0(GridCachePartitionExchangeManager.java:3342) [ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:3163) [ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120) [ignite-core-2.9.1.jar:2.9.1]
                at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
Caused by: java.lang.ClassCastException: com.devexperts.tos.riskmonitor.domain.RmAccount cannot be cast to com.devexperts.tos.riskmonitor.cluster.cache.cq.CacheKeyWithEventType
                at com.devexperts.tos.riskmonitor.cluster.cache.distributiontracker.IgniteCacheKeysDistributionTracker$LocalCacheListener.onUpdated(IgniteCacheKeysDistributionTracker.java:365) ~[main/:?]
                at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyLocalListener(CacheContinuousQueryHandler.java:1128) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback0(CacheContinuousQueryHandler.java:954) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback(CacheContinuousQueryHandler.java:895) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.addBackupNotification(GridContinuousProcessor.java:1162) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$2.flushBackupQueue(CacheContinuousQueryHandler.java:512) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.flushBackupQueue(CacheContinuousQueryManager.java:691) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:2394) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.finishExchangeOnCoordinator(GridDhtPartitionsExchangeFuture.java:3972) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onAllReceived(GridDhtPartitionsExchangeFuture.java:3687) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.distributedExchange(GridDhtPartitionsExchangeFuture.java:1729) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:943) ~[ignite-core-2.9.1.jar:2.9.1]
                at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body0(GridCachePartitionExchangeManager.java:3314) ~[ignite-core-2.9.1.jar:2.9.1]
                ... 3 more

Cache:


new CacheConfiguration<K, V>()
    .setSqlSchema("PUBLIC")
    .setCacheMode(CacheMode.PARTITIONED)
    // ATOMIC mode: we won't be able to use ACID-compliant transactions
    .setAtomicityMode(CacheAtomicityMode.ATOMIC)
    // Wait only for primary nodes to finish write operations; do not wait for backup nodes
    .setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC)
    // The default backup factor
    .setBackups(1)
    .setAffinity(new RendezvousAffinityFunction(true))
    .setName(Cache.RM_ACCOUNT.getName())
    .setKeyConfiguration(new CacheKeyConfiguration(CacheAccountKey.class));



Query initialization:

continuousQuery = new ContinuousQueryWithTransformer<K, Object, CacheKeyWithEventType<K>>();
continuousQuery.setInitialQuery(new ScanQuery<>());
continuousQuery.setAutoUnsubscribe(true);
continuousQuery.setLocalListener(new LocalCacheListener());
continuousQuery.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheKeysDistributionTrackerRmtFilter<>()));
continuousQuery.setRemoteTransformerFactory(FactoryBuilder.factoryOf(
   new CacheKeysDistributionTrackerRmtTransformer<>()));

Remote transformer:


public class CacheKeysDistributionTrackerRmtTransformer<K>
   implements IgniteClosure<CacheEntryEvent<? extends K, ?>, CacheKeyWithEventType<K>>
{
   private static final long serialVersionUID = 0;

   @Override
   public CacheKeyWithEventType<K> apply(CacheEntryEvent<? extends K, ?> event) {
      return new CacheKeyWithEventType<K>(event.getKey(), event.getEventType());
   }
}

Local listener:


private class LocalCacheListener
   implements ContinuousQueryWithTransformer.EventListener<CacheKeyWithEventType<K>>
{
   @Override
   public void onUpdated(Iterable<? extends CacheKeyWithEventType<K>> events) {
      for (CacheKeyWithEventType<K> event : events) {
         if (event.getEventType() == EventType.CREATED) {
            executorService.execute(() -> onCacheKeyCreated(event.getKey()));
         } else if (event.getEventType() == EventType.REMOVED) {
            executorService.execute(() -> onCacheKeyRemoved(event.getKey()));
         }
      }
   }
}
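
For illustration only: the enhanced for loop in the listener above makes the compiler insert a cast to CacheKeyWithEventType for every element, which is where an untransformed element (RmAccount in the trace above) would blow up. Below is a minimal plain-Java sketch of a defensive alternative that iterates the batch as plain objects and checks the runtime type first. It does not use Ignite, and GuardedEvents, Split and split are hypothetical names that are not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch, no Ignite dependency; all names here are hypothetical. */
final class GuardedEvents {
    /** Holds the elements that had the expected type and those that did not. */
    static final class Split<T> {
        final List<T> expected = new ArrayList<>();
        final List<Object> unexpected = new ArrayList<>();
    }

    /**
     * Iterates the batch as plain Objects, so the compiler inserts no implicit
     * cast, and sorts each element by its runtime type.
     */
    static <T> Split<T> split(Iterable<?> events, Class<T> expectedType) {
        Split<T> result = new Split<>();
        for (Object o : events) {
            if (expectedType.isInstance(o)) {
                result.expected.add(expectedType.cast(o));
            } else {
                // Untransformed element: keep it aside instead of failing the whole batch.
                result.unexpected.add(o);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in batch: two "transformed" Strings and one untransformed Integer.
        List<Object> batch = Arrays.asList("k1", 42, "k2");
        Split<String> s = split(batch, String.class);
        System.out.println("expected=" + s.expected + " unexpected=" + s.unexpected);
    }
}
```

In a real listener, the elements collected in `unexpected` could be logged or converted locally, which is the same idea as the adapter-based workaround quoted further down in this thread.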


CacheKeyWithEventType:


public class CacheKeyWithEventType<K> implements Externalizable {
   private static final long serialVersionUID = -1446749299783090657L;

   private K key;
   private EventType eventType;

   public CacheKeyWithEventType(K key, EventType eventType) {
      this.key = key;
      this.eventType = eventType;
   }

   public CacheKeyWithEventType() {
   }

   public K getKey() {
      return key;
   }

   public EventType getEventType() {
      return eventType;
   }

   @Override
   public void writeExternal(ObjectOutput out) throws IOException {
      out.writeObject(key);
      out.writeObject(eventType);
   }

   @Override
   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
      key = (K) in.readObject();
      eventType = (EventType) in.readObject();
   }
}

Best regards,
Ivan Fedorenkov

From: Ilya Kasnacheev <il...@gmail.com>
Reply-To: "user@ignite.apache.org" <us...@ignite.apache.org>
Date: Thursday, October 8, 2020 at 2:24 PM
To: "user@ignite.apache.org" <us...@ignite.apache.org>
Subject: Re: Ignite 2.8.1 ContinuousQueryWithTransformer: ClassCastException on PME

Hello!

You can do either one, I'll take it from there.

Regards,
--
Ilya Kasnacheev


Tue, Oct 6, 2020 at 17:26, <Iv...@tdameritrade.com>:
Could you please guide me through the process? Should I create a simple project somewhere and share it here, or should I create a test case in the Ignite project?

From: Ilya Kasnacheev <il...@gmail.com>
Reply-To: "user@ignite.apache.org" <us...@ignite.apache.org>
Date: Tuesday, October 6, 2020 at 3:44 PM
To: "user@ignite.apache.org" <us...@ignite.apache.org>
Subject: Re: Ignite 2.8.1 ContinuousQueryWithTransformer: ClassCastException on PME

Hello!

Do you have a reproducer for this issue? I think I could validate it and create an issue if you do.

Can you please also check whether it works on, e.g., Apache Ignite 2.9RC2?

Regards,
--
Ilya Kasnacheev


Tue, Oct 6, 2020 at 14:30, <Iv...@tdameritrade.com>:
Hi everyone!

I am getting a ClassCastException when a node in my cluster fails. It looks like the root cause is that nodes load some keys from their backups, and CacheContinuousQueryHandler assumes that these entries have already been converted by the remote side, which is not the case.
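
As a side note on why such a mismatch would surface as a ClassCastException inside the listener rather than at the point where the batch is handed over: Java generics are erased at runtime, so a batch with the wrong element type is only detected when the listener touches an element. The following is a minimal plain-Java sketch of that effect. It does not use Ignite, it is not a reproducer for the issue, and ErasureCastDemo and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of the failure mode; it does not use Ignite. */
final class ErasureCastDemo {
    /** Stand-in for a listener that expects already-transformed String elements. */
    static int totalLength(Iterable<? extends String> events) {
        int total = 0;
        // The compiler inserts a cast to String for each element here.
        for (String s : events) {
            total += s.length();
        }
        return total;
    }

    /** Stand-in for a caller that hands over a mixed batch through a raw type. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean failsWithClassCast(List<Object> mixed) {
        Iterable raw = mixed; // generic type information is erased at runtime
        try {
            totalLength(raw);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // One untransformed element (an Integer) is enough to fail the whole batch.
        System.out.println(failsWithClassCast(Arrays.<Object>asList("a", 1)));
        // A fully transformed batch passes.
        System.out.println(failsWithClassCast(Arrays.<Object>asList("a", "bc")));
    }
}
```

The handover itself succeeds in both cases; only the element access inside the loop can fail, which matches the top application frame being the listener's onUpdated method.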

The stacktrace is:

Caused by: java.lang.ClassCastException: org.apache.ignite.internal.binary.BinaryObjectImpl cannot be cast to java.lang.String
...
        at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyLocalListener(CacheContinuousQueryHandler.java:1114)
        at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback0(CacheContinuousQueryHandler.java:940)
        at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.notifyCallback(CacheContinuousQueryHandler.java:881)
        at org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.addBackupNotification(GridContinuousProcessor.java:1161)
        at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$2.flushBackupQueue(CacheContinuousQueryHandler.java:498)
        at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.flushBackupQueue(CacheContinuousQueryManager.java:687)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onDone(GridDhtPartitionsExchangeFuture.java:2261)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.processFullMessage(GridDhtPartitionsExchangeFuture.java:4375)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.access$1500(GridDhtPartitionsExchangeFuture.java:148)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$4.apply(GridDhtPartitionsExchangeFuture.java:4054)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$4.apply(GridDhtPartitionsExchangeFuture.java:4042)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:399)
        at org.apache.ignite.internal.util.future.GridFutureAdapter.listen(GridFutureAdapter.java:354)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onReceiveFullMessage(GridDhtPartitionsExchangeFuture.java:4042)
        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.processFullPartitionUpdate(GridCachePartitionExchangeManager.java:1886)
        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$3.onMessage(GridCachePartitionExchangeManager.java:429)
        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$3.onMessage(GridCachePartitionExchangeManager.java:416)
        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$MessageHandler.apply(GridCachePartitionExchangeManager.java:3667)
        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$MessageHandler.apply(GridCachePartitionExchangeManager.java:3646)
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:1142)
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:591)
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:392)
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:318)
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.access$100(GridCacheIoManager.java:109)
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:308)
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1847)
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1472)
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$5200(GridIoManager.java:229)
        at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1367)

The workaround is:

public abstract class CqWithTransformerLocalListenerAdapter<T>
   implements ContinuousQueryWithTransformer.EventListener<T>
{
   private final Function<BinaryObject, T> localTransformer;

   protected CqWithTransformerLocalListenerAdapter(Function<BinaryObject, T> localTransformer) {
      this.localTransformer = localTransformer;
   }

   /**
    * The same as {@link ContinuousQueryWithTransformer.EventListener#onUpdated(Iterable)},
    * but guarantees that entities are transformed.
    */
   protected abstract void onUpdatedTransformed(Iterable<T> entities);

   @Override
   public final void onUpdated(Iterable entities) {
      onUpdatedTransformed(new IterableAdapter(entities));
   }

   private class IterableAdapter implements Iterable<T> {
      private final Iterable<?> entities;

      public IterableAdapter(Iterable<?> entities) {
         this.entities = entities;
      }

      @NotNull
      @Override
      public Iterator<T> iterator() {
         return new IteratorWrapper(entities.iterator());
      }
   }

   private class IteratorWrapper implements Iterator<T> {
      private final Iterator<?> underlying;

      public IteratorWrapper(Iterator<?> underlying) {
         this.underlying = underlying;
      }

      @Override
      public boolean hasNext() {
         return underlying.hasNext();
      }

      @Override
      public T next() {
         Object o = underlying.next();
         // Sometimes an entity may be in a binary form (see javadoc of the enclosing class).
         if (o instanceof BinaryObject) {
            return localTransformer.apply((BinaryObject) o);
         }
         // The entity has been converted by a remote transformer
         return (T) o;
      }
   }
}

Best regards,
Ivan Fedorenkov

Re: Ignite 2.8.1 ContinuousQueryWithTransformer: ClassCastException on PME

Posted by Ilya Kasnacheev <il...@gmail.com>.
Hello!

Can you please provide a runnable reproducer project? Save me the trouble
of pasting all these snippets and writing the boilerplate.

Regards,
-- 
Ilya Kasnacheev

