Posted to user@ignite.apache.org by Cong Guo <co...@huawei.com> on 2018/06/01 17:14:51 UTC
ClassCastException When Using CacheEntryProcessor in StreamVisitor
Hi,
I want to use IgniteDataStreamer to handle data updates. Is it possible to use a CacheEntryProcessor inside a StreamVisitor? I wrote the simple program below. It works on a single node, but throws a ClassCastException when run on two nodes. The two nodes are on two separate physical machines, and I have set peerClassLoadingEnabled to true on both of them. How should I use a CacheEntryProcessor in a StreamVisitor?
The function looks like this:
private static void streamUpdate(Ignite ignite, IgniteCache<Long, Person> personCache) {
    CacheConfiguration<Long, Double> updateCfg = new CacheConfiguration<>("updateCache");
    try (IgniteCache<Long, Double> updateCache = ignite.getOrCreateCache(updateCfg)) {
        try (IgniteDataStreamer<Long, Double> updateStmr = ignite.dataStreamer(updateCache.getName())) {
            updateStmr.receiver(StreamVisitor.from((cache, e) -> {
                Long id = e.getKey();
                Double newVal = e.getValue();
                personCache.<Long, BinaryObject>withKeepBinary().invoke(id,
                    new CacheEntryProcessor<Long, BinaryObject, Object>() {
                        public Object process(MutableEntry<Long, BinaryObject> entry, Object... objects) throws EntryProcessorException {
                            BinaryObjectBuilder bldr = entry.getValue().toBuilder();
                            double salary = bldr.getField("salary");
                            bldr.setField("salary", salary + newVal);
                            entry.setValue(bldr.build());
                            return null;
                        }
                    });
            }));
            Random generator = new Random();
            for (long i = 1; i <= EXP_SIZE; i++) {
                long rankey = 1 + generator.nextInt(EXP_SIZE);
                updateStmr.addData(rankey, 10.0);
            }
        } // end second try
    } // end first try
}
The Person class is the one from the Ignite examples. No exception occurs on a single node.
The exception looks like this:
javax.cache.processor.EntryProcessorException: java.lang.ClassCastException: com.huawei.clusterexperiment.model.Person cannot be cast to org.apache.ignite.binary.BinaryObject
at org.apache.ignite.internal.processors.cache.CacheInvokeResult.get(CacheInvokeResult.java:102)
at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1361)
at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.invoke(IgniteCacheProxyImpl.java:1405)
at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.invoke(GatewayProtectedCacheProxy.java:1362)
at com.huawei.clusterexperiment.Client.lambda$streamUpdate$a02be2b7$1(Client.java:310)
at org.apache.ignite.stream.StreamVisitor$1.apply(StreamVisitor.java:50)
at org.apache.ignite.stream.StreamVisitor$1.apply(StreamVisitor.java:48)
at org.apache.ignite.stream.StreamVisitor.receive(StreamVisitor.java:38)
at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:137)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:397)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:302)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:59)
at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:89)
at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
at java.lang.Thread.run(Thread.java:745)
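For reference, the cast failure itself can be reproduced without Ignite. The sketch below is plain Java and uses hypothetical stand-in types (Person, BinaryView, and an invoke helper); none of it is Ignite API. It only shows how a generically typed processor can be handed a value of a different runtime type and fail with a ClassCastException at the point of use. That would match the trace above if the entry value reached the processor as a deserialized Person instead of a BinaryObject, which is an assumption about the cause and not a confirmed diagnosis.

```java
import java.util.function.Function;

public class ErasureCastDemo {
    // Hypothetical stand-ins for illustration only; these are NOT Ignite types.
    interface BinaryView { double salary(); }
    static final class Person { double salary = 100.0; }

    // Mimics a cache handle whose value type V is only a compile-time claim.
    @SuppressWarnings("unchecked")
    static <V> String invoke(Object storedValue, Function<V, Object> processor) {
        try {
            // Unchecked cast: erased at runtime, so nothing is verified here.
            processor.apply((V) storedValue);
            return "ok";
        } catch (ClassCastException ex) {
            // The failure surfaces only when the processor touches the value.
            return "ClassCastException";
        }
    }

    static String run() {
        // The processor expects a BinaryView, but the stored value is a Person.
        return ErasureCastDemo.<BinaryView>invoke(new Person(), v -> v.salary());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the sketch is that the declared type parameters do not convert anything by themselves: if the value is stored or delivered in its deserialized form, the processor still sees a Person.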