You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2016/12/08 11:53:29 UTC
[1/2] ignite git commit: IGNITE-4027 Extract PlatformTargetProxy interface
Repository: ignite
Updated Branches:
refs/heads/master 597f3a581 -> 59e6fec0b
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 8ff15d5..fd1c2d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
@@ -98,7 +99,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader)
+ @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
switch (type) {
case OP_UNICAST:
@@ -120,7 +121,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
case OP_EXEC_ASYNC:
- return executeJavaTask(reader, true);
+ return wrapListenable((PlatformListenable) executeJavaTask(reader, true));
default:
return super.processInStreamOutObject(type, reader);
@@ -128,7 +129,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_WITH_TIMEOUT: {
compute.withTimeout(val);
@@ -154,7 +155,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
* @param reader Reader.
* @param broadcast broadcast flag.
*/
- private PlatformListenable processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast,
+ private PlatformTarget processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast,
boolean affinity) {
PlatformAbstractTask task;
@@ -221,7 +222,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_EXEC:
@@ -239,7 +240,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
*
* @param task Task.
*/
- private PlatformListenable executeNative0(final PlatformAbstractTask task) {
+ private PlatformTarget executeNative0(final PlatformAbstractTask task) {
IgniteInternalFuture fut = computeForPlatform.executeAsync(task, null);
fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
@@ -257,7 +258,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
});
- return PlatformFutureUtils.getListenable(fut);
+ return wrapListenable(PlatformFutureUtils.getListenable(fut));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index cd5fba0..7d71a9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -114,7 +114,7 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
switch (type) {
case OP_UPDATE:
int plc = reader.readInt();
@@ -169,7 +169,7 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, final long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, final long val) throws IgniteCheckedException {
switch (type) {
case OP_SET_ALLOW_OVERWRITE:
ldr.allowOverwrite(val == TRUE);
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
index add11ed..d0992fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
@@ -23,6 +23,8 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl;
import org.apache.ignite.internal.processors.platform.cache.PlatformCache;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
@@ -89,8 +91,10 @@ public class PlatformStreamReceiverImpl extends PlatformAbstractPredicate implem
out.synchronize();
- ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, new PlatformCache(ctx, cache, keepBinary),
- mem.pointer(), keepBinary);
+ PlatformCache cache0 = new PlatformCache(ctx, cache, keepBinary);
+ PlatformTargetProxy cacheProxy = new PlatformTargetProxyImpl(cache0, ctx);
+
+ ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, cacheProxy, mem.pointer(), keepBinary);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
index 811e38b..b57b140 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
@@ -80,7 +80,7 @@ public class PlatformAtomicLong extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
switch (type) {
case OP_COMPARE_AND_SET:
long cmp = reader.readLong();
@@ -99,7 +99,7 @@ public class PlatformAtomicLong extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_ADD_AND_GET:
return atomicLong.addAndGet(val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
index 63b5b86..a644259 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
@@ -94,7 +94,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
if (type == OP_GET)
writer.writeObject(atomicRef.get());
else
@@ -102,7 +102,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader)
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
if (type == OP_SET) {
atomicRef.set(reader.readObjectDetached());
@@ -114,7 +114,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader,
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader,
BinaryRawWriterEx writer) throws IgniteCheckedException {
if (type == OP_COMPARE_AND_SET_AND_GET) {
Object val = reader.readObjectDetached();
@@ -134,7 +134,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_CLOSE:
atomicRef.close();
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
index c352731..6d17a72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
@@ -71,7 +71,7 @@ public class PlatformAtomicSequence extends PlatformAbstractTarget {
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_ADD_AND_GET:
return atomicSeq.addAndGet(val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
index 383e7ab..9ddcc37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.F;
@@ -117,7 +118,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader)
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
switch (type) {
case OP_RECORD_LOCAL:
@@ -168,7 +169,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
/** {@inheritDoc} */
@SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"})
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_LOCAL_QUERY: {
@@ -271,7 +272,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_GET_ENABLED_EVENTS:
writeEventTypes(events.enabledEvents(), writer);
@@ -284,7 +285,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
switch (type) {
case OP_WITH_ASYNC:
if (events.isAsync())
@@ -297,7 +298,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_IS_ENABLED:
return events.isEnabled((int)val) ? TRUE : FALSE;
@@ -310,12 +311,12 @@ public class PlatformEvents extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl)eventsAsync.future()).internalFuture();
}
/** {@inheritDoc} */
- @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
+ @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) {
switch (opId) {
case OP_WAIT_FOR_LOCAL:
return eventResWriter;
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
index 216427a..6fe109e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
@@ -86,7 +87,7 @@ public class PlatformMessaging extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader)
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
switch (type) {
case OP_SEND:
@@ -149,7 +150,7 @@ public class PlatformMessaging extends PlatformAbstractTarget {
/** {@inheritDoc} */
@SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"})
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_REMOTE_LISTEN:{
@@ -181,12 +182,12 @@ public class PlatformMessaging extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl)messagingAsync.future()).internalFuture();
}
/** {@inheritDoc} */
- @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
switch (type) {
case OP_WITH_ASYNC:
if (messaging.isAsync())
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
index 962a4c0..22a7fa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetService;
import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
@@ -144,7 +145,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader)
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
switch (type) {
case OP_DOTNET_DEPLOY: {
@@ -195,7 +196,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_DOTNET_SERVICES: {
@@ -223,8 +224,8 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInObjectStreamOutObjectStream(int type, Object arg, BinaryRawReaderEx reader,
- BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public PlatformTarget processInObjectStreamOutObjectStream(int type, PlatformTarget arg,
+ BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_INVOKE: {
assert arg != null;
@@ -260,7 +261,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_DESCRIPTORS: {
Collection<ServiceDescriptor> descs = services.serviceDescriptors();
@@ -299,7 +300,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
switch (type) {
case OP_WITH_ASYNC:
if (services.isAsync())
@@ -315,7 +316,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_CANCEL_ALL:
services.cancelAll();
@@ -327,7 +328,7 @@ public class PlatformServices extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
switch (type) {
case OP_SERVICE_PROXY: {
String name = reader.readString();
@@ -343,14 +344,14 @@ public class PlatformServices extends PlatformAbstractTarget {
: new GridServiceProxy<>(services.clusterGroup(), name, Service.class, sticky,
platformCtx.kernalContext());
- return new ServiceProxyHolder(proxy, d.serviceClass());
+ return new ServiceProxyHolder(proxy, d.serviceClass(), platformContext());
}
}
return super.processInStreamOutObject(type, reader);
}
/** {@inheritDoc} */
- @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl)servicesAsync.future()).internalFuture();
}
@@ -392,7 +393,7 @@ public class PlatformServices extends PlatformAbstractTarget {
* Proxy holder.
*/
@SuppressWarnings("unchecked")
- private static class ServiceProxyHolder {
+ private static class ServiceProxyHolder extends PlatformAbstractTarget {
/** */
private final Object proxy;
@@ -422,7 +423,9 @@ public class PlatformServices extends PlatformAbstractTarget {
* @param proxy Proxy object.
* @param clazz Proxy class.
*/
- private ServiceProxyHolder(Object proxy, Class clazz) {
+ private ServiceProxyHolder(Object proxy, Class clazz, PlatformContext ctx) {
+ super(ctx);
+
assert proxy != null;
assert clazz != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
index 1b41712..3cee2b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
@@ -153,7 +153,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_COMMIT:
tx(val).commit();
@@ -184,7 +184,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
long txId = reader.readLong();
final Transaction asyncTx = (Transaction)tx(txId).withAsync();
@@ -220,7 +220,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_START: {
TransactionConcurrency txConcurrency = TransactionConcurrency.fromOrdinal(reader.readInt());
@@ -245,7 +245,7 @@ public class PlatformTransactions extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_CACHE_CONFIG_PARAMETERS:
TransactionConfiguration txCfg = platformCtx.kernalContext().config().getTransactionConfiguration();
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
index 5985d22..e81f4c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.platform.utils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
@@ -71,7 +71,7 @@ public class PlatformFutureUtils {
* @return Resulting listenable.
*/
public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr,
- final int typ, PlatformAbstractTarget target) {
+ final int typ, PlatformTarget target) {
PlatformListenable listenable = getListenable(fut);
listen(ctx, listenable, futPtr, typ, null, target);
@@ -88,7 +88,7 @@ public class PlatformFutureUtils {
* @return Resulting listenable.
*/
public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr,
- final int typ, PlatformAbstractTarget target) {
+ final int typ, PlatformTarget target) {
PlatformListenable listenable = getListenable(fut);
listen(ctx, listenable, futPtr, typ, null, target);
@@ -107,7 +107,7 @@ public class PlatformFutureUtils {
* @return Resulting listenable.
*/
public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr,
- final int typ, Writer writer, PlatformAbstractTarget target) {
+ final int typ, Writer writer, PlatformTarget target) {
PlatformListenable listenable = getListenable(fut);
listen(ctx, listenable, futPtr, typ, writer, target);
@@ -126,7 +126,7 @@ public class PlatformFutureUtils {
* @return Resulting listenable.
*/
public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr,
- final int typ, Writer writer, PlatformAbstractTarget target) {
+ final int typ, Writer writer, PlatformTarget target) {
PlatformListenable listenable = getListenable(fut);
listen(ctx, listenable, futPtr, typ, writer, target);
@@ -144,7 +144,7 @@ public class PlatformFutureUtils {
* @return Resulting listenable.
*/
public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr,
- Writer writer, PlatformAbstractTarget target) {
+ Writer writer, PlatformTarget target) {
PlatformListenable listenable = getListenable(fut);
listen(ctx, listenable, futPtr, TYP_OBJ, writer, target);
@@ -183,7 +183,7 @@ public class PlatformFutureUtils {
*/
@SuppressWarnings("unchecked")
public static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final
- int typ, @Nullable final Writer writer, final PlatformAbstractTarget target) {
+ int typ, @Nullable final Writer writer, final PlatformTarget target) {
final PlatformCallbackGateway gate = ctx.gateway();
listenable.listen(new IgniteBiInClosure<Object, Throwable>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java
new file mode 100644
index 0000000..7d65913
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.utils;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+
+/**
+ * Wraps listenable in a platform target.
+ */
+public class PlatformListenableTarget extends PlatformAbstractTarget {
+ /** */
+ private static final int OP_CANCEL = 1;
+
+ /** */
+ private static final int OP_IS_CANCELLED = 2;
+
+ /** Wrapped listenable */
+ private final PlatformListenable listenable;
+
+ /**
+ * Constructor.
+ *
+ * @param platformCtx Context.
+ */
+ public PlatformListenableTarget(PlatformListenable listenable, PlatformContext platformCtx) {
+ super(platformCtx);
+
+ assert listenable != null;
+
+ this.listenable = listenable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ switch (type) {
+ case OP_CANCEL:
+ return listenable.cancel() ? TRUE : FALSE;
+
+ case OP_IS_CANCELLED:
+ return listenable.isCancelled() ? TRUE : FALSE;
+ }
+
+ return super.processInLongOutLong(type, val);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/include/ignite/jni/exports.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
index 3052435..a2e5cbb 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
@@ -80,9 +80,6 @@ extern "C" {
void IGNITE_CALL IgniteDestroyJvm(gcj::JniContext* ctx);
- bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj);
- bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj);
-
void IGNITE_CALL IgniteSetConsoleHandler(gcj::ConsoleWriteHandler consoleHandler);
void IGNITE_CALL IgniteRemoveConsoleHandler(gcj::ConsoleWriteHandler consoleHandler);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index 07df001..97e4412 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -286,10 +286,6 @@ namespace ignite
jmethodID m_PlatformUtils_reallocate;
jmethodID m_PlatformUtils_errData;
- jclass c_PlatformListenable;
- jmethodID m_PlatformListenable_cancel;
- jmethodID m_PlatformListenable_isCancelled;
-
/**
* Constructor.
*/
@@ -465,9 +461,6 @@ namespace ignite
jobject CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
jobject CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr);
- bool ListenableCancel(jobject obj);
- bool ListenableIsCancelled(jobject obj);
-
jobject Acquire(jobject obj);
void DestroyJvm();
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/project/vs/module.def
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def
index e58ac3b..fb56dca 100644
--- a/modules/platforms/cpp/jni/project/vs/module.def
+++ b/modules/platforms/cpp/jni/project/vs/module.def
@@ -36,8 +36,6 @@ IgniteDestroyJvm @86
IgniteTargetOutObject @91
IgniteProcessorExtensions @97
IgniteProcessorAtomicLong @98
-IgniteListenableCancel @110
-IgniteListenableIsCancelled @111
IgniteProcessorCreateCacheFromConfig @114
IgniteProcessorGetOrCreateCacheFromConfig @115
IgniteProcessorGetIgniteConfiguration @116
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/src/exports.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/exports.cpp b/modules/platforms/cpp/jni/src/exports.cpp
index dde98fb..b842c03 100644
--- a/modules/platforms/cpp/jni/src/exports.cpp
+++ b/modules/platforms/cpp/jni/src/exports.cpp
@@ -214,14 +214,6 @@ extern "C" {
ctx->DestroyJvm();
}
- bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj) {
- return ctx->ListenableCancel(static_cast<jobject>(obj));
- }
-
- bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj) {
- return ctx->ListenableIsCancelled(static_cast<jobject>(obj));
- }
-
void IGNITE_CALL IgniteSetConsoleHandler(gcj::ConsoleWriteHandler consoleHandler) {
gcj::JniContext::SetConsoleHandler(consoleHandler);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index 2d3cf72..9626fbb 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -194,33 +194,33 @@ namespace ignite
const char* C_PLATFORM_PROCESSOR = "org/apache/ignite/internal/processors/platform/PlatformProcessor";
JniMethod M_PLATFORM_PROCESSOR_RELEASE_START = JniMethod("releaseStart", "()V", false);
- JniMethod M_PLATFORM_PROCESSOR_PROJECTION = JniMethod("projection", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE = JniMethod("createCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE = JniMethod("getOrCreateCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE_FROM_CONFIG = JniMethod("createCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE_FROM_CONFIG = JniMethod("getOrCreateCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_CREATE_NEAR_CACHE = JniMethod("createNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_NEAR_CACHE = JniMethod("getOrCreateNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
+ JniMethod M_PLATFORM_PROCESSOR_PROJECTION = JniMethod("projection", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE = JniMethod("createCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE = JniMethod("getOrCreateCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE_FROM_CONFIG = JniMethod("createCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE_FROM_CONFIG = JniMethod("getOrCreateCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_CREATE_NEAR_CACHE = JniMethod("createNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_NEAR_CACHE = JniMethod("getOrCreateNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
JniMethod M_PLATFORM_PROCESSOR_DESTROY_CACHE = JniMethod("destroyCache", "(Ljava/lang/String;)V", false);
- JniMethod M_PLATFORM_PROCESSOR_AFFINITY = JniMethod("affinity", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_DATA_STREAMER = JniMethod("dataStreamer", "(Ljava/lang/String;Z)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_TRANSACTIONS = JniMethod("transactions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_COMPUTE = JniMethod("compute", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_MESSAGE = JniMethod("message", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_EVENTS = JniMethod("events", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_SERVICES = JniMethod("services", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_EXTENSIONS = JniMethod("extensions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_ATOMIC_LONG = JniMethod("atomicLong", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_ATOMIC_SEQUENCE = JniMethod("atomicSequence", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
- JniMethod M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE = JniMethod("atomicReference", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
+ JniMethod M_PLATFORM_PROCESSOR_AFFINITY = JniMethod("affinity", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_DATA_STREAMER = JniMethod("dataStreamer", "(Ljava/lang/String;Z)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_TRANSACTIONS = JniMethod("transactions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_COMPUTE = JniMethod("compute", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_MESSAGE = JniMethod("message", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_EVENTS = JniMethod("events", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_SERVICES = JniMethod("services", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_EXTENSIONS = JniMethod("extensions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_ATOMIC_LONG = JniMethod("atomicLong", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_ATOMIC_SEQUENCE = JniMethod("atomicSequence", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
+ JniMethod M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE = JniMethod("atomicReference", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
JniMethod M_PLATFORM_PROCESSOR_GET_IGNITE_CONFIGURATION = JniMethod("getIgniteConfiguration", "(J)V", false);
JniMethod M_PLATFORM_PROCESSOR_GET_CACHE_NAMES = JniMethod("getCacheNames", "(J)V", false);
JniMethod M_PLATFORM_PROCESSOR_LOGGER_IS_LEVEL_ENABLED = JniMethod("loggerIsLevelEnabled", "(I)Z", false);
JniMethod M_PLATFORM_PROCESSOR_LOGGER_LOG = JniMethod("loggerLog", "(ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)V", false);
- JniMethod M_PLATFORM_PROCESSOR_BINARY_PROCESSOR = JniMethod("binaryProcessor", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false);
+ JniMethod M_PLATFORM_PROCESSOR_BINARY_PROCESSOR = JniMethod("binaryProcessor", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false);
- const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTarget";
+ const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTargetProxy";
JniMethod M_PLATFORM_TARGET_IN_LONG_OUT_LONG = JniMethod("inLongOutLong", "(IJ)J", false);
JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_LONG = JniMethod("inStreamOutLong", "(IJ)J", false);
JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_OBJECT = JniMethod("inStreamOutObject", "(IJ)Ljava/lang/Object;", false);
@@ -260,7 +260,7 @@ namespace ignite
JniMethod M_PLATFORM_CALLBACK_UTILS_CONTINUOUS_QUERY_FILTER_RELEASE = JniMethod("continuousQueryFilterRelease", "(JJ)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_TOPOLOGY_UPDATE = JniMethod("dataStreamerTopologyUpdate", "(JJJI)V", true);
- JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_STREAM_RECEIVER_INVOKE = JniMethod("dataStreamerStreamReceiverInvoke", "(JJLjava/lang/Object;JZ)V", true);
+ JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_STREAM_RECEIVER_INVOKE = JniMethod("dataStreamerStreamReceiverInvoke", "(JJLorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;JZ)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_FUTURE_BYTE_RES = JniMethod("futureByteResult", "(JJI)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_FUTURE_BOOL_RES = JniMethod("futureBoolResult", "(JJI)V", true);
@@ -307,7 +307,7 @@ namespace ignite
JniMethod M_PLATFORM_CALLBACK_UTILS_LOGGER_LOG = JniMethod("loggerLog", "(JILjava/lang/String;Ljava/lang/String;Ljava/lang/String;J)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_LOGGER_IS_LEVEL_ENABLED = JniMethod("loggerIsLevelEnabled", "(JI)Z", true);
- JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget;)J", true);
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)J", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION = JniMethod("affinityFunctionPartition", "(JJJ)I", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS = JniMethod("affinityFunctionAssignPartitions", "(JJJJ)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE = JniMethod("affinityFunctionRemoveNode", "(JJJ)V", true);
@@ -327,10 +327,6 @@ namespace ignite
JniMethod M_PLATFORM_IGNITION_STOP = JniMethod("stop", "(Ljava/lang/String;Z)Z", true);
JniMethod M_PLATFORM_IGNITION_STOP_ALL = JniMethod("stopAll", "(Z)V", true);
- const char* C_PLATFORM_LISTENABLE = "org/apache/ignite/internal/processors/platform/utils/PlatformListenable";
- JniMethod M_PLATFORM_LISTENABLE_CANCEL = JniMethod("cancel", "()Z", false);
- JniMethod M_PLATFORM_LISTENABLE_IS_CANCELED = JniMethod("isCancelled", "()Z", false);
-
/* STATIC STATE. */
gcc::CriticalSection JVM_LOCK;
gcc::CriticalSection CONSOLE_LOCK;
@@ -552,10 +548,6 @@ namespace ignite
m_PlatformUtils_reallocate = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_REALLOC);
m_PlatformUtils_errData = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_ERR_DATA);
- c_PlatformListenable = FindClass(env, C_PLATFORM_LISTENABLE);
- m_PlatformListenable_cancel = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_CANCEL);
- m_PlatformListenable_isCancelled = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_IS_CANCELED);
-
// Find utility classes which are not used from context, but are still required in other places.
CheckClass(env, C_PLATFORM_NO_CALLBACK_EXCEPTION);
}
@@ -1447,28 +1439,6 @@ namespace ignite
return LocalToGlobal(env, res);
}
- bool JniContext::ListenableCancel(jobject obj)
- {
- JNIEnv* env = Attach();
-
- jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformListenable_cancel);
-
- ExceptionCheck(env);
-
- return res != 0;;
- }
-
- bool JniContext::ListenableIsCancelled(jobject obj)
- {
- JNIEnv* env = Attach();
-
- jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformListenable_isCancelled);
-
- ExceptionCheck(env);
-
- return res != 0;;
- }
-
jobject JniContext::Acquire(jobject obj)
{
if (obj) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index f945efe..6421b8c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -181,6 +181,7 @@
<Compile Include="Impl\Cache\Expiry\ExpiryPolicyFactory.cs" />
<Compile Include="Impl\Cache\Expiry\ExpiryPolicySerializer.cs" />
<Compile Include="Impl\Cache\ICacheLockInternal.cs" />
+ <Compile Include="Impl\Common\Listenable.cs" />
<Compile Include="Impl\Common\Platform.cs" />
<Compile Include="Impl\Binary\UserSerializerProxy.cs" />
<Compile Include="Impl\Cache\Affinity\AffinityFunctionSerializer.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
index 68bd9d4..50102a7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
@@ -24,7 +24,6 @@ namespace Apache.Ignite.Core.Impl.Common
using System.Threading.Tasks;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Impl.Binary.IO;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Grid future implementation.
@@ -40,7 +39,7 @@ namespace Apache.Ignite.Core.Impl.Common
private readonly TaskCompletionSource<T> _taskCompletionSource = new TaskCompletionSource<T>();
/** */
- private volatile IUnmanagedTarget _unmanagedTarget;
+ private volatile Listenable _listenable;
/// <summary>
/// Constructor.
@@ -84,7 +83,7 @@ namespace Apache.Ignite.Core.Impl.Common
/// <param name="cancellationToken">The cancellation token.</param>
public Task<T> GetTask(CancellationToken cancellationToken)
{
- Debug.Assert(_unmanagedTarget != null);
+ Debug.Assert(_listenable != null);
// OnTokenCancel will fire even if cancellationToken is already cancelled.
cancellationToken.Register(OnTokenCancel);
@@ -169,11 +168,11 @@ namespace Apache.Ignite.Core.Impl.Common
/// <summary>
/// Sets unmanaged future target for cancellation.
/// </summary>
- internal void SetTarget(IUnmanagedTarget target)
+ internal void SetTarget(Listenable target)
{
Debug.Assert(target != null);
- _unmanagedTarget = target;
+ _listenable = target;
}
/// <summary>
@@ -181,8 +180,8 @@ namespace Apache.Ignite.Core.Impl.Common
/// </summary>
private void OnTokenCancel()
{
- if (_unmanagedTarget != null)
- UnmanagedUtils.ListenableCancel(_unmanagedTarget);
+ if (_listenable != null)
+ _listenable.Cancel();
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
new file mode 100644
index 0000000..6da98ab
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+
+ /// <summary>
+ /// Platform listenable.
+ /// </summary>
+ internal class Listenable : PlatformTarget
+ {
+ /** */
+ private const int OpCancel = 1;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Listenable"/> class.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaller.</param>
+ public Listenable(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Cancels the listenable.
+ /// </summary>
+ public void Cancel()
+ {
+ DoOutInOp(OpCancel);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
index bc7c7d9..d36caf3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -195,7 +195,7 @@ namespace Apache.Ignite.Core.Impl.Compute
var future = holder.Future;
- future.SetTarget(futTarget);
+ future.SetTarget(new Listenable(futTarget, Marshaller));
return future;
}
@@ -550,7 +550,7 @@ namespace Apache.Ignite.Core.Impl.Compute
writeAction(writer);
});
- holder.Future.SetTarget(futTarget);
+ holder.Future.SetTarget(new Listenable(futTarget, Marshaller));
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index f4a07f6..9cf2a6c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@ -869,7 +869,7 @@ namespace Apache.Ignite.Core.Impl
throw;
}
- fut.SetTarget(futTarget);
+ fut.SetTarget(new Listenable(futTarget, _marsh));
return fut;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
index c746866..c4f3e19 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
@@ -167,14 +167,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteDestroyJvm")]
public static extern void DestroyJvm(void* ctx);
- [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableCancel")]
- [return: MarshalAs(UnmanagedType.U1)]
- public static extern bool ListenableCancel(void* ctx, void* target);
-
- [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableIsCancelled")]
- [return: MarshalAs(UnmanagedType.U1)]
- public static extern bool ListenableIsCancelled(void* ctx, void* target);
-
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteSetConsoleHandler")]
public static extern void SetConsoleHandler(void* consoleHandler);
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index f36c35f..0a2a1f0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -516,11 +516,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
JNI.DestroyJvm(ctx);
}
- internal static bool ListenableCancel(IUnmanagedTarget target)
- {
- return JNI.ListenableCancel(target.Context, target.Target);
- }
-
#endregion
}
}
[2/2] ignite git commit: IGNITE-4027 Extract PlatformTargetProxy interface
Posted by pt...@apache.org.
IGNITE-4027 Extract PlatformTargetProxy interface
This closes #1188
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59e6fec0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59e6fec0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59e6fec0
Branch: refs/heads/master
Commit: 59e6fec0b92c353ee5e128b9343a59f4b99bd468
Parents: 597f3a5
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Thu Dec 8 14:53:16 2016 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Dec 8 14:53:16 2016 +0300
----------------------------------------------------------------------
.../platform/PlatformAbstractTarget.java | 268 +++----------------
.../platform/PlatformAsyncTarget.java | 44 +++
.../platform/PlatformNoopProcessor.java | 41 +--
.../processors/platform/PlatformProcessor.java | 42 +--
.../platform/PlatformProcessorImpl.java | 87 +++---
.../processors/platform/PlatformTarget.java | 103 ++++---
.../platform/PlatformTargetProxy.java | 126 +++++++++
.../platform/PlatformTargetProxyImpl.java | 222 +++++++++++++++
.../binary/PlatformBinaryProcessor.java | 6 +-
.../platform/cache/PlatformCache.java | 15 +-
.../platform/cache/PlatformCacheIterator.java | 2 +-
.../cache/affinity/PlatformAffinity.java | 4 +-
.../affinity/PlatformAffinityFunction.java | 7 +-
.../PlatformAffinityFunctionTarget.java | 4 +-
.../query/PlatformAbstractQueryCursor.java | 4 +-
.../query/PlatformContinuousQueryProxy.java | 3 +-
.../callback/PlatformCallbackGateway.java | 6 +-
.../callback/PlatformCallbackUtils.java | 6 +-
.../platform/cluster/PlatformClusterGroup.java | 18 +-
.../platform/compute/PlatformCompute.java | 15 +-
.../datastreamer/PlatformDataStreamer.java | 4 +-
.../PlatformStreamReceiverImpl.java | 8 +-
.../datastructures/PlatformAtomicLong.java | 4 +-
.../datastructures/PlatformAtomicReference.java | 8 +-
.../datastructures/PlatformAtomicSequence.java | 2 +-
.../platform/events/PlatformEvents.java | 15 +-
.../platform/messaging/PlatformMessaging.java | 9 +-
.../platform/services/PlatformServices.java | 27 +-
.../transactions/PlatformTransactions.java | 8 +-
.../platform/utils/PlatformFutureUtils.java | 14 +-
.../utils/PlatformListenableTarget.java | 62 +++++
.../cpp/jni/include/ignite/jni/exports.h | 3 -
.../platforms/cpp/jni/include/ignite/jni/java.h | 7 -
modules/platforms/cpp/jni/project/vs/module.def | 2 -
modules/platforms/cpp/jni/src/exports.cpp | 8 -
modules/platforms/cpp/jni/src/java.cpp | 76 ++----
.../Apache.Ignite.Core.csproj | 1 +
.../Apache.Ignite.Core/Impl/Common/Future.cs | 13 +-
.../Impl/Common/Listenable.cs | 49 ++++
.../Impl/Compute/ComputeImpl.cs | 4 +-
.../Apache.Ignite.Core/Impl/PlatformTarget.cs | 2 +-
.../Impl/Unmanaged/IgniteJniNativeMethods.cs | 8 -
.../Impl/Unmanaged/UnmanagedUtils.cs | 5 -
43 files changed, 817 insertions(+), 545 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 2df86ac..506470b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -24,16 +24,16 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
+import org.apache.ignite.internal.processors.platform.utils.PlatformListenableTarget;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
/**
* Abstract interop target.
*/
-public abstract class PlatformAbstractTarget implements PlatformTarget {
+public abstract class PlatformAbstractTarget implements PlatformTarget, PlatformAsyncTarget {
/** Constant: TRUE.*/
protected static final int TRUE = 1;
@@ -60,144 +60,6 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
log = platformCtx.kernalContext().log(PlatformAbstractTarget.class);
}
- /** {@inheritDoc} */
- @Override public long inLongOutLong(int type, long val) throws Exception {
- try {
- return processInLongOutLong(type, val);
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public long inStreamOutLong(int type, long memPtr) throws Exception {
- try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
- BinaryRawReaderEx reader = platformCtx.reader(mem);
-
- return processInStreamOutLong(type, reader, mem);
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Object inStreamOutObject(int type, long memPtr) throws Exception {
- try (PlatformMemory mem = memPtr != 0 ? platformCtx.memory().get(memPtr) : null) {
- BinaryRawReaderEx reader = mem != null ? platformCtx.reader(mem) : null;
-
- return processInStreamOutObject(type, reader);
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void outStream(int type, long memPtr) throws Exception {
- try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
- PlatformOutputStream out = mem.output();
-
- BinaryRawWriterEx writer = platformCtx.writer(out);
-
- processOutStream(type, writer);
-
- out.synchronize();
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Object outObject(int type) throws Exception {
- try {
- return processOutObject(type);
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception {
- try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) {
- BinaryRawReaderEx reader = platformCtx.reader(inMem);
-
- try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) {
- PlatformOutputStream out = outMem.output();
-
- BinaryRawWriterEx writer = platformCtx.writer(out);
-
- processInStreamOutStream(type, reader, writer);
-
- out.synchronize();
- }
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Object inObjectStreamOutObjectStream(int type, Object arg, long inMemPtr, long outMemPtr)
- throws Exception {
- PlatformMemory inMem = null;
- PlatformMemory outMem = null;
-
- try {
- BinaryRawReaderEx reader = null;
-
- if (inMemPtr != 0) {
- inMem = platformCtx.memory().get(inMemPtr);
-
- reader = platformCtx.reader(inMem);
- }
-
- PlatformOutputStream out = null;
- BinaryRawWriterEx writer = null;
-
- if (outMemPtr != 0) {
- outMem = platformCtx.memory().get(outMemPtr);
-
- out = outMem.output();
-
- writer = platformCtx.writer(out);
- }
-
- Object res = processInObjectStreamOutObjectStream(type, arg, reader, writer);
-
- if (out != null)
- out.synchronize();
-
- return res;
- }
- catch (Exception e) {
- throw convertException(e);
- }
- finally {
- try {
- if (inMem != null)
- inMem.close();
- }
- finally {
- if (outMem != null)
- outMem.close();
- }
- }
- }
-
- /**
- * Convert caught exception.
- *
- * @param e Exception to convert.
- * @return Converted exception.
- */
- public Exception convertException(Exception e) {
- return e;
- }
-
/**
* @return Context.
*/
@@ -206,128 +68,60 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
}
/** {@inheritDoc} */
- @Override public void listenFuture(final long futId, int typ) throws Exception {
- PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, this);
+ @Override public Exception convertException(Exception e) {
+ return e;
}
/** {@inheritDoc} */
- @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception {
- PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), this);
- }
-
- /**
- * When overridden in a derived class, gets future for the current operation.
- *
- * @return current future.
- * @throws IgniteCheckedException If failed.
- */
- protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
throw new IgniteCheckedException("Future listening is not supported in " + getClass());
}
- /**
- * When overridden in a derived class, gets a custom future writer.
- *
- * @param opId Operation id.
- * @return A custom writer for given op id.
- */
- @Nullable protected PlatformFutureUtils.Writer futureWriter(int opId){
+ /** {@inheritDoc} */
+ @Override @Nullable public PlatformFutureUtils.Writer futureWriter(int opId){
return null;
}
- /**
- * Process IN operation.
- *
- * @param type Type.
- * @param val Value.
- * @return Result.
- * @throws IgniteCheckedException In case of exception.
- */
- protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
return throwUnsupported(type);
}
- /**
- * Process IN operation.
- *
- * @param type Type.
- * @param reader Binary reader.
- * @return Result.
- * @throws IgniteCheckedException In case of exception.
- */
- protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
return throwUnsupported(type);
}
- /**
- * Process IN operation.
- *
- * @param type Type.
- * @param reader Binary reader.
- * @return Result.
- * @throws IgniteCheckedException In case of exception.
- */
- protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException {
return processInStreamOutLong(type, reader);
}
- /**
- * Process IN-OUT operation.
- *
- * @param type Type.
- * @param reader Binary reader.
- * @param writer Binary writer.
- * @throws IgniteCheckedException In case of exception.
- */
- protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ /** {@inheritDoc} */
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
throwUnsupported(type);
}
- /**
- * Process IN operation with managed object as result.
- *
- * @param type Type.
- * @param reader Binary reader.
- * @return Result.
- * @throws IgniteCheckedException In case of exception.
- */
- protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader)
+ throws IgniteCheckedException {
return throwUnsupported(type);
}
- /**
- * Process IN-OUT operation.
- *
- * @param type Type.
- * @param arg Argument.
- * @param reader Binary reader.
- * @param writer Binary writer.
- * @throws IgniteCheckedException In case of exception.
- */
- protected Object processInObjectStreamOutObjectStream(int type, @Nullable Object arg, BinaryRawReaderEx reader,
- BinaryRawWriterEx writer) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public PlatformTarget processInObjectStreamOutObjectStream(int type, @Nullable PlatformTarget arg,
+ BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException {
return throwUnsupported(type);
}
- /**
- * Process OUT operation.
- *
- * @param type Type.
- * @param writer Binary writer.
- * @throws IgniteCheckedException In case of exception.
- */
- protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
throwUnsupported(type);
}
- /**
- * Process OUT operation.
- *
- * @param type Type.
- * @throws IgniteCheckedException In case of exception.
- */
- protected Object processOutObject(int type) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
return throwUnsupported(type);
}
@@ -338,7 +132,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
* @return Dummy value which is never returned.
* @throws IgniteCheckedException Exception to be thrown.
*/
- protected <T> T throwUnsupported(int type) throws IgniteCheckedException {
+ private <T> T throwUnsupported(int type) throws IgniteCheckedException {
throw new IgniteCheckedException("Unsupported operation type: " + type);
}
@@ -411,4 +205,14 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
return TRUE;
}
+
+ /**
+ * Wraps a listenable to be returned to platform.
+ *
+ * @param listenable Listenable.
+ * @return Target.
+ */
+ protected PlatformTarget wrapListenable(PlatformListenable listenable) {
+ return new PlatformListenableTarget(listenable, platformCtx);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java
new file mode 100644
index 0000000..a4d35c9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Async target.
+ */
+public interface PlatformAsyncTarget {
+ /**
+ * Gets future for the current operation.
+ *
+ * @return current future.
+ * @throws IgniteCheckedException If failed.
+ */
+ IgniteInternalFuture currentFuture() throws IgniteCheckedException;
+
+ /**
+ * Gets a custom future writer.
+ *
+ * @param opId Operation id.
+ * @return A custom writer for given op id.
+ */
+ @Nullable PlatformFutureUtils.Writer futureWriter(int opId);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
index fd357ec..2911418 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
@@ -61,27 +61,27 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException {
return null;
}
@@ -91,47 +91,48 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary)
+ throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget transactions() {
+ @Override public PlatformTargetProxy transactions() {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget projection() throws IgniteCheckedException {
+ @Override public PlatformTargetProxy projection() throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget compute(PlatformTarget grp) {
+ @Override public PlatformTargetProxy compute(PlatformTargetProxy grp) {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget message(PlatformTarget grp) {
+ @Override public PlatformTargetProxy message(PlatformTargetProxy grp) {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget events(PlatformTarget grp) {
+ @Override public PlatformTargetProxy events(PlatformTargetProxy grp) {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget services(PlatformTarget grp) {
+ @Override public PlatformTargetProxy services(PlatformTargetProxy grp) {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget extensions() {
+ @Override public PlatformTargetProxy extensions() {
return null;
}
@@ -142,7 +143,7 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicLong(String name, long initVal, boolean create) throws IgniteException {
+ @Override public PlatformTargetProxy atomicLong(String name, long initVal, boolean create) throws IgniteException {
return null;
}
@@ -157,22 +158,22 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicSequence(String name, long initVal, boolean create) throws IgniteException {
+ @Override public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create) throws IgniteException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicReference(String name, long memPtr, boolean create) throws IgniteException {
+ @Override public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create) throws IgniteException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr) {
+ @Override public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr) {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr) {
+ @Override public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr) {
return null;
}
@@ -187,7 +188,7 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget binaryProcessor() {
+ @Override public PlatformTargetProxy binaryProcessor() {
return null;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
index f01175e..e0d94d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
@@ -26,7 +26,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Platform processor.
*/
-@SuppressWarnings("UnusedDeclaration")
+@SuppressWarnings({"UnusedDeclaration", "UnnecessaryInterfaceModifier"})
public interface PlatformProcessor extends GridProcessor {
/**
* Gets owning Ignite instance.
@@ -68,7 +68,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Cache.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException;
+ public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException;
/**
* Create cache.
@@ -77,7 +77,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Cache.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException;
+ public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException;
/**
* Get or create cache.
@@ -86,7 +86,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Cache.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException;
+ public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException;
/**
* Create cache.
@@ -95,7 +95,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Cache.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException;
+ public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException;
/**
* Get or create cache.
@@ -104,7 +104,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Cache.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException;
+ public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException;
/**
* Destroy dynamically created cache.
@@ -121,7 +121,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Affinity.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException;
+ public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException;
/**
* Get data streamer.
@@ -131,14 +131,14 @@ public interface PlatformProcessor extends GridProcessor {
* @return Data streamer.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException;
+ public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException;
/**
* Get transactions.
*
* @return Transactions.
*/
- public PlatformTarget transactions();
+ public PlatformTargetProxy transactions();
/**
* Get projection.
@@ -146,7 +146,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Projection.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget projection() throws IgniteCheckedException;
+ public PlatformTargetProxy projection() throws IgniteCheckedException;
/**
* Create interop compute.
@@ -154,7 +154,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param grp Cluster group.
* @return Compute instance.
*/
- public PlatformTarget compute(PlatformTarget grp);
+ public PlatformTargetProxy compute(PlatformTargetProxy grp);
/**
* Create interop messaging.
@@ -162,7 +162,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param grp Cluster group.
* @return Messaging instance.
*/
- public PlatformTarget message(PlatformTarget grp);
+ public PlatformTargetProxy message(PlatformTargetProxy grp);
/**
* Create interop events.
@@ -170,7 +170,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param grp Cluster group.
* @return Events instance.
*/
- public PlatformTarget events(PlatformTarget grp);
+ public PlatformTargetProxy events(PlatformTargetProxy grp);
/**
* Create interop services.
@@ -178,14 +178,14 @@ public interface PlatformProcessor extends GridProcessor {
* @param grp Cluster group.
* @return Services instance.
*/
- public PlatformTarget services(PlatformTarget grp);
+ public PlatformTargetProxy services(PlatformTargetProxy grp);
/**
* Get platform extensions. Override this method to provide any additional targets and operations you need.
*
* @return Platform extensions.
*/
- public PlatformTarget extensions();
+ public PlatformTargetProxy extensions();
/**
* Register cache store.
@@ -203,7 +203,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param create Create flag.
* @return Platform atomic long.
*/
- public PlatformTarget atomicLong(String name, long initVal, boolean create);
+ public PlatformTargetProxy atomicLong(String name, long initVal, boolean create);
/**
* Get or create AtomicSequence.
@@ -212,7 +212,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param create Create flag.
* @return Platform atomic long.
*/
- public PlatformTarget atomicSequence(String name, long initVal, boolean create);
+ public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create);
/**
* Get or create AtomicReference.
@@ -221,7 +221,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param create Create flag.
* @return Platform atomic long.
*/
- public PlatformTarget atomicReference(String name, long memPtr, boolean create);
+ public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create);
/**
* Gets the configuration of the current Ignite instance.
@@ -244,7 +244,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param memPtr Pointer to a stream with near cache config. 0 for default config.
* @return Cache.
*/
- public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr);
+ public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr);
/**
* Gets existing near cache with the given name or creates a new one.
@@ -253,7 +253,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param memPtr Pointer to a stream with near cache config. 0 for default config.
* @return Cache.
*/
- public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr);
+ public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr);
/**
* Gets a value indicating whether Ignite logger has specified level enabled.
@@ -277,5 +277,5 @@ public interface PlatformProcessor extends GridProcessor {
*
* @return Binary processor.
*/
- public PlatformTarget binaryProcessor();
+ public PlatformTargetProxy binaryProcessor();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
index f775987..8c81ebb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
@@ -220,7 +220,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException {
IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().cache(name);
if (cache == null)
@@ -230,7 +230,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException {
IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createCache(name);
assert cache != null;
@@ -239,7 +239,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException {
IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateCache(name);
assert cache != null;
@@ -248,7 +248,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException {
BinaryRawReaderEx reader = platformCtx.reader(platformCtx.memory().get(memPtr));
CacheConfiguration cfg = PlatformConfigurationUtils.readCacheConfiguration(reader);
@@ -260,7 +260,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException {
BinaryRawReaderEx reader = platformCtx.reader(platformCtx.memory().get(memPtr));
CacheConfiguration cfg = PlatformConfigurationUtils.readCacheConfiguration(reader);
@@ -278,60 +278,60 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException {
- return new PlatformAffinity(platformCtx, ctx, name);
+ @Override public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException {
+ return proxy(new PlatformAffinity(platformCtx, ctx, name));
}
/** {@inheritDoc} */
- @Override public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepBinary)
+ @Override public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary)
throws IgniteCheckedException {
IgniteDataStreamer ldr = ctx.dataStream().dataStreamer(cacheName);
ldr.keepBinary(true);
- return new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepBinary);
+ return proxy(new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepBinary));
}
/** {@inheritDoc} */
- @Override public PlatformTarget transactions() {
- return new PlatformTransactions(platformCtx);
+ @Override public PlatformTargetProxy transactions() {
+ return proxy(new PlatformTransactions(platformCtx));
}
/** {@inheritDoc} */
- @Override public PlatformTarget projection() throws IgniteCheckedException {
- return new PlatformClusterGroup(platformCtx, ctx.grid().cluster());
+ @Override public PlatformTargetProxy projection() throws IgniteCheckedException {
+ return proxy(new PlatformClusterGroup(platformCtx, ctx.grid().cluster()));
}
/** {@inheritDoc} */
- @Override public PlatformTarget compute(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
+ @Override public PlatformTargetProxy compute(PlatformTargetProxy grp) {
+ PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap();
- return new PlatformCompute(platformCtx, grp0.projection(), PlatformUtils.ATTR_PLATFORM);
+ return proxy(new PlatformCompute(platformCtx, grp0.projection(), PlatformUtils.ATTR_PLATFORM));
}
/** {@inheritDoc} */
- @Override public PlatformTarget message(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
+ @Override public PlatformTargetProxy message(PlatformTargetProxy grp) {
+ PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap();
- return new PlatformMessaging(platformCtx, grp0.projection().ignite().message(grp0.projection()));
+ return proxy(new PlatformMessaging(platformCtx, grp0.projection().ignite().message(grp0.projection())));
}
/** {@inheritDoc} */
- @Override public PlatformTarget events(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
+ @Override public PlatformTargetProxy events(PlatformTargetProxy grp) {
+ PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap();
- return new PlatformEvents(platformCtx, grp0.projection().ignite().events(grp0.projection()));
+ return proxy(new PlatformEvents(platformCtx, grp0.projection().ignite().events(grp0.projection())));
}
/** {@inheritDoc} */
- @Override public PlatformTarget services(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
+ @Override public PlatformTargetProxy services(PlatformTargetProxy grp) {
+ PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap();
- return new PlatformServices(platformCtx, grp0.projection().ignite().services(grp0.projection()), false);
+ return proxy(new PlatformServices(platformCtx, grp0.projection().ignite().services(grp0.projection()), false));
}
/** {@inheritDoc} */
- @Override public PlatformTarget extensions() {
+ @Override public PlatformTargetProxy extensions() {
return null;
}
@@ -356,28 +356,32 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicLong(String name, long initVal, boolean create) throws IgniteException {
+ @Override public PlatformTargetProxy atomicLong(String name, long initVal, boolean create) throws IgniteException {
GridCacheAtomicLongImpl atomicLong = (GridCacheAtomicLongImpl)ignite().atomicLong(name, initVal, create);
if (atomicLong == null)
return null;
- return new PlatformAtomicLong(platformCtx, atomicLong);
+ return proxy(new PlatformAtomicLong(platformCtx, atomicLong));
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicSequence(String name, long initVal, boolean create) throws IgniteException {
+ @Override public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create)
+ throws IgniteException {
IgniteAtomicSequence atomicSeq = ignite().atomicSequence(name, initVal, create);
if (atomicSeq == null)
return null;
- return new PlatformAtomicSequence(platformCtx, atomicSeq);
+ return proxy(new PlatformAtomicSequence(platformCtx, atomicSeq));
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicReference(String name, long memPtr, boolean create) throws IgniteException {
- return PlatformAtomicReference.createInstance(platformCtx, name, memPtr, create);
+ @Override public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create)
+ throws IgniteException {
+ PlatformAtomicReference ref = PlatformAtomicReference.createInstance(platformCtx, name, memPtr, create);
+
+ return ref != null ? proxy(ref) : null;
}
/** {@inheritDoc} */
@@ -427,7 +431,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr) {
+ @Override public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr) {
NearCacheConfiguration cfg = getNearCacheConfiguration(memPtr);
IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createNearCache(cacheName, cfg);
@@ -436,7 +440,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr) {
+ @Override public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr) {
NearCacheConfiguration cfg = getNearCacheConfiguration(memPtr);
IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateNearCache(cacheName, cfg);
@@ -447,8 +451,8 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
/**
* Creates new platform cache.
*/
- private PlatformTarget createPlatformCache(IgniteCacheProxy cache) {
- return new PlatformCache(platformCtx, cache, false, cacheExts);
+ private PlatformTargetProxy createPlatformCache(IgniteCacheProxy cache) {
+ return proxy(new PlatformCache(platformCtx, cache, false, cacheExts));
}
/** {@inheritDoc} */
@@ -504,8 +508,8 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget binaryProcessor() {
- return new PlatformBinaryProcessor(platformCtx);
+ @Override public PlatformTargetProxy binaryProcessor() {
+ return proxy(new PlatformBinaryProcessor(platformCtx));
}
/**
@@ -580,6 +584,13 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/**
+ * Wraps target in a proxy.
+ */
+ private PlatformTargetProxy proxy(PlatformTarget target) {
+ return new PlatformTargetProxyImpl(target, platformCtx);
+ }
+
+ /**
* Store and manager pair.
*/
private static class StoreInfo {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
index 805fd5e..5d234dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.processors.platform;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.jetbrains.annotations.Nullable;
/**
@@ -27,94 +29,89 @@ import org.jetbrains.annotations.Nullable;
@SuppressWarnings("UnusedDeclaration")
public interface PlatformTarget {
/**
- * Operation accepting long value and returning long value.
+ * Process IN operation.
*
- * @param type Operation type.
+ * @param type Type.
* @param val Value.
* @return Result.
- * @throws Exception If case of failure.
+ * @throws IgniteCheckedException In case of exception.
*/
- public long inLongOutLong(int type, long val) throws Exception;
+ long processInLongOutLong(int type, long val) throws IgniteCheckedException;
/**
- * Operation accepting memory stream and returning long value.
+ * Process IN operation.
*
- * @param type Operation type.
- * @param memPtr Memory pointer.
+ * @param type Type.
+ * @param reader Binary reader.
* @return Result.
- * @throws Exception If case of failure.
+ * @throws IgniteCheckedException In case of exception.
*/
- public long inStreamOutLong(int type, long memPtr) throws Exception;
+ long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException;
/**
- * Operation accepting memory stream and returning object.
+ * Process IN operation.
*
- * @param type Operation type.
- * @param memPtr Memory pointer.
+ * @param type Type.
+ * @param reader Binary reader.
* @return Result.
- * @throws Exception If case of failure.
+ * @throws IgniteCheckedException In case of exception.
*/
- public Object inStreamOutObject(int type, long memPtr) throws Exception;
+ long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException;
/**
- * Operation accepting one memory stream and returning result to another memory stream.
+ * Process IN-OUT operation.
*
- * @param type Operation type.
- * @param inMemPtr Input memory pointer.
- * @param outMemPtr Output memory pointer.
- * @throws Exception In case of failure.
+ * @param type Type.
+ * @param reader Binary reader.
+ * @param writer Binary writer.
+ * @throws IgniteCheckedException In case of exception.
*/
- public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception;
+ void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ throws IgniteCheckedException;
/**
- * Operation accepting an object and a memory stream and returning result to another memory stream and an object.
+ * Process IN-OUT operation.
*
- * @param type Operation type.
- * @param arg Argument (optional).
- * @param inMemPtr Input memory pointer.
- * @param outMemPtr Output memory pointer.
- * @return Result.
- * @throws Exception In case of failure.
+ * @param type Type.
+ * @param reader Binary reader.
+ * @throws IgniteCheckedException In case of exception.
*/
- public Object inObjectStreamOutObjectStream(int type, @Nullable Object arg, long inMemPtr, long outMemPtr)
- throws Exception;
+ PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException;
/**
- * Operation returning result to memory stream.
+ * Process IN-OUT operation.
*
- * @param type Operation type.
- * @param memPtr Memory pointer.
- * @throws Exception In case of failure.
+ * @param type Type.
+ * @param arg Argument.
+ * @param reader Binary reader.
+ * @param writer Binary writer.
+ * @throws IgniteCheckedException In case of exception.
*/
- public void outStream(int type, long memPtr) throws Exception;
+ PlatformTarget processInObjectStreamOutObjectStream(int type, @Nullable PlatformTarget arg, BinaryRawReaderEx reader,
+ BinaryRawWriterEx writer) throws IgniteCheckedException;
/**
- * Operation returning object result.
+ * Process OUT operation.
*
- * @param type Operation type.
- * @return Result.
- * @throws Exception If failed.
+ * @param type Type.
+ * @param writer Binary writer.
+ * @throws IgniteCheckedException In case of exception.
*/
- public Object outObject(int type) throws Exception;
+ void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException;
/**
- * Start listening for the future.
+ * Process OUT operation.
*
- * @param futId Future ID.
- * @param typ Result type.
- * @throws IgniteCheckedException In case of failure.
+ * @param type Type.
+ * @throws IgniteCheckedException In case of exception.
*/
- @SuppressWarnings("UnusedDeclaration")
- public void listenFuture(final long futId, int typ) throws Exception;
+ PlatformTarget processOutObject(int type) throws IgniteCheckedException;
/**
- * Start listening for the future for specific operation type.
+ * Convert caught exception.
*
- * @param futId Future ID.
- * @param typ Result type.
- * @param opId Operation ID required to pick correct result writer.
- * @throws IgniteCheckedException In case of failure.
+ * @param e Exception to convert.
+ * @return Converted exception.
*/
- @SuppressWarnings("UnusedDeclaration")
- public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception;
+ Exception convertException(Exception e);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
new file mode 100644
index 0000000..a4f2a56
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Platform target that is invoked via JNI and propagates calls to underlying {@link PlatformTarget}.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public interface PlatformTargetProxy {
+ /**
+ * Operation accepting long value and returning long value.
+ *
+ * @param type Operation type.
+ * @param val Value.
+ * @return Result.
+ * @throws Exception In case of failure.
+ */
+ long inLongOutLong(int type, long val) throws Exception;
+
+ /**
+ * Operation accepting memory stream and returning long value.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @return Result.
+ * @throws Exception In case of failure.
+ */
+ long inStreamOutLong(int type, long memPtr) throws Exception;
+
+ /**
+ * Operation accepting memory stream and returning object.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @return Result.
+ * @throws Exception In case of failure.
+ */
+ Object inStreamOutObject(int type, long memPtr) throws Exception;
+
+ /**
+ * Operation accepting one memory stream and returning result to another memory stream.
+ *
+ * @param type Operation type.
+ * @param inMemPtr Input memory pointer.
+ * @param outMemPtr Output memory pointer.
+ * @throws Exception In case of failure.
+ */
+ void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception;
+
+ /**
+ * Operation accepting an object and a memory stream and returning result to another memory stream and an object.
+ *
+ * @param type Operation type.
+ * @param arg Argument (optional).
+ * @param inMemPtr Input memory pointer.
+ * @param outMemPtr Output memory pointer.
+ * @return Result.
+ * @throws Exception In case of failure.
+ */
+ Object inObjectStreamOutObjectStream(int type, @Nullable Object arg, long inMemPtr, long outMemPtr)
+ throws Exception;
+
+ /**
+ * Operation returning result to memory stream.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @throws Exception In case of failure.
+ */
+ void outStream(int type, long memPtr) throws Exception;
+
+ /**
+ * Operation returning object result.
+ *
+ * @param type Operation type.
+ * @return Result.
+ * @throws Exception If failed.
+ */
+ Object outObject(int type) throws Exception;
+
+ /**
+ * Start listening for the future.
+ *
+ * @param futId Future ID.
+ * @param typ Result type.
+ * @throws Exception In case of failure.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ void listenFuture(final long futId, int typ) throws Exception;
+
+ /**
+ * Start listening for the future for specific operation type.
+ *
+ * @param futId Future ID.
+ * @param typ Result type.
+ * @param opId Operation ID required to pick correct result writer.
+ * @throws Exception In case of failure.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ void listenFutureForOperation(final long futId, int typ, int opId) throws Exception;
+
+ /**
+ * Returns the underlying target.
+ *
+ * @return Underlying target.
+ */
+ PlatformTarget unwrap();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
new file mode 100644
index 0000000..25a4de8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+
+/**
+ * Platform target that is invoked via JNI and propagates calls to underlying {@link PlatformTarget}.
+ */
+public class PlatformTargetProxyImpl implements PlatformTargetProxy {
+ /** Platform context used to resolve memory pointers and to create binary readers and writers. */
+ protected final PlatformContext platformCtx;
+
+ /** Underlying target that every operation is delegated to. */
+ private final PlatformTarget target;
+
+ public PlatformTargetProxyImpl(PlatformTarget target, PlatformContext platformCtx) {
+ assert platformCtx != null;
+ assert target != null;
+
+ this.platformCtx = platformCtx;
+ this.target = target;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long inLongOutLong(int type, long val) throws Exception {
+ try {
+ return target.processInLongOutLong(type, val);
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long inStreamOutLong(int type, long memPtr) throws Exception {
+ try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
+ BinaryRawReaderEx reader = platformCtx.reader(mem);
+
+ return target.processInStreamOutLong(type, reader, mem);
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object inStreamOutObject(int type, long memPtr) throws Exception {
+ try (PlatformMemory mem = memPtr != 0 ? platformCtx.memory().get(memPtr) : null) {
+ BinaryRawReaderEx reader = mem != null ? platformCtx.reader(mem) : null;
+
+ return wrapProxy(target.processInStreamOutObject(type, reader));
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void outStream(int type, long memPtr) throws Exception {
+ try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
+ PlatformOutputStream out = mem.output();
+
+ BinaryRawWriterEx writer = platformCtx.writer(out);
+
+ target.processOutStream(type, writer);
+
+ out.synchronize();
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object outObject(int type) throws Exception {
+ try {
+ return wrapProxy(target.processOutObject(type));
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception {
+ try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) {
+ BinaryRawReaderEx reader = platformCtx.reader(inMem);
+
+ try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) {
+ PlatformOutputStream out = outMem.output();
+
+ BinaryRawWriterEx writer = platformCtx.writer(out);
+
+ target.processInStreamOutStream(type, reader, writer);
+
+ out.synchronize();
+ }
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object inObjectStreamOutObjectStream(int type, Object arg, long inMemPtr, long outMemPtr)
+ throws Exception {
+ PlatformMemory inMem = null;
+ PlatformMemory outMem = null;
+
+ try {
+ BinaryRawReaderEx reader = null;
+
+ if (inMemPtr != 0) {
+ inMem = platformCtx.memory().get(inMemPtr);
+
+ reader = platformCtx.reader(inMem);
+ }
+
+ PlatformOutputStream out = null;
+ BinaryRawWriterEx writer = null;
+
+ if (outMemPtr != 0) {
+ outMem = platformCtx.memory().get(outMemPtr);
+
+ out = outMem.output();
+
+ writer = platformCtx.writer(out);
+ }
+
+ PlatformTarget res = target.processInObjectStreamOutObjectStream(type, unwrapProxy(arg), reader, writer);
+
+ if (out != null)
+ out.synchronize();
+
+ return wrapProxy(res);
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ finally {
+ try {
+ if (inMem != null)
+ inMem.close();
+ }
+ finally {
+ if (outMem != null)
+ outMem.close();
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void listenFuture(final long futId, int typ) throws Exception {
+ PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, target);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception {
+ PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), target);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformTarget unwrap() {
+ return target;
+ }
+
+ /**
+ * @return Future writer for the operation; the target is cast to {@link PlatformAsyncTarget} to obtain it.
+ */
+ private PlatformFutureUtils.Writer futureWriter(int opId) {
+ return ((PlatformAsyncTarget)target).futureWriter(opId);
+ }
+
+ /**
+ * @return Current future; the target is cast to {@link PlatformAsyncTarget} to obtain it.
+ */
+ private IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ return ((PlatformAsyncTarget)target).currentFuture();
+ }
+
+ /**
+ * Wraps a target in a new proxy that shares this proxy's context; {@code null} is passed through.
+ *
+ * @param obj Target to wrap (may be {@code null}).
+ * @return New proxy around the target, or {@code null}.
+ */
+ private Object wrapProxy(PlatformTarget obj) {
+ return obj == null ? null : new PlatformTargetProxyImpl(obj, platformCtx);
+ }
+
+ /**
+ * Unwraps the target from a proxy through {@link PlatformTargetProxy#unwrap()}; {@code null} is passed through.
+ *
+ * @param obj Proxy to unwrap: any {@link PlatformTargetProxy} implementation (may be {@code null}).
+ * @return Underlying target, or {@code null}.
+ */
+ private PlatformTarget unwrapProxy(Object obj) {
+ return obj == null ? null : ((PlatformTargetProxy)obj).unwrap();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
index 1bb577e..3c00abc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
@@ -49,7 +49,7 @@ public class PlatformBinaryProcessor extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
if (type == OP_PUT_META) {
platformCtx.processMetadata(reader);
@@ -60,7 +60,7 @@ public class PlatformBinaryProcessor extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
if (type == OP_GET_ALL_META)
platformCtx.writeAllMetadata(writer);
else
@@ -68,7 +68,7 @@ public class PlatformBinaryProcessor extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader,
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader,
BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_GET_META: {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index aec3703..aee317e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformNativeException;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryProxy;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor;
@@ -400,7 +401,7 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem)
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem)
throws IgniteCheckedException {
try {
switch (type) {
@@ -824,7 +825,7 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader)
+ @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
switch (type) {
case OP_QRY_SQL:
@@ -903,7 +904,7 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_GET_NAME:
writer.writeObject(cache.getName());
@@ -940,7 +941,7 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
switch (type) {
case OP_WITH_ASYNC: {
if (cache.isAsync())
@@ -983,7 +984,7 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_SIZE: {
CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes((int)val);
@@ -1121,12 +1122,12 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** <inheritDoc /> */
- @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl) cacheAsync.future()).internalFuture();
}
/** <inheritDoc /> */
- @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
+ @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) {
if (opId == OP_GET_ALL)
return WRITER_GET_ALL;
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
index 292caea..4c11cc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
@@ -47,7 +47,7 @@ public class PlatformCacheIterator extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_NEXT:
if (iter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
index 12df188..e24345c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
@@ -117,7 +117,7 @@ public class PlatformAffinity extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
switch (type) {
case OP_PARTITION:
return aff.partition(reader.readObjectDetached());
@@ -168,7 +168,7 @@ public class PlatformAffinity extends PlatformAbstractTarget {
/** {@inheritDoc} */
@SuppressWarnings({"IfMayBeConditional", "ConstantConditions"})
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_PRIMARY_PARTITIONS: {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
index 8076a19..2d3cada 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
@@ -279,7 +280,11 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl
? new PlatformAffinityFunctionTarget(ctx, baseFunc)
: null;
- ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTarget);
+ PlatformTargetProxyImpl baseTargetProxy = baseTarget != null
+ ? new PlatformTargetProxyImpl(baseTarget, ctx)
+ : null;
+
+ ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTargetProxy);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java
index 8a07b33..342e726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java
@@ -71,7 +71,7 @@ public class PlatformAffinityFunctionTarget extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
if (type == OP_PARTITION)
return baseFunc.partition(reader.readObjectDetached());
else if (type == OP_REMOVE_NODE) {
@@ -84,7 +84,7 @@ public class PlatformAffinityFunctionTarget extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
if (type == OP_ASSIGN_PARTITIONS) {
AffinityFunctionContext affCtx = currentAffCtx.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
index 6a259ca..f201425 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
@@ -71,7 +71,7 @@ public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTar
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, final BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, final BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_GET_BATCH: {
assert iter != null : "iterator() has not been called";
@@ -136,7 +136,7 @@ public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTar
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_ITERATOR:
iter = cursor.iterator();
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java
index 04f17ff..27d784a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.platform.cache.query;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
/**
* Proxy that implements PlatformTarget.
@@ -41,7 +42,7 @@ public class PlatformContinuousQueryProxy extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override public Object outObject(int type) throws Exception {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
return qry.getInitialQueryCursor();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index f21861e..c77f501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.platform.callback;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
import org.apache.ignite.internal.util.GridStripedSpinBusyLock;
/**
@@ -429,7 +429,7 @@ public class PlatformCallbackGateway {
* @param memPtr Stream pointer.
* @param keepBinary Binary flag.
*/
- public void dataStreamerStreamReceiverInvoke(long ptr, Object cache, long memPtr, boolean keepBinary) {
+ public void dataStreamerStreamReceiverInvoke(long ptr, PlatformTargetProxy cache, long memPtr, boolean keepBinary) {
enter();
try {
@@ -995,7 +995,7 @@ public class PlatformCallbackGateway {
* @param baseFunc Optional func for base calls.
* @return Affinity function pointer.
*/
- public long affinityFunctionInit(long memPtr, PlatformAffinityFunctionTarget baseFunc) {
+ public long affinityFunctionInit(long memPtr, PlatformTargetProxy baseFunc) {
enter();
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
index 50c4c28..9d60ec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.processors.platform.callback;
-import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
/**
* Platform callback utility methods. Implemented in target platform. All methods in this class must be
@@ -226,7 +226,7 @@ public class PlatformCallbackUtils {
* @param memPtr Stream pointer.
* @param keepBinary Binary flag.
*/
- static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, Object cache, long memPtr,
+ static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, PlatformTargetProxy cache, long memPtr,
boolean keepBinary);
/**
@@ -504,7 +504,7 @@ public class PlatformCallbackUtils {
* @param baseFunc Optional func for base calls.
* @return Affinity function pointer.
*/
- static native long affinityFunctionInit(long envPtr, long memPtr, PlatformAffinityFunctionTarget baseFunc);
+ static native long affinityFunctionInit(long envPtr, long memPtr, PlatformTargetProxy baseFunc);
/**
* Gets the partition from affinity function.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
index dc73468..f49f477 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.cache.PlatformCache;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -120,7 +121,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
/** {@inheritDoc} */
@SuppressWarnings("deprecation")
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_METRICS:
platformCtx.writeClusterMetrics(writer, prj.metrics());
@@ -134,7 +135,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
/** {@inheritDoc} */
@SuppressWarnings({"ConstantConditions", "deprecation"})
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_METRICS_FILTERED: {
@@ -217,7 +218,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
switch (type) {
case OP_PING_NODE:
return pingNode(reader.readUuid()) ? TRUE : FALSE;
@@ -228,7 +229,8 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader)
+ throws IgniteCheckedException {
switch (type) {
case OP_FOR_NODE_IDS: {
Collection<UUID> ids = PlatformUtils.readCollection(reader);
@@ -272,8 +274,8 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInObjectStreamOutObjectStream(
- int type, @Nullable Object arg, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public PlatformTarget processInObjectStreamOutObjectStream(
+ int type, @Nullable PlatformTarget arg, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_FOR_OTHERS: {
@@ -289,7 +291,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
switch (type) {
case OP_FOR_REMOTES:
return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRemotes());
@@ -314,7 +316,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_RESET_METRICS: {
assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.