Posted to user@ignite.apache.org by Cong Guo <co...@huawei.com> on 2018/06/01 17:14:51 UTC

ClassCastException When Using CacheEntryProcessor in StreamVisitor

Hi,

I want to use IgniteDataStreamer to handle data updates. Is it possible to use a CacheEntryProcessor inside a StreamVisitor? I wrote a simple program as follows. It works on a single node, but throws a ClassCastException on two nodes. The two nodes run on two separate physical machines, and I have set peerClassLoadingEnabled to true on both of them. How should I use a CacheEntryProcessor in a StreamVisitor?

The function looks like this:

private static void streamUpdate(Ignite ignite, IgniteCache<Long, Person> personCache) {
    CacheConfiguration<Long, Double> updateCfg = new CacheConfiguration<>("updateCache");
    try (IgniteCache<Long, Double> updateCache = ignite.getOrCreateCache(updateCfg)) {
        try (IgniteDataStreamer<Long, Double> updateStmr = ignite.dataStreamer(updateCache.getName())) {

            updateStmr.receiver(StreamVisitor.from((cache, e) -> {
                Long id = e.getKey();
                Double newVal = e.getValue();
                personCache.<Long, BinaryObject>withKeepBinary().invoke(id,
                    new CacheEntryProcessor<Long, BinaryObject, Object>() {
                        public Object process(MutableEntry<Long, BinaryObject> entry, Object... objects) throws EntryProcessorException {
                            BinaryObjectBuilder bldr = entry.getValue().toBuilder();
                            double salary = bldr.getField("salary");
                            bldr.setField("salary", salary + newVal);
                            entry.setValue(bldr.build());
                            return null;
                        }
                    });
            }));

            Random generator = new Random();
            for (long i = 1; i <= EXP_SIZE; i++) {
                long rankey = 1 + generator.nextInt(EXP_SIZE);
                updateStmr.addData(rankey, 10.0);
            }
        } // end second try
    } // end first try
}

The Person class is taken from the Ignite examples. No exception occurs on a single node.
On two nodes, the exception is:

javax.cache.processor.EntryProcessorException: java.lang.ClassCastException: com.huawei.clusterexperiment.model.Person cannot be cast to org.apache.ignite.binary.BinaryObject
        at org.apache.ignite.internal.processors.cache.CacheInvokeResult.get(CacheInvokeResult.java:102)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1361)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1405)
        at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.invoke(GatewayProtectedCacheProxy.java:1362)
        at com.huawei.clusterexperiment.Client.lambda$streamUpdate$a02be2b7$1(Client.java:310)
        at org.apache.ignite.stream.StreamVisitor$1.apply(StreamVisitor.java:50)
        at org.apache.ignite.stream.StreamVisitor$1.apply(StreamVisitor.java:48)
        at org.apache.ignite.stream.StreamVisitor.receive(StreamVisitor.java:38)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:137)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:397)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:302)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:59)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:89)
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
        at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
        at java.lang.Thread.run(Thread.java:745)
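
The trace above shows the cast failing at run time inside the entry processor, even though the code compiles cleanly. As a rough illustration of how that can happen in general, here is a minimal plain-Java sketch. It does not use Ignite at all: BinaryLike and Person are hypothetical stand-ins, and the sketch only assumes that the entry handed to the processor holds a deserialized object instead of the binary form. Because generic type parameters are erased, the mismatch is only detected when getValue() is called.

```java
import java.util.AbstractMap;
import java.util.Map;

public class ErasureCastSketch {
    /** Hypothetical stand-in for a binary wrapper type; NOT the Ignite BinaryObject. */
    interface BinaryLike {
        double field(String name);
    }

    /** Hypothetical stand-in for a deserialized domain object. */
    static class Person {
        double salary = 100.0;
    }

    /** The compiler inserts a runtime cast to BinaryLike where getValue() is assigned. */
    static double readSalary(Map.Entry<Long, BinaryLike> entry) {
        BinaryLike value = entry.getValue();
        return value.field("salary");
    }

    /** Returns true if passing a Person-valued entry to readSalary raises ClassCastException. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean failsWithClassCast() {
        // The entry really holds a Person; the generic type is only a compile-time claim.
        Map.Entry raw = new AbstractMap.SimpleEntry<Long, Object>(1L, new Person());
        try {
            readSalary((Map.Entry<Long, BinaryLike>) raw);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException raised: " + failsWithClassCast());
    }
}
```

The sketch only demonstrates the mechanism of the failure. It says nothing about why the remote node would supply a Person rather than a BinaryObject in the two-node setup.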