You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2016/12/12 14:45:08 UTC
[3/3] ignite git commit: IGNITE-4033 Streamline platform callback interface
IGNITE-4033 Streamline platform callback interface
This closes #1261
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72ac53da
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72ac53da
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72ac53da
Branch: refs/heads/master
Commit: 72ac53da2e6f8311c2d816763a6765724b79491a
Parents: f621f7f
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon Dec 12 17:44:48 2016 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Dec 12 17:44:48 2016 +0300
----------------------------------------------------------------------
.../cache/PlatformCacheEntryFilterImpl.java | 4 +-
.../cache/PlatformCacheEntryProcessorImpl.java | 43 +-
.../affinity/PlatformAffinityFunction.java | 52 +-
.../callback/PlatformCallbackGateway.java | 266 ++--
.../platform/callback/PlatformCallbackOp.java | 206 +++
.../callback/PlatformCallbackUtils.java | 544 +-------
.../platform/compute/PlatformAbstractJob.java | 2 +-
.../platform/compute/PlatformAbstractTask.java | 15 +-
.../platform/compute/PlatformClosureJob.java | 12 +-
.../platform/compute/PlatformFullJob.java | 15 +-
.../platform/compute/PlatformFullTask.java | 18 +-
.../PlatformStreamReceiverImpl.java | 3 +
.../dotnet/PlatformDotNetCacheStore.java | 6 +-
.../services/PlatformAbstractService.java | 25 +-
.../platform/utils/PlatformFutureUtils.java | 4 +-
.../platform/utils/PlatformUtils.java | 8 +-
.../cpp/core/src/impl/ignite_environment.cpp | 74 +-
.../platforms/cpp/jni/include/ignite/jni/java.h | 85 +-
modules/platforms/cpp/jni/project/vs/module.def | 4 +-
modules/platforms/cpp/jni/src/java.cpp | 339 +----
.../Services/ServicesTest.cs | 3 +-
.../Apache.Ignite.Core.csproj | 1 +
.../Impl/Binary/BinaryUtils.cs | 16 +
.../Impl/Binary/Io/BinaryStreamBase.cs | 4 +-
.../Impl/Compute/ComputeTaskHolder.cs | 14 +-
.../Impl/Unmanaged/UnmanagedCallbackHandlers.cs | 79 +-
.../Impl/Unmanaged/UnmanagedCallbackOp.cs | 86 ++
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 1229 +++++++++---------
28 files changed, 1293 insertions(+), 1864 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
index 4c86d6d..3c55b76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
@@ -60,12 +60,14 @@ public class PlatformCacheEntryFilterImpl extends PlatformAbstractPredicate impl
BinaryRawWriterEx writer = ctx.writer(out);
+ writer.writeLong(ptr);
+
writer.writeObject(k);
writer.writeObject(v);
out.synchronize();
- return ctx.gateway().cacheEntryFilterApply(ptr, mem.pointer()) != 0;
+ return ctx.gateway().cacheEntryFilterApply(mem.pointer()) != 0;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
index 3e8ad61..31dd267 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
@@ -17,15 +17,9 @@
package org.apache.ignite.internal.processors.platform.cache;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.binary.BinaryRawWriter;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformContext;
@@ -36,6 +30,13 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStrea
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
/**
* Platform cache entry processor. Delegates processing to native platform.
*/
@@ -65,7 +66,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
private transient long ptr;
/**
- * {@link java.io.Externalizable} support.
+ * {@link Externalizable} support.
*/
public PlatformCacheEntryProcessorImpl() {
// No-op.
@@ -86,7 +87,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
@Override public Object process(MutableEntry entry, Object... args)
throws EntryProcessorException {
try {
- IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class);
+ Ignite ignite = (Ignite)entry.unwrap(Ignite.class);
PlatformProcessor interopProc;
@@ -112,12 +113,10 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
* @param ctx Context.
* @param entry Entry.
* @return Processing result.
- * @throws org.apache.ignite.IgniteCheckedException
*/
- private Object execute0(PlatformContext ctx, MutableEntry entry)
- throws IgniteCheckedException {
- try (PlatformMemory outMem = ctx.memory().allocate()) {
- PlatformOutputStream out = outMem.output();
+ private Object execute0(PlatformContext ctx, MutableEntry entry) {
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
BinaryRawWriterEx writer = ctx.writer(out);
@@ -125,17 +124,15 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
out.synchronize();
- try (PlatformMemory inMem = ctx.memory().allocate()) {
- PlatformInputStream in = inMem.input();
+ ctx.gateway().cacheInvoke(mem.pointer());
- ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer());
+ PlatformInputStream in = mem.input();
- in.synchronize();
+ in.synchronize();
- BinaryRawReaderEx reader = ctx.reader(in);
+ BinaryRawReaderEx reader = ctx.reader(in);
- return readResultAndUpdateEntry(ctx, entry, reader);
- }
+ return readResultAndUpdateEntry(ctx, entry, reader);
}
}
@@ -145,7 +142,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
* @param entry Entry to process.
* @param writer Writer.
*/
- private void writeEntryAndProcessor(MutableEntry entry, BinaryRawWriterEx writer) {
+ private void writeEntryAndProcessor(MutableEntry entry, BinaryRawWriter writer) {
writer.writeObject(entry.getKey());
writer.writeObject(entry.getValue());
@@ -167,7 +164,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
* @param entry Mutable entry to update.
* @param reader Reader.
* @return Entry processing result
- * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code.
+ * @throws EntryProcessorException If processing has failed in user code.
*/
@SuppressWarnings("unchecked")
private Object readResultAndUpdateEntry(PlatformContext ctx, MutableEntry entry, BinaryRawReaderEx reader) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
index 2d3cada..4206d40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
@@ -166,11 +166,12 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl
PlatformOutputStream out = mem.output();
BinaryRawWriterEx writer = ctx.writer(out);
+ writer.writeLong(ptr);
writer.writeObject(key);
out.synchronize();
- return ctx.gateway().affinityFunctionPartition(ptr, mem.pointer());
+ return ctx.gateway().affinityFunctionPartition(mem.pointer());
}
}
@@ -186,34 +187,34 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl
assert ptr != 0;
assert affCtx != null;
- try (PlatformMemory outMem = ctx.memory().allocate()) {
- try (PlatformMemory inMem = ctx.memory().allocate()) {
- PlatformOutputStream out = outMem.output();
- BinaryRawWriterEx writer = ctx.writer(out);
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+ BinaryRawWriterEx writer = ctx.writer(out);
+
+ writer.writeLong(ptr);
- // Write previous assignment
- PlatformAffinityUtils.writeAffinityFunctionContext(affCtx, writer, ctx);
+ // Write previous assignment
+ PlatformAffinityUtils.writeAffinityFunctionContext(affCtx, writer, ctx);
- out.synchronize();
+ out.synchronize();
+
+ // Call platform
+ // We can not restore original AffinityFunctionContext after the call to platform,
+ // due to DiscoveryEvent (when node leaves, we can't get it by id anymore).
+ // Secondly, AffinityFunctionContext can't be changed by the user.
+ if (baseTarget != null)
+ baseTarget.setCurrentAffinityFunctionContext(affCtx);
- // Call platform
- // We can not restore original AffinityFunctionContext after the call to platform,
- // due to DiscoveryEvent (when node leaves, we can't get it by id anymore).
- // Secondly, AffinityFunctionContext can't be changed by the user.
+ try {
+ ctx.gateway().affinityFunctionAssignPartitions(mem.pointer());
+ }
+ finally {
if (baseTarget != null)
- baseTarget.setCurrentAffinityFunctionContext(affCtx);
-
- try {
- ctx.gateway().affinityFunctionAssignPartitions(ptr, outMem.pointer(), inMem.pointer());
- }
- finally {
- if (baseTarget != null)
- baseTarget.setCurrentAffinityFunctionContext(null);
- }
-
- // Read result
- return PlatformAffinityUtils.readPartitionAssignment(ctx.reader(inMem), ctx);
+ baseTarget.setCurrentAffinityFunctionContext(null);
}
+
+ // Read result
+ return PlatformAffinityUtils.readPartitionAssignment(ctx.reader(mem), ctx);
}
}
@@ -234,11 +235,12 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl
PlatformOutputStream out = mem.output();
BinaryRawWriterEx writer = ctx.writer(out);
+ writer.writeLong(ptr);
writer.writeUuid(nodeId);
out.synchronize();
- ctx.gateway().affinityFunctionRemoveNode(ptr, mem.pointer());
+ ctx.gateway().affinityFunctionRemoveNode(mem.pointer());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index c77f501..a9268fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -61,7 +61,7 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.cacheStoreCreate(envPtr, memPtr);
+ return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreCreate, memPtr);
}
finally {
leave();
@@ -69,15 +69,14 @@ public class PlatformCallbackGateway {
}
/**
- * @param objPtr Object pointer.
* @param memPtr Memory pointer.
* @return Result.
*/
- public int cacheStoreInvoke(long objPtr, long memPtr) {
+ public int cacheStoreInvoke(long memPtr) {
enter();
try {
- return PlatformCallbackUtils.cacheStoreInvoke(envPtr, objPtr, memPtr);
+ return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreInvoke, memPtr);
}
finally {
leave();
@@ -92,7 +91,7 @@ public class PlatformCallbackGateway {
return; // no need to destroy stores on grid stop
try {
- PlatformCallbackUtils.cacheStoreDestroy(envPtr, objPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreDestroy, objPtr);
}
finally {
leave();
@@ -102,14 +101,13 @@ public class PlatformCallbackGateway {
/**
* Creates cache store session.
*
- * @param storePtr Store instance pointer.
* @return Session instance pointer.
*/
- public long cacheStoreSessionCreate(long storePtr) {
+ public long cacheStoreSessionCreate() {
enter();
try {
- return PlatformCallbackUtils.cacheStoreSessionCreate(envPtr, storePtr);
+ return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreSessionCreate, 0);
}
finally {
leave();
@@ -126,7 +124,7 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.cacheEntryFilterCreate(envPtr, memPtr);
+ return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheEntryFilterCreate, memPtr);
}
finally {
leave();
@@ -134,15 +132,14 @@ public class PlatformCallbackGateway {
}
/**
- * @param ptr Pointer.
* @param memPtr Memory pointer.
* @return Result.
*/
- public int cacheEntryFilterApply(long ptr, long memPtr) {
+ public int cacheEntryFilterApply(long memPtr) {
enter();
try {
- return PlatformCallbackUtils.cacheEntryFilterApply(envPtr, ptr, memPtr);
+ return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheEntryFilterApply, memPtr);
}
finally {
leave();
@@ -156,7 +153,7 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.cacheEntryFilterDestroy(envPtr, ptr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheEntryFilterDestroy, ptr);
}
finally {
leave();
@@ -166,14 +163,13 @@ public class PlatformCallbackGateway {
/**
* Invoke cache entry processor.
*
- * @param outMemPtr Output memory pointer.
- * @param inMemPtr Input memory pointer.
+ * @param memPtr Memory pointer.
*/
- public void cacheInvoke(long outMemPtr, long inMemPtr) {
+ public void cacheInvoke(long memPtr) {
enter();
try {
- PlatformCallbackUtils.cacheInvoke(envPtr, outMemPtr, inMemPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheInvoke, memPtr);
}
finally {
leave();
@@ -183,15 +179,13 @@ public class PlatformCallbackGateway {
/**
* Perform native task map. Do not throw exceptions, serializing them to the output stream instead.
*
- * @param taskPtr Task pointer.
- * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}).
- * @param inMemPtr Input memory pointer.
+ * @param memPtr Memory pointer.
*/
- public void computeTaskMap(long taskPtr, long outMemPtr, long inMemPtr) {
+ public void computeTaskMap(long memPtr) {
enter();
try {
- PlatformCallbackUtils.computeTaskMap(envPtr, taskPtr, outMemPtr, inMemPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeTaskMap, memPtr);
}
finally {
leave();
@@ -203,14 +197,31 @@ public class PlatformCallbackGateway {
*
* @param taskPtr Task pointer.
* @param jobPtr Job pointer.
- * @param memPtr Memory pointer (always zero for local job execution).
* @return Job result enum ordinal.
*/
- public int computeTaskJobResult(long taskPtr, long jobPtr, long memPtr) {
+ public int computeTaskLocalJobResult(long taskPtr, long jobPtr) {
+ enter();
+
+ try {
+ return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.ComputeTaskLocalJobResult, taskPtr, jobPtr, 0, null);
+ }
+ finally {
+ leave();
+ }
+ }
+
+ /**
+ * Perform native task job result notification.
+ *
+ * @param memPtr Memory pointer.
+ * @return Job result enum ordinal.
+ */
+ public int computeTaskJobResult(long memPtr) {
enter();
try {
- return PlatformCallbackUtils.computeTaskJobResult(envPtr, taskPtr, jobPtr, memPtr);
+ return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeTaskJobResult, memPtr);
}
finally {
leave();
@@ -226,7 +237,7 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.computeTaskReduce(envPtr, taskPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeTaskReduce, taskPtr);
}
finally {
leave();
@@ -243,7 +254,8 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.computeTaskComplete(envPtr, taskPtr, memPtr);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.ComputeTaskComplete, taskPtr, memPtr, 0, null);
}
finally {
leave();
@@ -261,7 +273,8 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.computeJobSerialize(envPtr, jobPtr, memPtr);
+ return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.ComputeJobSerialize, jobPtr, memPtr, 0, null);
}
finally {
leave();
@@ -278,7 +291,7 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.computeJobCreate(envPtr, memPtr);
+ return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobCreate, memPtr);
}
finally {
leave();
@@ -290,13 +303,29 @@ public class PlatformCallbackGateway {
*
* @param jobPtr Job pointer.
* @param cancel Cancel flag.
- * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution.
*/
- public void computeJobExecute(long jobPtr, int cancel, long memPtr) {
+ public void computeJobExecuteLocal(long jobPtr, long cancel) {
enter();
try {
- PlatformCallbackUtils.computeJobExecute(envPtr, jobPtr, cancel, memPtr);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.ComputeJobExecuteLocal, jobPtr, cancel, 0, null);
+ }
+ finally {
+ leave();
+ }
+ }
+
+ /**
+ * Execute native job on a node other than where it was created.
+ *
+ * @param memPtr Memory pointer.
+ */
+ public void computeJobExecute(long memPtr) {
+ enter();
+
+ try {
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobExecute, memPtr);
}
finally {
leave();
@@ -312,7 +341,7 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.computeJobCancel(envPtr, jobPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobCancel, jobPtr);
}
finally {
leave();
@@ -328,7 +357,7 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.computeJobDestroy(envPtr, ptr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobDestroy, ptr);
}
finally {
leave();
@@ -338,14 +367,13 @@ public class PlatformCallbackGateway {
/**
* Invoke local callback.
*
- * @param cbPtr Callback pointer.
* @param memPtr Memory pointer.
*/
- public void continuousQueryListenerApply(long cbPtr, long memPtr) {
+ public void continuousQueryListenerApply(long memPtr) {
enter();
try {
- PlatformCallbackUtils.continuousQueryListenerApply(envPtr, cbPtr, memPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ContinuousQueryListenerApply, memPtr);
}
finally {
leave();
@@ -362,7 +390,7 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.continuousQueryFilterCreate(envPtr, memPtr);
+ return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ContinuousQueryFilterCreate, memPtr);
}
finally {
leave();
@@ -372,15 +400,14 @@ public class PlatformCallbackGateway {
/**
* Invoke remote filter.
*
- * @param filterPtr Filter pointer.
* @param memPtr Memory pointer.
* @return Result.
*/
- public int continuousQueryFilterApply(long filterPtr, long memPtr) {
+ public long continuousQueryFilterApply(long memPtr) {
enter();
try {
- return PlatformCallbackUtils.continuousQueryFilterApply(envPtr, filterPtr, memPtr);
+ return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ContinuousQueryFilterApply, memPtr);
}
finally {
leave();
@@ -396,7 +423,8 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.continuousQueryFilterRelease(envPtr, filterPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr,
+ PlatformCallbackOp.ContinuousQueryFilterRelease, filterPtr);
}
finally {
leave();
@@ -414,7 +442,8 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.dataStreamerTopologyUpdate(envPtr, ptr, topVer, topSize);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.DataStreamerTopologyUpdate, ptr, topVer, topSize, null);
}
finally {
leave();
@@ -424,16 +453,15 @@ public class PlatformCallbackGateway {
/**
* Invoke stream receiver.
*
- * @param ptr Receiver native pointer.
* @param cache Cache object.
* @param memPtr Stream pointer.
- * @param keepBinary Binary flag.
*/
public void dataStreamerStreamReceiverInvoke(long ptr, PlatformTargetProxy cache, long memPtr, boolean keepBinary) {
enter();
try {
- PlatformCallbackUtils.dataStreamerStreamReceiverInvoke(envPtr, ptr, cache, memPtr, keepBinary);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.DataStreamerStreamReceiverInvoke, memPtr, 0, 0, cache);
}
finally {
leave();
@@ -446,11 +474,12 @@ public class PlatformCallbackGateway {
* @param futPtr Future pointer.
* @param res Result.
*/
- public void futureByteResult(long futPtr, int res) {
+ public void futureByteResult(long futPtr, long res) {
enter();
try {
- PlatformCallbackUtils.futureByteResult(envPtr, futPtr, res);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.FutureByteResult, futPtr, res, 0, null);
}
finally {
leave();
@@ -463,11 +492,12 @@ public class PlatformCallbackGateway {
* @param futPtr Future pointer.
* @param res Result.
*/
- public void futureBoolResult(long futPtr, int res) {
+ public void futureBoolResult(long futPtr, long res) {
enter();
try {
- PlatformCallbackUtils.futureBoolResult(envPtr, futPtr, res);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.FutureBoolResult, futPtr, res, 0, null);
}
finally {
leave();
@@ -480,11 +510,12 @@ public class PlatformCallbackGateway {
* @param futPtr Future pointer.
* @param res Result.
*/
- public void futureShortResult(long futPtr, int res) {
+ public void futureShortResult(long futPtr, long res) {
enter();
try {
- PlatformCallbackUtils.futureShortResult(envPtr, futPtr, res);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.FutureShortResult, futPtr, res, 0, null);
}
finally {
leave();
@@ -497,11 +528,12 @@ public class PlatformCallbackGateway {
* @param futPtr Future pointer.
* @param res Result.
*/
- public void futureCharResult(long futPtr, int res) {
+ public void futureCharResult(long futPtr, long res) {
enter();
try {
- PlatformCallbackUtils.futureCharResult(envPtr, futPtr, res);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.FutureCharResult, futPtr, res, 0, null);
}
finally {
leave();
@@ -514,11 +546,12 @@ public class PlatformCallbackGateway {
* @param futPtr Future pointer.
* @param res Result.
*/
- public void futureIntResult(long futPtr, int res) {
+ public void futureIntResult(long futPtr, long res) {
enter();
try {
- PlatformCallbackUtils.futureIntResult(envPtr, futPtr, res);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.FutureIntResult, futPtr, res, 0, null);
}
finally {
leave();
@@ -531,11 +564,12 @@ public class PlatformCallbackGateway {
* @param futPtr Future pointer.
* @param res Result.
*/
- public void futureFloatResult(long futPtr, float res) {
+ public void futureFloatResult(long futPtr, long res) {
enter();
try {
- PlatformCallbackUtils.futureFloatResult(envPtr, futPtr, res);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.FutureFloatResult, futPtr, res, 0, null);
}
finally {
leave();
@@ -552,7 +586,8 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.futureLongResult(envPtr, futPtr, res);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.FutureLongResult, futPtr, res, 0, null);
}
finally {
leave();
@@ -565,11 +600,12 @@ public class PlatformCallbackGateway {
* @param futPtr Future pointer.
* @param res Result.
*/
- public void futureDoubleResult(long futPtr, double res) {
+ public void futureDoubleResult(long futPtr, long res) {
enter();
try {
- PlatformCallbackUtils.futureDoubleResult(envPtr, futPtr, res);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.FutureDoubleResult, futPtr, res, 0, null);
}
finally {
leave();
@@ -586,7 +622,8 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.futureObjectResult(envPtr, futPtr, memPtr);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.FutureObjectResult, futPtr, memPtr, 0, null);
}
finally {
leave();
@@ -602,7 +639,7 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.futureNullResult(envPtr, futPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.FutureNullResult, futPtr);
}
finally {
leave();
@@ -619,7 +656,8 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.futureError(envPtr, futPtr, memPtr);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.FutureError, futPtr, memPtr, 0, null);
}
finally {
leave();
@@ -636,8 +674,9 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.messagingFilterCreate(envPtr, memPtr);
- }
+ return PlatformCallbackUtils.inLongOutLong(envPtr,
+ PlatformCallbackOp.MessagingFilterCreate, memPtr);
+ }
finally {
leave();
}
@@ -652,7 +691,8 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.messagingFilterApply(envPtr, ptr, memPtr);
+ return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.MessagingFilterApply, ptr, memPtr, 0, null);
}
finally {
leave();
@@ -665,7 +705,7 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.messagingFilterDestroy(envPtr, ptr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.MessagingFilterDestroy, ptr);
}
finally {
leave();
@@ -682,7 +722,7 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.eventFilterCreate(envPtr, memPtr);
+ return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.EventFilterCreate, memPtr);
}
finally {
leave();
@@ -698,7 +738,8 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.eventFilterApply(envPtr, ptr, memPtr);
+ return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.EventFilterApply, ptr, memPtr, 0, null);
}
finally {
leave();
@@ -712,7 +753,7 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.eventFilterDestroy(envPtr, ptr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.EventFilterDestroy, ptr);
}
finally {
leave();
@@ -728,7 +769,7 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.nodeInfo(envPtr, memPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.NodeInfo, memPtr);
}
finally {
leave();
@@ -745,7 +786,7 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.onStart(envPtr, proc, memPtr);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, PlatformCallbackOp.OnStart, memPtr, 0, 0, proc);
}
finally {
leave();
@@ -762,7 +803,8 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.lifecycleEvent(envPtr, ptr, evt);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.LifecycleOnEvent, ptr, evt, 0, null);
}
finally {
leave();
@@ -779,7 +821,8 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.memoryReallocate(envPtr, memPtr, cap);
+ PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.MemoryReallocate, memPtr, cap, 0, null);
}
finally {
leave();
@@ -790,13 +833,13 @@ public class PlatformCallbackGateway {
* Initializes native service.
*
* @param memPtr Pointer.
- * @throws org.apache.ignite.IgniteCheckedException In case of error.
+ * @throws IgniteCheckedException In case of error.
*/
public long serviceInit(long memPtr) throws IgniteCheckedException {
enter();
try {
- return PlatformCallbackUtils.serviceInit(envPtr, memPtr);
+ return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceInit, memPtr);
}
finally {
leave();
@@ -806,15 +849,14 @@ public class PlatformCallbackGateway {
/**
* Executes native service.
*
- * @param svcPtr Pointer to the service in the native platform.
* @param memPtr Stream pointer.
- * @throws org.apache.ignite.IgniteCheckedException In case of error.
+ * @throws IgniteCheckedException In case of error.
*/
- public void serviceExecute(long svcPtr, long memPtr) throws IgniteCheckedException {
+ public void serviceExecute(long memPtr) throws IgniteCheckedException {
enter();
try {
- PlatformCallbackUtils.serviceExecute(envPtr, svcPtr, memPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceExecute, memPtr);
}
finally {
leave();
@@ -824,15 +866,14 @@ public class PlatformCallbackGateway {
/**
* Cancels native service.
*
- * @param svcPtr Pointer to the service in the native platform.
* @param memPtr Stream pointer.
- * @throws org.apache.ignite.IgniteCheckedException In case of error.
+ * @throws IgniteCheckedException In case of error.
*/
- public void serviceCancel(long svcPtr, long memPtr) throws IgniteCheckedException {
+ public void serviceCancel(long memPtr) throws IgniteCheckedException {
enter();
try {
- PlatformCallbackUtils.serviceCancel(envPtr, svcPtr, memPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceCancel, memPtr);
}
finally {
leave();
@@ -842,16 +883,14 @@ public class PlatformCallbackGateway {
/**
* Invokes service method.
*
- * @param svcPtr Pointer to the service in the native platform.
- * @param outMemPtr Output memory pointer.
- * @param inMemPtr Input memory pointer.
- * @throws org.apache.ignite.IgniteCheckedException In case of error.
+ * @param memPtr Memory pointer.
+ * @throws IgniteCheckedException In case of error.
*/
- public void serviceInvokeMethod(long svcPtr, long outMemPtr, long inMemPtr) throws IgniteCheckedException {
+ public void serviceInvokeMethod(long memPtr) throws IgniteCheckedException {
enter();
try {
- PlatformCallbackUtils.serviceInvokeMethod(envPtr, svcPtr, outMemPtr, inMemPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceInvokeMethod, memPtr);
}
finally {
leave();
@@ -867,7 +906,7 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.clusterNodeFilterApply(envPtr, memPtr);
+ return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ClusterNodeFilterApply, memPtr);
}
finally {
leave();
@@ -885,7 +924,8 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.extensionCallbackInLongOutLong(envPtr, typ, arg1);
+ return PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.ExtensionInLongOutLong, typ, arg1, 0, null);
}
finally {
leave();
@@ -904,7 +944,8 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.extensionCallbackInLongLongOutLong(envPtr, typ, arg1, arg2);
+ return PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+ PlatformCallbackOp.ExtensionInLongLongOutLong, typ, arg1, arg2, null);
}
finally {
leave();
@@ -918,7 +959,7 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.onClientDisconnected(envPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.OnClientDisconnected, 0);
}
finally {
leave();
@@ -934,7 +975,8 @@ public class PlatformCallbackGateway {
enter();
try {
- PlatformCallbackUtils.onClientReconnected(envPtr, clusterRestarted);
+ PlatformCallbackUtils.inLongOutLong(envPtr,
+ PlatformCallbackOp.OnClientReconnected, clusterRestarted ? 1 : 0);
}
finally {
leave();
@@ -985,7 +1027,7 @@ public class PlatformCallbackGateway {
public void onStop() {
block();
- PlatformCallbackUtils.onStop(envPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.OnStop, 0);
}
/**
@@ -999,7 +1041,8 @@ public class PlatformCallbackGateway {
enter();
try {
- return PlatformCallbackUtils.affinityFunctionInit(envPtr, memPtr, baseFunc);
+ return PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, PlatformCallbackOp.AffinityFunctionInit,
+ memPtr, 0, 0, baseFunc);
}
finally {
leave();
@@ -1009,15 +1052,15 @@ public class PlatformCallbackGateway {
/**
* Gets the partition from affinity function.
*
- * @param ptr Affinity function pointer.
- * @param memPtr Pointer to a stream with key object.
+ * @param memPtr Pointer to a stream with data.
* @return Partition number for a given key.
*/
- public int affinityFunctionPartition(long ptr, long memPtr) {
+ public int affinityFunctionPartition(long memPtr) {
enter();
try {
- return PlatformCallbackUtils.affinityFunctionPartition(envPtr, ptr, memPtr);
+ return (int)PlatformCallbackUtils.inLongOutLong(envPtr,
+ PlatformCallbackOp.AffinityFunctionPartition, memPtr);
}
finally {
leave();
@@ -1027,15 +1070,13 @@ public class PlatformCallbackGateway {
/**
* Assigns the affinity partitions.
*
- * @param ptr Affinity function pointer.
- * @param outMemPtr Pointer to a stream with affinity context.
- * @param inMemPtr Pointer to a stream with result.
+ * @param memPtr Pointer to a stream.
*/
- public void affinityFunctionAssignPartitions(long ptr, long outMemPtr, long inMemPtr){
+ public void affinityFunctionAssignPartitions(long memPtr){
enter();
try {
- PlatformCallbackUtils.affinityFunctionAssignPartitions(envPtr, ptr, outMemPtr, inMemPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.AffinityFunctionAssignPartitions, memPtr);
}
finally {
leave();
@@ -1045,14 +1086,13 @@ public class PlatformCallbackGateway {
/**
* Removes the node from affinity function.
*
- * @param ptr Affinity function pointer.
- * @param memPtr Pointer to a stream with node id.
+ * @param memPtr Pointer to a stream.
*/
- public void affinityFunctionRemoveNode(long ptr, long memPtr) {
+ public void affinityFunctionRemoveNode(long memPtr) {
enter();
try {
- PlatformCallbackUtils.affinityFunctionRemoveNode(envPtr, ptr, memPtr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.AffinityFunctionRemoveNode, memPtr);
}
finally {
leave();
@@ -1069,7 +1109,7 @@ public class PlatformCallbackGateway {
return; // skip: destroy is not necessary during shutdown.
try {
- PlatformCallbackUtils.affinityFunctionDestroy(envPtr, ptr);
+ PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.AffinityFunctionDestroy, ptr);
}
finally {
leave();
@@ -1097,7 +1137,7 @@ public class PlatformCallbackGateway {
/**
* Enter gateway.
*/
- protected boolean tryEnter() {
+ private boolean tryEnter() {
return lock.enterBusy();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java
new file mode 100644
index 0000000..973ba51
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.callback;
+
+/**
+ * Platform callback operation codes.
+ */
+class PlatformCallbackOp {
+ /** */
+ public static final int CacheStoreCreate = 1;
+
+ /** */
+ public static final int CacheStoreInvoke = 2;
+
+ /** */
+ public static final int CacheStoreDestroy = 3;
+
+ /** */
+ public static final int CacheStoreSessionCreate = 4;
+
+ /** */
+ public static final int CacheEntryFilterCreate = 5;
+
+ /** */
+ public static final int CacheEntryFilterApply = 6;
+
+ /** */
+ public static final int CacheEntryFilterDestroy = 7;
+
+ /** */
+ public static final int CacheInvoke = 8;
+
+ /** */
+ public static final int ComputeTaskMap = 9;
+
+ /** */
+ public static final int ComputeTaskJobResult = 10;
+
+ /** */
+ public static final int ComputeTaskReduce = 11;
+
+ /** */
+ public static final int ComputeTaskComplete = 12;
+
+ /** */
+ public static final int ComputeJobSerialize = 13;
+
+ /** */
+ public static final int ComputeJobCreate = 14;
+
+ /** */
+ public static final int ComputeJobExecute = 15;
+
+ /** */
+ public static final int ComputeJobCancel = 16;
+
+ /** */
+ public static final int ComputeJobDestroy = 17;
+
+ /** */
+ public static final int ContinuousQueryListenerApply = 18;
+
+ /** */
+ public static final int ContinuousQueryFilterCreate = 19;
+
+ /** */
+ public static final int ContinuousQueryFilterApply = 20;
+
+ /** */
+ public static final int ContinuousQueryFilterRelease = 21;
+
+ /** */
+ public static final int DataStreamerTopologyUpdate = 22;
+
+ /** */
+ public static final int DataStreamerStreamReceiverInvoke = 23;
+
+ /** */
+ public static final int FutureByteResult = 24;
+
+ /** */
+ public static final int FutureBoolResult = 25;
+
+ /** */
+ public static final int FutureShortResult = 26;
+
+ /** */
+ public static final int FutureCharResult = 27;
+
+ /** */
+ public static final int FutureIntResult = 28;
+
+ /** */
+ public static final int FutureFloatResult = 29;
+
+ /** */
+ public static final int FutureLongResult = 30;
+
+ /** */
+ public static final int FutureDoubleResult = 31;
+
+ /** */
+ public static final int FutureObjectResult = 32;
+
+ /** */
+ public static final int FutureNullResult = 33;
+
+ /** */
+ public static final int FutureError = 34;
+
+ /** */
+ public static final int LifecycleOnEvent = 35;
+
+ /** */
+ public static final int MemoryReallocate = 36;
+
+ /** */
+ public static final int MessagingFilterCreate = 37;
+
+ /** */
+ public static final int MessagingFilterApply = 38;
+
+ /** */
+ public static final int MessagingFilterDestroy = 39;
+
+ /** */
+ public static final int EventFilterCreate = 40;
+
+ /** */
+ public static final int EventFilterApply = 41;
+
+ /** */
+ public static final int EventFilterDestroy = 42;
+
+ /** */
+ public static final int ServiceInit = 43;
+
+ /** */
+ public static final int ServiceExecute = 44;
+
+ /** */
+ public static final int ServiceCancel = 45;
+
+ /** */
+ public static final int ServiceInvokeMethod = 46;
+
+ /** */
+ public static final int ClusterNodeFilterApply = 47;
+
+ /** */
+ public static final int NodeInfo = 48;
+
+ /** */
+ public static final int OnStart = 49;
+
+ /** */
+ public static final int OnStop = 50;
+
+ /** */
+ public static final int ExtensionInLongOutLong = 51;
+
+ /** */
+ public static final int ExtensionInLongLongOutLong = 52;
+
+ /** */
+ public static final int OnClientDisconnected = 53;
+
+ /** */
+ public static final int OnClientReconnected = 54;
+
+ /** */
+ public static final int AffinityFunctionInit = 55;
+
+ /** */
+ public static final int AffinityFunctionPartition = 56;
+
+ /** */
+ public static final int AffinityFunctionAssignPartitions = 57;
+
+ /** */
+ public static final int AffinityFunctionRemoveNode = 58;
+
+ /** */
+ public static final int AffinityFunctionDestroy = 59;
+
+ /** */
+ public static final int ComputeTaskLocalJobResult = 60;
+
+ /** */
+ public static final int ComputeJobExecuteLocal = 61;
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
index 9d60ec0..f823cb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -17,533 +17,12 @@
package org.apache.ignite.internal.processors.platform.callback;
-import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
-
/**
* Platform callback utility methods. Implemented in target platform. All methods in this class must be
* package-visible and invoked only through {@link PlatformCallbackGateway}.
*/
public class PlatformCallbackUtils {
/**
- * Create cache store.
- *
- * @param envPtr Environment pointer.
- * @param memPtr Memory pointer.
- * @return Pointer.
- */
- static native long cacheStoreCreate(long envPtr, long memPtr);
-
- /**
- * @param envPtr Environment pointer.
- * @param objPtr Object pointer.
- * @param memPtr Memory pointer.
- * @return Result.
- */
- static native int cacheStoreInvoke(long envPtr, long objPtr, long memPtr);
-
- /**
- * @param envPtr Environment pointer.
- * @param objPtr Object pointer.
- */
- static native void cacheStoreDestroy(long envPtr, long objPtr);
-
- /**
- * Creates cache store session.
- *
- * @param envPtr Environment pointer.
- * @param storePtr Store instance pointer.
- * @return Session instance pointer.
- */
- static native long cacheStoreSessionCreate(long envPtr, long storePtr);
-
- /**
- * Creates cache entry filter and returns a pointer.
- *
- * @param envPtr Environment pointer.
- * @param memPtr Memory pointer.
- * @return Pointer.
- */
- static native long cacheEntryFilterCreate(long envPtr, long memPtr);
-
- /**
- * @param envPtr Environment pointer.
- * @param objPtr Pointer.
- * @param memPtr Memory pointer.
- * @return Result.
- */
- static native int cacheEntryFilterApply(long envPtr, long objPtr, long memPtr);
-
- /**
- * @param envPtr Environment pointer.
- * @param objPtr Pointer.
- */
- static native void cacheEntryFilterDestroy(long envPtr, long objPtr);
-
- /**
- * Invoke cache entry processor.
- *
- * @param envPtr Environment pointer.
- * @param outMemPtr Output memory pointer.
- * @param inMemPtr Input memory pointer.
- */
- static native void cacheInvoke(long envPtr, long outMemPtr, long inMemPtr);
-
- /**
- * Perform native task map. Do not throw exceptions, serializing them to the output stream instead.
- *
- * @param envPtr Environment pointer.
- * @param taskPtr Task pointer.
- * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}).
- * @param inMemPtr Input memory pointer.
- */
- static native void computeTaskMap(long envPtr, long taskPtr, long outMemPtr, long inMemPtr);
-
- /**
- * Perform native task job result notification.
- *
- * @param envPtr Environment pointer.
- * @param taskPtr Task pointer.
- * @param jobPtr Job pointer.
- * @param memPtr Memory pointer (always zero for local job execution).
- * @return Job result enum ordinal.
- */
- static native int computeTaskJobResult(long envPtr, long taskPtr, long jobPtr, long memPtr);
-
- /**
- * Perform native task reduce.
- *
- * @param envPtr Environment pointer.
- * @param taskPtr Task pointer.
- */
- static native void computeTaskReduce(long envPtr, long taskPtr);
-
- /**
- * Complete task with native error.
- *
- * @param envPtr Environment pointer.
- * @param taskPtr Task pointer.
- * @param memPtr Memory pointer with exception data or {@code 0} in case of success.
- */
- static native void computeTaskComplete(long envPtr, long taskPtr, long memPtr);
-
- /**
- * Serialize native job.
- *
- * @param envPtr Environment pointer.
- * @param jobPtr Job pointer.
- * @param memPtr Memory pointer.
- * @return {@code True} if serialization succeeded.
- */
- static native int computeJobSerialize(long envPtr, long jobPtr, long memPtr);
-
- /**
- * Create job in native platform.
- *
- * @param envPtr Environment pointer.
- * @param memPtr Memory pointer.
- * @return Pointer to job.
- */
- static native long computeJobCreate(long envPtr, long memPtr);
-
- /**
- * Execute native job on a node other than where it was created.
- *
- * @param envPtr Environment pointer.
- * @param jobPtr Job pointer.
- * @param cancel Cancel flag.
- * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution.
- */
- static native void computeJobExecute(long envPtr, long jobPtr, int cancel, long memPtr);
-
- /**
- * Cancel the job.
- *
- * @param envPtr Environment pointer.
- * @param jobPtr Job pointer.
- */
- static native void computeJobCancel(long envPtr, long jobPtr);
-
- /**
- * Destroy the job.
- *
- * @param envPtr Environment pointer.
- * @param ptr Pointer.
- */
- static native void computeJobDestroy(long envPtr, long ptr);
-
- /**
- * Invoke local callback.
- *
- * @param envPtr Environment pointer.
- * @param cbPtr Callback pointer.
- * @param memPtr Memory pointer.
- */
- static native void continuousQueryListenerApply(long envPtr, long cbPtr, long memPtr);
-
- /**
- * Create filter in native platform.
- *
- * @param envPtr Environment pointer.
- * @param memPtr Memory pointer.
- * @return Pointer to created filter.
- */
- static native long continuousQueryFilterCreate(long envPtr, long memPtr);
-
- /**
- * Invoke remote filter.
- *
- * @param envPtr Environment pointer.
- * @param filterPtr Filter pointer.
- * @param memPtr Memory pointer.
- * @return Result.
- */
- static native int continuousQueryFilterApply(long envPtr, long filterPtr, long memPtr);
-
- /**
- * Release remote filter.
- *
- * @param envPtr Environment pointer.
- * @param filterPtr Filter pointer.
- */
- static native void continuousQueryFilterRelease(long envPtr, long filterPtr);
-
- /**
- * Notify native data streamer about topology update.
- *
- * @param envPtr Environment pointer.
- * @param ptr Data streamer native pointer.
- * @param topVer Topology version.
- * @param topSize Topology size.
- */
- static native void dataStreamerTopologyUpdate(long envPtr, long ptr, long topVer, int topSize);
-
- /**
- * Invoke stream receiver.
- *
- * @param envPtr Environment pointer.
- * @param ptr Receiver native pointer.
- * @param cache Cache object.
- * @param memPtr Stream pointer.
- * @param keepBinary Binary flag.
- */
- static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, PlatformTargetProxy cache, long memPtr,
- boolean keepBinary);
-
- /**
- * Notify future with byte result.
- *
- * @param envPtr Environment pointer.
- * @param futPtr Future pointer.
- * @param res Result.
- */
- static native void futureByteResult(long envPtr, long futPtr, int res);
-
- /**
- * Notify future with boolean result.
- *
- * @param envPtr Environment pointer.
- * @param futPtr Future pointer.
- * @param res Result.
- */
- static native void futureBoolResult(long envPtr, long futPtr, int res);
-
- /**
- * Notify future with short result.
- *
- * @param envPtr Environment pointer.
- * @param futPtr Future pointer.
- * @param res Result.
- */
- static native void futureShortResult(long envPtr, long futPtr, int res);
-
- /**
- * Notify future with byte result.
- *
- * @param envPtr Environment pointer.
- * @param futPtr Future pointer.
- * @param res Result.
- */
- static native void futureCharResult(long envPtr, long futPtr, int res);
-
- /**
- * Notify future with int result.
- *
- * @param envPtr Environment pointer.
- * @param futPtr Future pointer.
- * @param res Result.
- */
- static native void futureIntResult(long envPtr, long futPtr, int res);
-
- /**
- * Notify future with float result.
- *
- * @param envPtr Environment pointer.
- * @param futPtr Future pointer.
- * @param res Result.
- */
- static native void futureFloatResult(long envPtr, long futPtr, float res);
-
- /**
- * Notify future with long result.
- *
- * @param envPtr Environment pointer.
- * @param futPtr Future pointer.
- * @param res Result.
- */
- static native void futureLongResult(long envPtr, long futPtr, long res);
-
- /**
- * Notify future with double result.
- *
- * @param envPtr Environment pointer.
- * @param futPtr Future pointer.
- * @param res Result.
- */
- static native void futureDoubleResult(long envPtr, long futPtr, double res);
-
- /**
- * Notify future with object result.
- *
- * @param envPtr Environment pointer.
- * @param futPtr Future pointer.
- * @param memPtr Memory pointer.
- */
- static native void futureObjectResult(long envPtr, long futPtr, long memPtr);
-
- /**
- * Notify future with null result.
- *
- * @param envPtr Environment pointer.
- * @param futPtr Future pointer.
- */
- static native void futureNullResult(long envPtr, long futPtr);
-
- /**
- * Notify future with error.
- *
- * @param envPtr Environment pointer.
- * @param futPtr Future pointer.
- * @param memPtr Pointer to memory with error information.
- */
- static native void futureError(long envPtr, long futPtr, long memPtr);
-
- /**
- * Creates message filter and returns a pointer.
- *
- * @param envPtr Environment pointer.
- * @param memPtr Memory pointer.
- * @return Pointer.
- */
- static native long messagingFilterCreate(long envPtr, long memPtr);
-
- /**
- * @param envPtr Environment pointer.
- * @param objPtr Pointer.
- * @param memPtr Memory pointer.
- * @return Result.
- */
- static native int messagingFilterApply(long envPtr, long objPtr, long memPtr);
-
- /**
- * @param envPtr Environment pointer.
- * @param objPtr Pointer.
- */
- static native void messagingFilterDestroy(long envPtr, long objPtr);
-
- /**
- * Creates event filter and returns a pointer.
- *
- * @param envPtr Environment pointer.
- * @param memPtr Memory pointer.
- * @return Pointer.
- */
- static native long eventFilterCreate(long envPtr, long memPtr);
-
- /**
- * @param envPtr Environment pointer.
- * @param objPtr Pointer.
- * @param memPtr Memory pointer.
- * @return Result.
- */
- static native int eventFilterApply(long envPtr, long objPtr, long memPtr);
-
- /**
- * @param envPtr Environment pointer.
- * @param objPtr Pointer.
- */
- static native void eventFilterDestroy(long envPtr, long objPtr);
-
- /**
- * Sends node info to native target.
- *
- * @param envPtr Environment pointer.
- * @param memPtr Ptr to a stream with serialized node.
- */
- static native void nodeInfo(long envPtr, long memPtr);
-
- /**
- * Kernal start callback.
- *
- * @param envPtr Environment pointer.
- * @param proc Platform processor.
- * @param memPtr Memory pointer.
- */
- static native void onStart(long envPtr, Object proc, long memPtr);
-
- /*
- * Kernal stop callback.
- *
- * @param envPtr Environment pointer.
- */
- static native void onStop(long envPtr);
-
- /**
- * Lifecycle event callback.
- *
- * @param envPtr Environment pointer.
- * @param ptr Holder pointer.
- * @param evt Event.
- */
- static native void lifecycleEvent(long envPtr, long ptr, int evt);
-
- /**
- * Re-allocate external memory chunk.
- *
- * @param envPtr Environment pointer.
- * @param memPtr Cross-platform pointer.
- * @param cap Capacity.
- */
- static native void memoryReallocate(long envPtr, long memPtr, int cap);
-
- /**
- * Initializes native service.
- *
- * @param envPtr Environment pointer.
- * @param memPtr Stream pointer.
- * @return Pointer to the native platform service.
- */
- static native long serviceInit(long envPtr, long memPtr);
-
- /**
- * Executes native service.
- *
- * @param envPtr Environment pointer.
- * @param svcPtr Pointer to the service in the native platform.
- * @param memPtr Stream pointer.
- */
- static native void serviceExecute(long envPtr, long svcPtr, long memPtr);
-
- /**
- * Cancels native service.
- *
- * @param envPtr Environment pointer.
- * @param svcPtr Pointer to the service in the native platform.
- * @param memPtr Stream pointer.
- */
- static native void serviceCancel(long envPtr, long svcPtr, long memPtr);
-
- /**
- * Invokes service method.
- *
- * @param envPtr Environment pointer.
- * @param svcPtr Pointer to the service in the native platform.
- * @param outMemPtr Output memory pointer.
- * @param inMemPtr Input memory pointer.
- */
- static native void serviceInvokeMethod(long envPtr, long svcPtr, long outMemPtr, long inMemPtr);
-
- /**
- * Invokes cluster node filter.
- *
- * @param envPtr Environment pointer.
- * @param memPtr Stream pointer.
- */
- static native int clusterNodeFilterApply(long envPtr, long memPtr);
-
- /**
- * Extension callback accepting single long argument and returning long result.
- *
- * @param envPtr Environment pointer.
- * @param typ Operation type.
- * @param arg1 Argument 1.
- * @return Long result.
- */
- static native long extensionCallbackInLongOutLong(long envPtr, int typ, long arg1);
-
- /**
- * Extension callback accepting two long arguments and returning long result.
- *
- * @param envPtr Environment pointer.
- * @param typ Operation type.
- * @param arg1 Argument 1.
- * @param arg2 Argument 2.
- * @return Long result.
- */
- static native long extensionCallbackInLongLongOutLong(long envPtr, int typ, long arg1, long arg2);
-
- /**
- * Notifies platform about client disconnect.
- *
- * @param envPtr Environment pointer.
- */
- static native void onClientDisconnected(long envPtr);
-
- /**
- * Notifies platform about client reconnect.
- *
- * @param envPtr Environment pointer.
- * @param clusterRestarted Cluster restarted flag.
- */
- static native void onClientReconnected(long envPtr, boolean clusterRestarted);
-
- /**
- * Initializes affinity function.
- *
- * @param envPtr Environment pointer.
- * @param memPtr Pointer to a stream with serialized affinity function.
- * @param baseFunc Optional func for base calls.
- * @return Affinity function pointer.
- */
- static native long affinityFunctionInit(long envPtr, long memPtr, PlatformTargetProxy baseFunc);
-
- /**
- * Gets the partition from affinity function.
- *
- * @param envPtr Environment pointer.
- * @param ptr Affinity function pointer.
- * @param memPtr Pointer to a stream with key object.
- * @return Partition number for a given key.
- */
- static native int affinityFunctionPartition(long envPtr, long ptr, long memPtr);
-
- /**
- * Assigns the affinity partitions.
- *
- * @param envPtr Environment pointer.
- * @param ptr Affinity function pointer.
- * @param outMemPtr Pointer to a stream with affinity context.
- * @param inMemPtr Pointer to a stream with result.
- */
- static native void affinityFunctionAssignPartitions(long envPtr, long ptr, long outMemPtr, long inMemPtr);
-
- /**
- * Removes the node from affinity function.
- *
- * @param envPtr Environment pointer.
- * @param ptr Affinity function pointer.
- * @param memPtr Pointer to a stream with node id.
- */
- static native void affinityFunctionRemoveNode(long envPtr, long ptr, long memPtr);
-
- /**
- * Destroys the affinity function.
- *
- * @param envPtr Environment pointer.
- * @param ptr Affinity function pointer.
- */
- static native void affinityFunctionDestroy(long envPtr, long ptr);
-
- /**
* Redirects the console output.
*
* @param str String to write.
@@ -572,6 +51,29 @@ public class PlatformCallbackUtils {
static native boolean loggerIsLevelEnabled(long envPtr, int level);
/**
+ * Performs a generic long-long operation.
+ *
+ * @param envPtr Environment pointer.
+ * @param type Operation code.
+ * @param val Value.
+ * @return Value.
+ */
+ static native long inLongOutLong(long envPtr, int type, long val);
+
+ /**
+ * Performs a generic out-in operation.
+ *
+ * @param envPtr Environment pointer.
+ * @param type Operation code.
+ * @param val1 First value.
+ * @param val2 Second value.
+ * @param val3 Third value.
+ * @param arg Object argument.
+ * @return Value.
+ */
+ static native long inLongLongLongObjectOutLong(long envPtr, int type, long val1, long val2, long val3, Object arg);
+
+ /**
* Private constructor.
*/
private PlatformCallbackUtils() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
index 32aed39..56875c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
@@ -131,7 +131,7 @@ public abstract class PlatformAbstractJob implements PlatformJob, Externalizable
// Local job, must execute it with respect to possible concurrent task completion.
if (task.onJobLock()) {
try {
- ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, 0);
+ ctx.gateway().computeJobExecuteLocal(ptr, cancel ? 1 : 0);
return LOC_JOB_RES;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
index fe1e316..6a9fed5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
@@ -65,8 +65,6 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
assert rcvd.isEmpty() : "Should not cache result in Java for interop task";
- int plc;
-
lock.readLock().lock();
try {
@@ -78,9 +76,11 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
Object res0bj = res.getData();
+ int plc;
+
if (res0bj == PlatformAbstractJob.LOC_JOB_RES)
// Processing local job execution result.
- plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), 0);
+ plc = ctx.gateway().computeTaskLocalJobResult(taskPtr, job.pointer());
else {
// Processing remote job execution result or exception.
try (PlatformMemory mem = ctx.memory().allocate()) {
@@ -88,6 +88,9 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
BinaryRawWriterEx writer = ctx.writer(out);
+ writer.writeLong(taskPtr);
+ writer.writeLong(job.pointer());
+
writer.writeUuid(res.getNode().id());
writer.writeBoolean(res.isCancelled());
@@ -97,7 +100,7 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
out.synchronize();
- plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), mem.pointer());
+ plc = ctx.gateway().computeTaskJobResult(mem.pointer());
}
}
@@ -184,7 +187,7 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
* @return {@code True} if task is not completed yet, {@code false} otherwise.
*/
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
- public boolean onJobLock() {
+ boolean onJobLock() {
lock.readLock().lock();
if (done) {
@@ -199,7 +202,7 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
/**
* Callback invoked by job when task can be unlocked.
*/
- public void onJobUnlock() {
+ void onJobUnlock() {
assert !done;
lock.readLock().unlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
index f8567ce..25926eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.jetbrains.annotations.Nullable;
@@ -63,9 +64,16 @@ public class PlatformClosureJob extends PlatformAbstractJob {
createJob(ctx);
try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformInputStream in = mem.input();
+ PlatformOutputStream out = mem.output();
+
+ out.writeLong(ptr);
+ out.writeBoolean(false); // cancel
+
+ out.synchronize();
- ctx.gateway().computeJobExecute(ptr, 0, mem.pointer());
+ ctx.gateway().computeJobExecute(mem.pointer());
+
+ PlatformInputStream in = mem.input();
in.synchronize();
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
index 51c9cdb..9ff9609 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.compute;
+import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.jetbrains.annotations.Nullable;
@@ -64,7 +66,7 @@ public class PlatformFullJob extends PlatformAbstractJob {
private transient byte state;
/**
- * {@link java.io.Externalizable} support.
+ * {@link Externalizable} support.
*/
@SuppressWarnings("UnusedDeclaration")
public PlatformFullJob() {
@@ -114,9 +116,16 @@ public class PlatformFullJob extends PlatformAbstractJob {
return runLocal(ctx, cancel);
else {
try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformInputStream in = mem.input();
+ PlatformOutputStream out = mem.output();
+
+ out.writeLong(ptr);
+ out.writeBoolean(cancel); // cancel
+
+ out.synchronize();
- ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, mem.pointer());
+ ctx.gateway().computeJobExecute(mem.pointer());
+
+ PlatformInputStream in = mem.input();
in.synchronize();
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
index e2f6720..3134066 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
@@ -80,26 +80,26 @@ public final class PlatformFullTask extends PlatformAbstractTask {
PlatformMemoryManager memMgr = ctx.memory();
- try (PlatformMemory outMem = memMgr.allocate()) {
- PlatformOutputStream out = outMem.output();
+ try (PlatformMemory mem = memMgr.allocate()) {
+ PlatformOutputStream out = mem.output();
BinaryRawWriterEx writer = ctx.writer(out);
+ writer.writeLong(taskPtr);
+
write(writer, nodes, subgrid);
out.synchronize();
- try (PlatformMemory inMem = memMgr.allocate()) {
- PlatformInputStream in = inMem.input();
+ ctx.gateway().computeTaskMap(mem.pointer());
- ctx.gateway().computeTaskMap(taskPtr, outMem.pointer(), inMem.pointer());
+ PlatformInputStream in = mem.input();
- in.synchronize();
+ in.synchronize();
- BinaryRawReaderEx reader = ctx.reader(in);
+ BinaryRawReaderEx reader = ctx.reader(in);
- return read(reader, nodes);
- }
+ return read(reader, nodes);
}
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
index d0992fc..c3dde26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
@@ -78,6 +78,9 @@ public class PlatformStreamReceiverImpl extends PlatformAbstractPredicate implem
try (PlatformMemory mem = ctx.memory().allocate()) {
PlatformOutputStream out = mem.output();
+ out.writeLong(ptr);
+ out.writeBoolean(keepBinary);
+
BinaryRawWriterEx writer = ctx.writer(out);
writer.writeObject(pred);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
index 3563dd6..5257b26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
@@ -396,7 +396,7 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
if (sesPtr == null) {
// Session is not deployed yet, do that.
- sesPtr = platformCtx.gateway().cacheStoreSessionCreate(ptr);
+ sesPtr = platformCtx.gateway().cacheStoreSessionCreate();
ses.properties().put(KEY_SES, sesPtr);
}
@@ -419,11 +419,13 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
BinaryRawWriterEx writer = platformCtx.writer(out);
+ writer.writeLong(ptr);
+
task.apply(writer);
out.synchronize();
- int res = platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer());
+ int res = platformCtx.gateway().cacheStoreInvoke(mem.pointer());
if (res != 0) {
// Read error
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
index d6a6e16..4db01cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
@@ -110,13 +110,15 @@ public abstract class PlatformAbstractService implements PlatformService, Extern
BinaryRawWriterEx writer = platformCtx.writer(out);
+ writer.writeLong(ptr);
+
writer.writeBoolean(srvKeepBinary);
writeServiceContext(ctx, writer);
out.synchronize();
- platformCtx.gateway().serviceExecute(ptr, mem.pointer());
+ platformCtx.gateway().serviceExecute(mem.pointer());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -133,13 +135,15 @@ public abstract class PlatformAbstractService implements PlatformService, Extern
BinaryRawWriterEx writer = platformCtx.writer(out);
+ writer.writeLong(ptr);
+
writer.writeBoolean(srvKeepBinary);
writeServiceContext(ctx, writer);
out.synchronize();
- platformCtx.gateway().serviceCancel(ptr, mem.pointer());
+ platformCtx.gateway().serviceCancel(mem.pointer());
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -173,10 +177,11 @@ public abstract class PlatformAbstractService implements PlatformService, Extern
assert ptr != 0;
assert platformCtx != null;
- try (PlatformMemory outMem = platformCtx.memory().allocate()) {
- PlatformOutputStream out = outMem.output();
+ try (PlatformMemory mem = platformCtx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
BinaryRawWriterEx writer = platformCtx.writer(out);
+ writer.writeLong(ptr);
writer.writeBoolean(srvKeepBinary);
writer.writeString(mthdName);
@@ -192,17 +197,15 @@ public abstract class PlatformAbstractService implements PlatformService, Extern
out.synchronize();
- try (PlatformMemory inMem = platformCtx.memory().allocate()) {
- PlatformInputStream in = inMem.input();
+ platformCtx.gateway().serviceInvokeMethod(mem.pointer());
- platformCtx.gateway().serviceInvokeMethod(ptr, outMem.pointer(), inMem.pointer());
+ PlatformInputStream in = mem.input();
- in.synchronize();
+ in.synchronize();
- BinaryRawReaderEx reader = platformCtx.reader(in);
+ BinaryRawReaderEx reader = platformCtx.reader(in);
- return PlatformUtils.readInvocationResult(platformCtx, reader);
- }
+ return PlatformUtils.readInvocationResult(platformCtx, reader);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
index e81f4c6..b84744c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
@@ -250,7 +250,7 @@ public class PlatformFutureUtils {
break;
case TYP_FLOAT:
- gate.futureFloatResult(futPtr, (float) res);
+ gate.futureFloatResult(futPtr, Float.floatToIntBits((float) res));
break;
@@ -260,7 +260,7 @@ public class PlatformFutureUtils {
break;
case TYP_DOUBLE:
- gate.futureDoubleResult(futPtr, (double) res);
+ gate.futureDoubleResult(futPtr, Double.doubleToLongBits((double)res));
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 0d30ad9..4c0eab4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -501,6 +501,8 @@ public class PlatformUtils {
BinaryRawWriterEx writer = ctx.writer(out);
+ writer.writeLong(lsnrPtr);
+
int cntPos = writer.reserveInt();
int cnt = 0;
@@ -515,7 +517,7 @@ public class PlatformUtils {
out.synchronize();
- ctx.gateway().continuousQueryListenerApply(lsnrPtr, mem.pointer());
+ ctx.gateway().continuousQueryListenerApply(mem.pointer());
}
catch (Exception e) {
throw toCacheEntryListenerException(e);
@@ -538,11 +540,13 @@ public class PlatformUtils {
try (PlatformMemory mem = ctx.memory().allocate()) {
PlatformOutputStream out = mem.output();
+ out.writeLong(filterPtr);
+
writeCacheEntryEvent(ctx.writer(out), evt);
out.synchronize();
- return ctx.gateway().continuousQueryFilterApply(filterPtr, mem.pointer()) == 1;
+ return ctx.gateway().continuousQueryFilterApply(mem.pointer()) == 1;
}
catch (Exception e) {
throw toCacheEntryListenerException(e);