You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/09 08:57:38 UTC
[09/14] ignite git commit: IGNITE-4027 Extract PlatformTargetProxy
interface
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 8ff15d5..fd1c2d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
@@ -98,7 +99,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader)
+ @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
switch (type) {
case OP_UNICAST:
@@ -120,7 +121,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
case OP_EXEC_ASYNC:
- return executeJavaTask(reader, true);
+ return wrapListenable((PlatformListenable) executeJavaTask(reader, true));
default:
return super.processInStreamOutObject(type, reader);
@@ -128,7 +129,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_WITH_TIMEOUT: {
compute.withTimeout(val);
@@ -154,7 +155,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
* @param reader Reader.
* @param broadcast broadcast flag.
*/
- private PlatformListenable processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast,
+ private PlatformTarget processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast,
boolean affinity) {
PlatformAbstractTask task;
@@ -221,7 +222,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_EXEC:
@@ -239,7 +240,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
*
* @param task Task.
*/
- private PlatformListenable executeNative0(final PlatformAbstractTask task) {
+ private PlatformTarget executeNative0(final PlatformAbstractTask task) {
IgniteInternalFuture fut = computeForPlatform.executeAsync(task, null);
fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
@@ -257,7 +258,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
});
- return PlatformFutureUtils.getListenable(fut);
+ return wrapListenable(PlatformFutureUtils.getListenable(fut));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index cd5fba0..7d71a9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -114,7 +114,7 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
switch (type) {
case OP_UPDATE:
int plc = reader.readInt();
@@ -169,7 +169,7 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, final long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, final long val) throws IgniteCheckedException {
switch (type) {
case OP_SET_ALLOW_OVERWRITE:
ldr.allowOverwrite(val == TRUE);
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
index add11ed..d0992fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
@@ -23,6 +23,8 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl;
import org.apache.ignite.internal.processors.platform.cache.PlatformCache;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
@@ -89,8 +91,10 @@ public class PlatformStreamReceiverImpl extends PlatformAbstractPredicate implem
out.synchronize();
- ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, new PlatformCache(ctx, cache, keepBinary),
- mem.pointer(), keepBinary);
+ PlatformCache cache0 = new PlatformCache(ctx, cache, keepBinary);
+ PlatformTargetProxy cacheProxy = new PlatformTargetProxyImpl(cache0, ctx);
+
+ ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, cacheProxy, mem.pointer(), keepBinary);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
index 811e38b..b57b140 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
@@ -80,7 +80,7 @@ public class PlatformAtomicLong extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
switch (type) {
case OP_COMPARE_AND_SET:
long cmp = reader.readLong();
@@ -99,7 +99,7 @@ public class PlatformAtomicLong extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_ADD_AND_GET:
return atomicLong.addAndGet(val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
index 63b5b86..a644259 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
@@ -94,7 +94,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
if (type == OP_GET)
writer.writeObject(atomicRef.get());
else
@@ -102,7 +102,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader)
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
if (type == OP_SET) {
atomicRef.set(reader.readObjectDetached());
@@ -114,7 +114,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader,
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader,
BinaryRawWriterEx writer) throws IgniteCheckedException {
if (type == OP_COMPARE_AND_SET_AND_GET) {
Object val = reader.readObjectDetached();
@@ -134,7 +134,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_CLOSE:
atomicRef.close();
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
index c352731..6d17a72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
@@ -71,7 +71,7 @@ public class PlatformAtomicSequence extends PlatformAbstractTarget {
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_ADD_AND_GET:
return atomicSeq.addAndGet(val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
index 383e7ab..9ddcc37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.F;
@@ -117,7 +118,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader)
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
switch (type) {
case OP_RECORD_LOCAL:
@@ -168,7 +169,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
/** {@inheritDoc} */
@SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"})
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_LOCAL_QUERY: {
@@ -271,7 +272,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_GET_ENABLED_EVENTS:
writeEventTypes(events.enabledEvents(), writer);
@@ -284,7 +285,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
switch (type) {
case OP_WITH_ASYNC:
if (events.isAsync())
@@ -297,7 +298,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_IS_ENABLED:
return events.isEnabled((int)val) ? TRUE : FALSE;
@@ -310,12 +311,12 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl)eventsAsync.future()).internalFuture();
}
/** {@inheritDoc} */
- @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
+ @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) {
switch (opId) {
case OP_WAIT_FOR_LOCAL:
return eventResWriter;
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
index 216427a..6fe109e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
@@ -86,7 +87,7 @@ public class PlatformMessaging extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader)
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
switch (type) {
case OP_SEND:
@@ -149,7 +150,7 @@ public class PlatformMessaging extends PlatformAbstractTarget {
/** {@inheritDoc} */
@SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"})
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_REMOTE_LISTEN:{
@@ -181,12 +182,12 @@ public class PlatformMessaging extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl)messagingAsync.future()).internalFuture();
}
/** {@inheritDoc} */
- @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
switch (type) {
case OP_WITH_ASYNC:
if (messaging.isAsync())
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
index 962a4c0..22a7fa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetService;
import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
@@ -144,7 +145,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader)
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
switch (type) {
case OP_DOTNET_DEPLOY: {
@@ -195,7 +196,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_DOTNET_SERVICES: {
@@ -223,8 +224,8 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInObjectStreamOutObjectStream(int type, Object arg, BinaryRawReaderEx reader,
- BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public PlatformTarget processInObjectStreamOutObjectStream(int type, PlatformTarget arg,
+ BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_INVOKE: {
assert arg != null;
@@ -260,7 +261,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_DESCRIPTORS: {
Collection<ServiceDescriptor> descs = services.serviceDescriptors();
@@ -299,7 +300,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
switch (type) {
case OP_WITH_ASYNC:
if (services.isAsync())
@@ -315,7 +316,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_CANCEL_ALL:
services.cancelAll();
@@ -327,7 +328,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
switch (type) {
case OP_SERVICE_PROXY: {
String name = reader.readString();
@@ -343,14 +344,14 @@ public class PlatformServices extends PlatformAbstractTarget {
: new GridServiceProxy<>(services.clusterGroup(), name, Service.class, sticky,
platformCtx.kernalContext());
- return new ServiceProxyHolder(proxy, d.serviceClass());
+ return new ServiceProxyHolder(proxy, d.serviceClass(), platformContext());
}
}
return super.processInStreamOutObject(type, reader);
}
/** {@inheritDoc} */
- @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl)servicesAsync.future()).internalFuture();
}
@@ -392,7 +393,7 @@ public class PlatformServices extends PlatformAbstractTarget {
* Proxy holder.
*/
@SuppressWarnings("unchecked")
- private static class ServiceProxyHolder {
+ private static class ServiceProxyHolder extends PlatformAbstractTarget {
/** */
private final Object proxy;
@@ -422,7 +423,9 @@ public class PlatformServices extends PlatformAbstractTarget {
* @param proxy Proxy object.
* @param clazz Proxy class.
*/
- private ServiceProxyHolder(Object proxy, Class clazz) {
+ private ServiceProxyHolder(Object proxy, Class clazz, PlatformContext ctx) {
+ super(ctx);
+
assert proxy != null;
assert clazz != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
index 1b41712..3cee2b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
@@ -153,7 +153,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_COMMIT:
tx(val).commit();
@@ -184,7 +184,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
long txId = reader.readLong();
final Transaction asyncTx = (Transaction)tx(txId).withAsync();
@@ -220,7 +220,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_START: {
TransactionConcurrency txConcurrency = TransactionConcurrency.fromOrdinal(reader.readInt());
@@ -245,7 +245,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_CACHE_CONFIG_PARAMETERS:
TransactionConfiguration txCfg = platformCtx.kernalContext().config().getTransactionConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
index 5985d22..e81f4c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.platform.utils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
@@ -71,7 +71,7 @@ public class PlatformFutureUtils {
* @return Resulting listenable.
*/
public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr,
- final int typ, PlatformAbstractTarget target) {
+ final int typ, PlatformTarget target) {
PlatformListenable listenable = getListenable(fut);
listen(ctx, listenable, futPtr, typ, null, target);
@@ -88,7 +88,7 @@ public class PlatformFutureUtils {
* @return Resulting listenable.
*/
public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr,
- final int typ, PlatformAbstractTarget target) {
+ final int typ, PlatformTarget target) {
PlatformListenable listenable = getListenable(fut);
listen(ctx, listenable, futPtr, typ, null, target);
@@ -107,7 +107,7 @@ public class PlatformFutureUtils {
* @return Resulting listenable.
*/
public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr,
- final int typ, Writer writer, PlatformAbstractTarget target) {
+ final int typ, Writer writer, PlatformTarget target) {
PlatformListenable listenable = getListenable(fut);
listen(ctx, listenable, futPtr, typ, writer, target);
@@ -126,7 +126,7 @@ public class PlatformFutureUtils {
* @return Resulting listenable.
*/
public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr,
- final int typ, Writer writer, PlatformAbstractTarget target) {
+ final int typ, Writer writer, PlatformTarget target) {
PlatformListenable listenable = getListenable(fut);
listen(ctx, listenable, futPtr, typ, writer, target);
@@ -144,7 +144,7 @@ public class PlatformFutureUtils {
* @return Resulting listenable.
*/
public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr,
- Writer writer, PlatformAbstractTarget target) {
+ Writer writer, PlatformTarget target) {
PlatformListenable listenable = getListenable(fut);
listen(ctx, listenable, futPtr, TYP_OBJ, writer, target);
@@ -183,7 +183,7 @@ public class PlatformFutureUtils {
*/
@SuppressWarnings("unchecked")
public static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final
- int typ, @Nullable final Writer writer, final PlatformAbstractTarget target) {
+ int typ, @Nullable final Writer writer, final PlatformTarget target) {
final PlatformCallbackGateway gate = ctx.gateway();
listenable.listen(new IgniteBiInClosure<Object, Throwable>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java
new file mode 100644
index 0000000..7d65913
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.utils;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+
+/**
+ * Wraps listenable in a platform target.
+ */
+public class PlatformListenableTarget extends PlatformAbstractTarget {
+ /** */
+ private static final int OP_CANCEL = 1;
+
+ /** */
+ private static final int OP_IS_CANCELLED = 2;
+
+ /** Wrapped listenable */
+ private final PlatformListenable listenable;
+
+ /**
+ * Constructor.
+ *
+ * @param platformCtx Context.
+ */
+ public PlatformListenableTarget(PlatformListenable listenable, PlatformContext platformCtx) {
+ super(platformCtx);
+
+ assert listenable != null;
+
+ this.listenable = listenable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ switch (type) {
+ case OP_CANCEL:
+ return listenable.cancel() ? TRUE : FALSE;
+
+ case OP_IS_CANCELLED:
+ return listenable.isCancelled() ? TRUE : FALSE;
+ }
+
+ return super.processInLongOutLong(type, val);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/include/ignite/jni/exports.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
index 3052435..a2e5cbb 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
@@ -80,9 +80,6 @@ extern "C" {
void IGNITE_CALL IgniteDestroyJvm(gcj::JniContext* ctx);
- bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj);
- bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj);
-
void IGNITE_CALL IgniteSetConsoleHandler(gcj::ConsoleWriteHandler consoleHandler);
void IGNITE_CALL IgniteRemoveConsoleHandler(gcj::ConsoleWriteHandler consoleHandler);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index 07df001..97e4412 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -286,10 +286,6 @@ namespace ignite
jmethodID m_PlatformUtils_reallocate;
jmethodID m_PlatformUtils_errData;
- jclass c_PlatformListenable;
- jmethodID m_PlatformListenable_cancel;
- jmethodID m_PlatformListenable_isCancelled;
-
/**
* Constructor.
*/
@@ -465,9 +461,6 @@ namespace ignite
jobject CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
jobject CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr);
- bool ListenableCancel(jobject obj);
- bool ListenableIsCancelled(jobject obj);
-
jobject Acquire(jobject obj);
void DestroyJvm();
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/project/vs/module.def
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def
index e58ac3b..fb56dca 100644
--- a/modules/platforms/cpp/jni/project/vs/module.def
+++ b/modules/platforms/cpp/jni/project/vs/module.def
@@ -36,8 +36,6 @@ IgniteDestroyJvm @86
IgniteTargetOutObject @91
IgniteProcessorExtensions @97
IgniteProcessorAtomicLong @98
-IgniteListenableCancel @110
-IgniteListenableIsCancelled @111
IgniteProcessorCreateCacheFromConfig @114
IgniteProcessorGetOrCreateCacheFromConfig @115
IgniteProcessorGetIgniteConfiguration @116
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/src/exports.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/exports.cpp b/modules/platforms/cpp/jni/src/exports.cpp
index dde98fb..b842c03 100644
--- a/modules/platforms/cpp/jni/src/exports.cpp
+++ b/modules/platforms/cpp/jni/src/exports.cpp
@@ -214,14 +214,6 @@ extern "C" {
ctx->DestroyJvm();
}
- bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj) {
- return ctx->ListenableCancel(static_cast<jobject>(obj));
- }
-
- bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj) {
- return ctx->ListenableIsCancelled(static_cast<jobject>(obj));
- }
-
void IGNITE_CALL IgniteSetConsoleHandler(gcj::ConsoleWriteHandler consoleHandler) {
gcj::JniContext::SetConsoleHandler(consoleHandler);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index 2d3cf72..9626fbb 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -194,33 +194,33 @@ namespace ignite
const char* C_PLATFORM_PROCESSOR = "org/apache/ignite/internal/processors/platform/PlatformProcessor";
JniMethod M_PLATFORM_PROCESSOR_RELEASE_START = JniMethod("releaseStart", "()V", false);
- JniMethod M_PLATFORM_PROCESSOR_PROJECTION = JniMethod("projection", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE = JniMethod("createCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE = JniMethod("getOrCreateCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE_FROM_CONFIG = JniMethod("createCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE_FROM_CONFIG = JniMethod("getOrCreateCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_CREATE_NEAR_CACHE = JniMethod("createNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_NEAR_CACHE = JniMethod("getOrCreateNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
+ JniMethod M_PLATFORM_PROCESSOR_PROJECTION = JniMethod("projection", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE = JniMethod("createCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE = JniMethod("getOrCreateCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE_FROM_CONFIG = JniMethod("createCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE_FROM_CONFIG = JniMethod("getOrCreateCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_CREATE_NEAR_CACHE = JniMethod("createNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_NEAR_CACHE = JniMethod("getOrCreateNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
JniMethod M_PLATFORM_PROCESSOR_DESTROY_CACHE = JniMethod("destroyCache", "(Ljava/lang/String;)V", false);
- JniMethod M_PLATFORM_PROCESSOR_AFFINITY = JniMethod("affinity", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_DATA_STREAMER = JniMethod("dataStreamer", "(Ljava/lang/String;Z)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_TRANSACTIONS = JniMethod("transactions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_COMPUTE = JniMethod("compute", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_MESSAGE = JniMethod("message", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_EVENTS = JniMethod("events", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_SERVICES = JniMethod("services", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_EXTENSIONS = JniMethod("extensions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_ATOMIC_LONG = JniMethod("atomicLong", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_ATOMIC_SEQUENCE = JniMethod("atomicSequence", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE = JniMethod("atomicReference", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
+ JniMethod M_PLATFORM_PROCESSOR_AFFINITY = JniMethod("affinity", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_DATA_STREAMER = JniMethod("dataStreamer", "(Ljava/lang/String;Z)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_TRANSACTIONS = JniMethod("transactions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_COMPUTE = JniMethod("compute", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_MESSAGE = JniMethod("message", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_EVENTS = JniMethod("events", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_SERVICES = JniMethod("services", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_EXTENSIONS = JniMethod("extensions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_ATOMIC_LONG = JniMethod("atomicLong", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_ATOMIC_SEQUENCE = JniMethod("atomicSequence", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE = JniMethod("atomicReference", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
JniMethod M_PLATFORM_PROCESSOR_GET_IGNITE_CONFIGURATION = JniMethod("getIgniteConfiguration", "(J)V", false);
JniMethod M_PLATFORM_PROCESSOR_GET_CACHE_NAMES = JniMethod("getCacheNames", "(J)V", false);
JniMethod M_PLATFORM_PROCESSOR_LOGGER_IS_LEVEL_ENABLED = JniMethod("loggerIsLevelEnabled", "(I)Z", false);
JniMethod M_PLATFORM_PROCESSOR_LOGGER_LOG = JniMethod("loggerLog", "(ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)V", false);
- JniMethod M_PLATFORM_PROCESSOR_BINARY_PROCESSOR = JniMethod("binaryProcessor", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
+ JniMethod M_PLATFORM_PROCESSOR_BINARY_PROCESSOR = JniMethod("binaryProcessor", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
- const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTarget";
+ const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTargetProxy";
JniMethod M_PLATFORM_TARGET_IN_LONG_OUT_LONG = JniMethod("inLongOutLong", "(IJ)J", false);
JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_LONG = JniMethod("inStreamOutLong", "(IJ)J", false);
JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_OBJECT = JniMethod("inStreamOutObject", "(IJ)Ljava/lang/Object;", false);
@@ -260,7 +260,7 @@ namespace ignite
JniMethod M_PLATFORM_CALLBACK_UTILS_CONTINUOUS_QUERY_FILTER_RELEASE = JniMethod("continuousQueryFilterRelease", "(JJ)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_TOPOLOGY_UPDATE = JniMethod("dataStreamerTopologyUpdate", "(JJJI)V", true);
- JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_STREAM_RECEIVER_INVOKE = JniMethod("dataStreamerStreamReceiverInvoke", "(JJLjava/lang/Object;JZ)V", true);
+ JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_STREAM_RECEIVER_INVOKE = JniMethod("dataStreamerStreamReceiverInvoke", "(JJLorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;JZ)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_FUTURE_BYTE_RES = JniMethod("futureByteResult", "(JJI)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_FUTURE_BOOL_RES = JniMethod("futureBoolResult", "(JJI)V", true);
@@ -307,7 +307,7 @@ namespace ignite
JniMethod M_PLATFORM_CALLBACK_UTILS_LOGGER_LOG = JniMethod("loggerLog", "(JILjava/lang/String;Ljava/lang/String;Ljava/lang/String;J)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_LOGGER_IS_LEVEL_ENABLED = JniMethod("loggerIsLevelEnabled", "(JI)Z", true);
- JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget;)J", true);
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)J", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION = JniMethod("affinityFunctionPartition", "(JJJ)I", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS = JniMethod("affinityFunctionAssignPartitions", "(JJJJ)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE = JniMethod("affinityFunctionRemoveNode", "(JJJ)V", true);
@@ -327,10 +327,6 @@ namespace ignite
JniMethod M_PLATFORM_IGNITION_STOP = JniMethod("stop", "(Ljava/lang/String;Z)Z", true);
JniMethod M_PLATFORM_IGNITION_STOP_ALL = JniMethod("stopAll", "(Z)V", true);
- const char* C_PLATFORM_LISTENABLE = "org/apache/ignite/internal/processors/platform/utils/PlatformListenable";
- JniMethod M_PLATFORM_LISTENABLE_CANCEL = JniMethod("cancel", "()Z", false);
- JniMethod M_PLATFORM_LISTENABLE_IS_CANCELED = JniMethod("isCancelled", "()Z", false);
-
/* STATIC STATE. */
gcc::CriticalSection JVM_LOCK;
gcc::CriticalSection CONSOLE_LOCK;
@@ -552,10 +548,6 @@ namespace ignite
m_PlatformUtils_reallocate = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_REALLOC);
m_PlatformUtils_errData = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_ERR_DATA);
- c_PlatformListenable = FindClass(env, C_PLATFORM_LISTENABLE);
- m_PlatformListenable_cancel = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_CANCEL);
- m_PlatformListenable_isCancelled = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_IS_CANCELED);
-
// Find utility classes which are not used from context, but are still required in other places.
CheckClass(env, C_PLATFORM_NO_CALLBACK_EXCEPTION);
}
@@ -1447,28 +1439,6 @@ namespace ignite
return LocalToGlobal(env, res);
}
- bool JniContext::ListenableCancel(jobject obj)
- {
- JNIEnv* env = Attach();
-
- jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformListenable_cancel);
-
- ExceptionCheck(env);
-
- return res != 0;;
- }
-
- bool JniContext::ListenableIsCancelled(jobject obj)
- {
- JNIEnv* env = Attach();
-
- jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformListenable_isCancelled);
-
- ExceptionCheck(env);
-
- return res != 0;;
- }
-
jobject JniContext::Acquire(jobject obj)
{
if (obj) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index f945efe..6421b8c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -181,6 +181,7 @@
<Compile Include="Impl\Cache\Expiry\ExpiryPolicyFactory.cs" />
<Compile Include="Impl\Cache\Expiry\ExpiryPolicySerializer.cs" />
<Compile Include="Impl\Cache\ICacheLockInternal.cs" />
+ <Compile Include="Impl\Common\Listenable.cs" />
<Compile Include="Impl\Common\Platform.cs" />
<Compile Include="Impl\Binary\UserSerializerProxy.cs" />
<Compile Include="Impl\Cache\Affinity\AffinityFunctionSerializer.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
index 68bd9d4..50102a7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
@@ -24,7 +24,6 @@ namespace Apache.Ignite.Core.Impl.Common
using System.Threading.Tasks;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Impl.Binary.IO;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Grid future implementation.
@@ -40,7 +39,7 @@ namespace Apache.Ignite.Core.Impl.Common
private readonly TaskCompletionSource<T> _taskCompletionSource = new TaskCompletionSource<T>();
/** */
- private volatile IUnmanagedTarget _unmanagedTarget;
+ private volatile Listenable _listenable;
/// <summary>
/// Constructor.
@@ -84,7 +83,7 @@ namespace Apache.Ignite.Core.Impl.Common
/// <param name="cancellationToken">The cancellation token.</param>
public Task<T> GetTask(CancellationToken cancellationToken)
{
- Debug.Assert(_unmanagedTarget != null);
+ Debug.Assert(_listenable != null);
// OnTokenCancel will fire even if cancellationToken is already cancelled.
cancellationToken.Register(OnTokenCancel);
@@ -169,11 +168,11 @@ namespace Apache.Ignite.Core.Impl.Common
/// <summary>
/// Sets unmanaged future target for cancellation.
/// </summary>
- internal void SetTarget(IUnmanagedTarget target)
+ internal void SetTarget(Listenable target)
{
Debug.Assert(target != null);
- _unmanagedTarget = target;
+ _listenable = target;
}
/// <summary>
@@ -181,8 +180,8 @@ namespace Apache.Ignite.Core.Impl.Common
/// </summary>
private void OnTokenCancel()
{
- if (_unmanagedTarget != null)
- UnmanagedUtils.ListenableCancel(_unmanagedTarget);
+ if (_listenable != null)
+ _listenable.Cancel();
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
new file mode 100644
index 0000000..6da98ab
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
@@ -0,0 +1,49 @@
+\ufeff/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+
+ /// <summary>
+ /// Platform listenable.
+ /// </summary>
+ internal class Listenable : PlatformTarget
+ {
+ /** */
+ private const int OpCancel = 1;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Listenable"/> class.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaller.</param>
+ public Listenable(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Cancels the listenable.
+ /// </summary>
+ public void Cancel()
+ {
+ DoOutInOp(OpCancel);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
index bc7c7d9..d36caf3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -195,7 +195,7 @@ namespace Apache.Ignite.Core.Impl.Compute
var future = holder.Future;
- future.SetTarget(futTarget);
+ future.SetTarget(new Listenable(futTarget, Marshaller));
return future;
}
@@ -550,7 +550,7 @@ namespace Apache.Ignite.Core.Impl.Compute
writeAction(writer);
});
- holder.Future.SetTarget(futTarget);
+ holder.Future.SetTarget(new Listenable(futTarget, Marshaller));
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index f4a07f6..9cf2a6c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@ -869,7 +869,7 @@ namespace Apache.Ignite.Core.Impl
throw;
}
- fut.SetTarget(futTarget);
+ fut.SetTarget(new Listenable(futTarget, _marsh));
return fut;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
index c746866..c4f3e19 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
@@ -167,14 +167,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteDestroyJvm")]
public static extern void DestroyJvm(void* ctx);
- [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableCancel")]
- [return: MarshalAs(UnmanagedType.U1)]
- public static extern bool ListenableCancel(void* ctx, void* target);
-
- [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableIsCancelled")]
- [return: MarshalAs(UnmanagedType.U1)]
- public static extern bool ListenableIsCancelled(void* ctx, void* target);
-
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteSetConsoleHandler")]
public static extern void SetConsoleHandler(void* consoleHandler);
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index f36c35f..0a2a1f0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -516,11 +516,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
JNI.DestroyJvm(ctx);
}
- internal static bool ListenableCancel(IUnmanagedTarget target)
- {
- return JNI.ListenableCancel(target.Context, target.Target);
- }
-
#endregion
}
}