You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2016/12/08 11:53:30 UTC
[2/2] ignite git commit: IGNITE-4027 Extract PlatformTargetProxy interface
IGNITE-4027 Extract PlatformTargetProxy interface
This closes #1188
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59e6fec0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59e6fec0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59e6fec0
Branch: refs/heads/master
Commit: 59e6fec0b92c353ee5e128b9343a59f4b99bd468
Parents: 597f3a5
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Thu Dec 8 14:53:16 2016 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Dec 8 14:53:16 2016 +0300
----------------------------------------------------------------------
.../platform/PlatformAbstractTarget.java | 268 +++----------------
.../platform/PlatformAsyncTarget.java | 44 +++
.../platform/PlatformNoopProcessor.java | 41 +--
.../processors/platform/PlatformProcessor.java | 42 +--
.../platform/PlatformProcessorImpl.java | 87 +++---
.../processors/platform/PlatformTarget.java | 103 ++++---
.../platform/PlatformTargetProxy.java | 126 +++++++++
.../platform/PlatformTargetProxyImpl.java | 222 +++++++++++++++
.../binary/PlatformBinaryProcessor.java | 6 +-
.../platform/cache/PlatformCache.java | 15 +-
.../platform/cache/PlatformCacheIterator.java | 2 +-
.../cache/affinity/PlatformAffinity.java | 4 +-
.../affinity/PlatformAffinityFunction.java | 7 +-
.../PlatformAffinityFunctionTarget.java | 4 +-
.../query/PlatformAbstractQueryCursor.java | 4 +-
.../query/PlatformContinuousQueryProxy.java | 3 +-
.../callback/PlatformCallbackGateway.java | 6 +-
.../callback/PlatformCallbackUtils.java | 6 +-
.../platform/cluster/PlatformClusterGroup.java | 18 +-
.../platform/compute/PlatformCompute.java | 15 +-
.../datastreamer/PlatformDataStreamer.java | 4 +-
.../PlatformStreamReceiverImpl.java | 8 +-
.../datastructures/PlatformAtomicLong.java | 4 +-
.../datastructures/PlatformAtomicReference.java | 8 +-
.../datastructures/PlatformAtomicSequence.java | 2 +-
.../platform/events/PlatformEvents.java | 15 +-
.../platform/messaging/PlatformMessaging.java | 9 +-
.../platform/services/PlatformServices.java | 27 +-
.../transactions/PlatformTransactions.java | 8 +-
.../platform/utils/PlatformFutureUtils.java | 14 +-
.../utils/PlatformListenableTarget.java | 62 +++++
.../cpp/jni/include/ignite/jni/exports.h | 3 -
.../platforms/cpp/jni/include/ignite/jni/java.h | 7 -
modules/platforms/cpp/jni/project/vs/module.def | 2 -
modules/platforms/cpp/jni/src/exports.cpp | 8 -
modules/platforms/cpp/jni/src/java.cpp | 76 ++----
.../Apache.Ignite.Core.csproj | 1 +
.../Apache.Ignite.Core/Impl/Common/Future.cs | 13 +-
.../Impl/Common/Listenable.cs | 49 ++++
.../Impl/Compute/ComputeImpl.cs | 4 +-
.../Apache.Ignite.Core/Impl/PlatformTarget.cs | 2 +-
.../Impl/Unmanaged/IgniteJniNativeMethods.cs | 8 -
.../Impl/Unmanaged/UnmanagedUtils.cs | 5 -
43 files changed, 817 insertions(+), 545 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 2df86ac..506470b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -24,16 +24,16 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
+import org.apache.ignite.internal.processors.platform.utils.PlatformListenableTarget;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
/**
* Abstract interop target.
*/
-public abstract class PlatformAbstractTarget implements PlatformTarget {
+public abstract class PlatformAbstractTarget implements PlatformTarget, PlatformAsyncTarget {
/** Constant: TRUE.*/
protected static final int TRUE = 1;
@@ -60,144 +60,6 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
log = platformCtx.kernalContext().log(PlatformAbstractTarget.class);
}
- /** {@inheritDoc} */
- @Override public long inLongOutLong(int type, long val) throws Exception {
- try {
- return processInLongOutLong(type, val);
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public long inStreamOutLong(int type, long memPtr) throws Exception {
- try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
- BinaryRawReaderEx reader = platformCtx.reader(mem);
-
- return processInStreamOutLong(type, reader, mem);
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Object inStreamOutObject(int type, long memPtr) throws Exception {
- try (PlatformMemory mem = memPtr != 0 ? platformCtx.memory().get(memPtr) : null) {
- BinaryRawReaderEx reader = mem != null ? platformCtx.reader(mem) : null;
-
- return processInStreamOutObject(type, reader);
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void outStream(int type, long memPtr) throws Exception {
- try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
- PlatformOutputStream out = mem.output();
-
- BinaryRawWriterEx writer = platformCtx.writer(out);
-
- processOutStream(type, writer);
-
- out.synchronize();
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Object outObject(int type) throws Exception {
- try {
- return processOutObject(type);
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception {
- try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) {
- BinaryRawReaderEx reader = platformCtx.reader(inMem);
-
- try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) {
- PlatformOutputStream out = outMem.output();
-
- BinaryRawWriterEx writer = platformCtx.writer(out);
-
- processInStreamOutStream(type, reader, writer);
-
- out.synchronize();
- }
- }
- catch (Exception e) {
- throw convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Object inObjectStreamOutObjectStream(int type, Object arg, long inMemPtr, long outMemPtr)
- throws Exception {
- PlatformMemory inMem = null;
- PlatformMemory outMem = null;
-
- try {
- BinaryRawReaderEx reader = null;
-
- if (inMemPtr != 0) {
- inMem = platformCtx.memory().get(inMemPtr);
-
- reader = platformCtx.reader(inMem);
- }
-
- PlatformOutputStream out = null;
- BinaryRawWriterEx writer = null;
-
- if (outMemPtr != 0) {
- outMem = platformCtx.memory().get(outMemPtr);
-
- out = outMem.output();
-
- writer = platformCtx.writer(out);
- }
-
- Object res = processInObjectStreamOutObjectStream(type, arg, reader, writer);
-
- if (out != null)
- out.synchronize();
-
- return res;
- }
- catch (Exception e) {
- throw convertException(e);
- }
- finally {
- try {
- if (inMem != null)
- inMem.close();
- }
- finally {
- if (outMem != null)
- outMem.close();
- }
- }
- }
-
- /**
- * Convert caught exception.
- *
- * @param e Exception to convert.
- * @return Converted exception.
- */
- public Exception convertException(Exception e) {
- return e;
- }
-
/**
* @return Context.
*/
@@ -206,128 +68,60 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
}
/** {@inheritDoc} */
- @Override public void listenFuture(final long futId, int typ) throws Exception {
- PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, this);
+ @Override public Exception convertException(Exception e) {
+ return e;
}
/** {@inheritDoc} */
- @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception {
- PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), this);
- }
-
- /**
- * When overridden in a derived class, gets future for the current operation.
- *
- * @return current future.
- * @throws IgniteCheckedException If failed.
- */
- protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
throw new IgniteCheckedException("Future listening is not supported in " + getClass());
}
- /**
- * When overridden in a derived class, gets a custom future writer.
- *
- * @param opId Operation id.
- * @return A custom writer for given op id.
- */
- @Nullable protected PlatformFutureUtils.Writer futureWriter(int opId){
+ /** {@inheritDoc} */
+ @Override @Nullable public PlatformFutureUtils.Writer futureWriter(int opId){
return null;
}
- /**
- * Process IN operation.
- *
- * @param type Type.
- * @param val Value.
- * @return Result.
- * @throws IgniteCheckedException In case of exception.
- */
- protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
return throwUnsupported(type);
}
- /**
- * Process IN operation.
- *
- * @param type Type.
- * @param reader Binary reader.
- * @return Result.
- * @throws IgniteCheckedException In case of exception.
- */
- protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
return throwUnsupported(type);
}
- /**
- * Process IN operation.
- *
- * @param type Type.
- * @param reader Binary reader.
- * @return Result.
- * @throws IgniteCheckedException In case of exception.
- */
- protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException {
return processInStreamOutLong(type, reader);
}
- /**
- * Process IN-OUT operation.
- *
- * @param type Type.
- * @param reader Binary reader.
- * @param writer Binary writer.
- * @throws IgniteCheckedException In case of exception.
- */
- protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ /** {@inheritDoc} */
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
throwUnsupported(type);
}
- /**
- * Process IN operation with managed object as result.
- *
- * @param type Type.
- * @param reader Binary reader.
- * @return Result.
- * @throws IgniteCheckedException In case of exception.
- */
- protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader)
+ throws IgniteCheckedException {
return throwUnsupported(type);
}
- /**
- * Process IN-OUT operation.
- *
- * @param type Type.
- * @param arg Argument.
- * @param reader Binary reader.
- * @param writer Binary writer.
- * @throws IgniteCheckedException In case of exception.
- */
- protected Object processInObjectStreamOutObjectStream(int type, @Nullable Object arg, BinaryRawReaderEx reader,
- BinaryRawWriterEx writer) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public PlatformTarget processInObjectStreamOutObjectStream(int type, @Nullable PlatformTarget arg,
+ BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException {
return throwUnsupported(type);
}
- /**
- * Process OUT operation.
- *
- * @param type Type.
- * @param writer Binary writer.
- * @throws IgniteCheckedException In case of exception.
- */
- protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
throwUnsupported(type);
}
- /**
- * Process OUT operation.
- *
- * @param type Type.
- * @throws IgniteCheckedException In case of exception.
- */
- protected Object processOutObject(int type) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
return throwUnsupported(type);
}
@@ -338,7 +132,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
* @return Dummy value which is never returned.
* @throws IgniteCheckedException Exception to be thrown.
*/
- protected <T> T throwUnsupported(int type) throws IgniteCheckedException {
+ private <T> T throwUnsupported(int type) throws IgniteCheckedException {
throw new IgniteCheckedException("Unsupported operation type: " + type);
}
@@ -411,4 +205,14 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
return TRUE;
}
+
+ /**
+ * Wraps a listenable to be returned to platform.
+ *
+ * @param listenable Listenable.
+ * @return Target.
+ */
+ protected PlatformTarget wrapListenable(PlatformListenable listenable) {
+ return new PlatformListenableTarget(listenable, platformCtx);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java
new file mode 100644
index 0000000..a4d35c9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncTarget.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Async target.
+ */
+public interface PlatformAsyncTarget {
+ /**
+ * Gets future for the current operation.
+ *
+ * @return current future.
+ * @throws IgniteCheckedException If failed.
+ */
+ IgniteInternalFuture currentFuture() throws IgniteCheckedException;
+
+ /**
+ * Gets a custom future writer.
+ *
+ * @param opId Operation id.
+ * @return A custom writer for given op id.
+ */
+ @Nullable PlatformFutureUtils.Writer futureWriter(int opId);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
index fd357ec..2911418 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
@@ -61,27 +61,27 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException {
return null;
}
@@ -91,47 +91,48 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary)
+ throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget transactions() {
+ @Override public PlatformTargetProxy transactions() {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget projection() throws IgniteCheckedException {
+ @Override public PlatformTargetProxy projection() throws IgniteCheckedException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget compute(PlatformTarget grp) {
+ @Override public PlatformTargetProxy compute(PlatformTargetProxy grp) {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget message(PlatformTarget grp) {
+ @Override public PlatformTargetProxy message(PlatformTargetProxy grp) {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget events(PlatformTarget grp) {
+ @Override public PlatformTargetProxy events(PlatformTargetProxy grp) {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget services(PlatformTarget grp) {
+ @Override public PlatformTargetProxy services(PlatformTargetProxy grp) {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget extensions() {
+ @Override public PlatformTargetProxy extensions() {
return null;
}
@@ -142,7 +143,7 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicLong(String name, long initVal, boolean create) throws IgniteException {
+ @Override public PlatformTargetProxy atomicLong(String name, long initVal, boolean create) throws IgniteException {
return null;
}
@@ -157,22 +158,22 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicSequence(String name, long initVal, boolean create) throws IgniteException {
+ @Override public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create) throws IgniteException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicReference(String name, long memPtr, boolean create) throws IgniteException {
+ @Override public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create) throws IgniteException {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr) {
+ @Override public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr) {
return null;
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr) {
+ @Override public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr) {
return null;
}
@@ -187,7 +188,7 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget binaryProcessor() {
+ @Override public PlatformTargetProxy binaryProcessor() {
return null;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
index f01175e..e0d94d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
@@ -26,7 +26,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Platform processor.
*/
-@SuppressWarnings("UnusedDeclaration")
+@SuppressWarnings({"UnusedDeclaration", "UnnecessaryInterfaceModifier"})
public interface PlatformProcessor extends GridProcessor {
/**
* Gets owning Ignite instance.
@@ -68,7 +68,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Cache.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException;
+ public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException;
/**
* Create cache.
@@ -77,7 +77,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Cache.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException;
+ public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException;
/**
* Get or create cache.
@@ -86,7 +86,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Cache.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException;
+ public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException;
/**
* Create cache.
@@ -95,7 +95,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Cache.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException;
+ public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException;
/**
* Get or create cache.
@@ -104,7 +104,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Cache.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException;
+ public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException;
/**
* Destroy dynamically created cache.
@@ -121,7 +121,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Affinity.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException;
+ public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException;
/**
* Get data streamer.
@@ -131,14 +131,14 @@ public interface PlatformProcessor extends GridProcessor {
* @return Data streamer.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException;
+ public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary) throws IgniteCheckedException;
/**
* Get transactions.
*
* @return Transactions.
*/
- public PlatformTarget transactions();
+ public PlatformTargetProxy transactions();
/**
* Get projection.
@@ -146,7 +146,7 @@ public interface PlatformProcessor extends GridProcessor {
* @return Projection.
* @throws IgniteCheckedException If failed.
*/
- public PlatformTarget projection() throws IgniteCheckedException;
+ public PlatformTargetProxy projection() throws IgniteCheckedException;
/**
* Create interop compute.
@@ -154,7 +154,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param grp Cluster group.
* @return Compute instance.
*/
- public PlatformTarget compute(PlatformTarget grp);
+ public PlatformTargetProxy compute(PlatformTargetProxy grp);
/**
* Create interop messaging.
@@ -162,7 +162,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param grp Cluster group.
* @return Messaging instance.
*/
- public PlatformTarget message(PlatformTarget grp);
+ public PlatformTargetProxy message(PlatformTargetProxy grp);
/**
* Create interop events.
@@ -170,7 +170,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param grp Cluster group.
* @return Events instance.
*/
- public PlatformTarget events(PlatformTarget grp);
+ public PlatformTargetProxy events(PlatformTargetProxy grp);
/**
* Create interop services.
@@ -178,14 +178,14 @@ public interface PlatformProcessor extends GridProcessor {
* @param grp Cluster group.
* @return Services instance.
*/
- public PlatformTarget services(PlatformTarget grp);
+ public PlatformTargetProxy services(PlatformTargetProxy grp);
/**
* Get platform extensions. Override this method to provide any additional targets and operations you need.
*
* @return Platform extensions.
*/
- public PlatformTarget extensions();
+ public PlatformTargetProxy extensions();
/**
* Register cache store.
@@ -203,7 +203,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param create Create flag.
* @return Platform atomic long.
*/
- public PlatformTarget atomicLong(String name, long initVal, boolean create);
+ public PlatformTargetProxy atomicLong(String name, long initVal, boolean create);
/**
* Get or create AtomicSequence.
@@ -212,7 +212,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param create Create flag.
* @return Platform atomic long.
*/
- public PlatformTarget atomicSequence(String name, long initVal, boolean create);
+ public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create);
/**
* Get or create AtomicReference.
@@ -221,7 +221,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param create Create flag.
* @return Platform atomic long.
*/
- public PlatformTarget atomicReference(String name, long memPtr, boolean create);
+ public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create);
/**
* Gets the configuration of the current Ignite instance.
@@ -244,7 +244,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param memPtr Pointer to a stream with near cache config. 0 for default config.
* @return Cache.
*/
- public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr);
+ public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr);
/**
* Gets existing near cache with the given name or creates a new one.
@@ -253,7 +253,7 @@ public interface PlatformProcessor extends GridProcessor {
* @param memPtr Pointer to a stream with near cache config. 0 for default config.
* @return Cache.
*/
- public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr);
+ public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr);
/**
* Gets a value indicating whether Ignite logger has specified level enabled.
@@ -277,5 +277,5 @@ public interface PlatformProcessor extends GridProcessor {
*
* @return Binary processor.
*/
- public PlatformTarget binaryProcessor();
+ public PlatformTargetProxy binaryProcessor();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
index f775987..8c81ebb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java
@@ -220,7 +220,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy cache(@Nullable String name) throws IgniteCheckedException {
IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().cache(name);
if (cache == null)
@@ -230,7 +230,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy createCache(@Nullable String name) throws IgniteCheckedException {
IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createCache(name);
assert cache != null;
@@ -239,7 +239,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy getOrCreateCache(@Nullable String name) throws IgniteCheckedException {
IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateCache(name);
assert cache != null;
@@ -248,7 +248,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget createCacheFromConfig(long memPtr) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy createCacheFromConfig(long memPtr) throws IgniteCheckedException {
BinaryRawReaderEx reader = platformCtx.reader(platformCtx.memory().get(memPtr));
CacheConfiguration cfg = PlatformConfigurationUtils.readCacheConfiguration(reader);
@@ -260,7 +260,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException {
+ @Override public PlatformTargetProxy getOrCreateCacheFromConfig(long memPtr) throws IgniteCheckedException {
BinaryRawReaderEx reader = platformCtx.reader(platformCtx.memory().get(memPtr));
CacheConfiguration cfg = PlatformConfigurationUtils.readCacheConfiguration(reader);
@@ -278,60 +278,60 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException {
- return new PlatformAffinity(platformCtx, ctx, name);
+ @Override public PlatformTargetProxy affinity(@Nullable String name) throws IgniteCheckedException {
+ return proxy(new PlatformAffinity(platformCtx, ctx, name));
}
/** {@inheritDoc} */
- @Override public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepBinary)
+ @Override public PlatformTargetProxy dataStreamer(@Nullable String cacheName, boolean keepBinary)
throws IgniteCheckedException {
IgniteDataStreamer ldr = ctx.dataStream().dataStreamer(cacheName);
ldr.keepBinary(true);
- return new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepBinary);
+ return proxy(new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepBinary));
}
/** {@inheritDoc} */
- @Override public PlatformTarget transactions() {
- return new PlatformTransactions(platformCtx);
+ @Override public PlatformTargetProxy transactions() {
+ return proxy(new PlatformTransactions(platformCtx));
}
/** {@inheritDoc} */
- @Override public PlatformTarget projection() throws IgniteCheckedException {
- return new PlatformClusterGroup(platformCtx, ctx.grid().cluster());
+ @Override public PlatformTargetProxy projection() throws IgniteCheckedException {
+ return proxy(new PlatformClusterGroup(platformCtx, ctx.grid().cluster()));
}
/** {@inheritDoc} */
- @Override public PlatformTarget compute(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
+ @Override public PlatformTargetProxy compute(PlatformTargetProxy grp) {
+ PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap();
- return new PlatformCompute(platformCtx, grp0.projection(), PlatformUtils.ATTR_PLATFORM);
+ return proxy(new PlatformCompute(platformCtx, grp0.projection(), PlatformUtils.ATTR_PLATFORM));
}
/** {@inheritDoc} */
- @Override public PlatformTarget message(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
+ @Override public PlatformTargetProxy message(PlatformTargetProxy grp) {
+ PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap();
- return new PlatformMessaging(platformCtx, grp0.projection().ignite().message(grp0.projection()));
+ return proxy(new PlatformMessaging(platformCtx, grp0.projection().ignite().message(grp0.projection())));
}
/** {@inheritDoc} */
- @Override public PlatformTarget events(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
+ @Override public PlatformTargetProxy events(PlatformTargetProxy grp) {
+ PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap();
- return new PlatformEvents(platformCtx, grp0.projection().ignite().events(grp0.projection()));
+ return proxy(new PlatformEvents(platformCtx, grp0.projection().ignite().events(grp0.projection())));
}
/** {@inheritDoc} */
- @Override public PlatformTarget services(PlatformTarget grp) {
- PlatformClusterGroup grp0 = (PlatformClusterGroup)grp;
+ @Override public PlatformTargetProxy services(PlatformTargetProxy grp) {
+ PlatformClusterGroup grp0 = (PlatformClusterGroup)grp.unwrap();
- return new PlatformServices(platformCtx, grp0.projection().ignite().services(grp0.projection()), false);
+ return proxy(new PlatformServices(platformCtx, grp0.projection().ignite().services(grp0.projection()), false));
}
/** {@inheritDoc} */
- @Override public PlatformTarget extensions() {
+ @Override public PlatformTargetProxy extensions() {
return null;
}
@@ -356,28 +356,32 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicLong(String name, long initVal, boolean create) throws IgniteException {
+ @Override public PlatformTargetProxy atomicLong(String name, long initVal, boolean create) throws IgniteException {
GridCacheAtomicLongImpl atomicLong = (GridCacheAtomicLongImpl)ignite().atomicLong(name, initVal, create);
if (atomicLong == null)
return null;
- return new PlatformAtomicLong(platformCtx, atomicLong);
+ return proxy(new PlatformAtomicLong(platformCtx, atomicLong));
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicSequence(String name, long initVal, boolean create) throws IgniteException {
+ @Override public PlatformTargetProxy atomicSequence(String name, long initVal, boolean create)
+ throws IgniteException {
IgniteAtomicSequence atomicSeq = ignite().atomicSequence(name, initVal, create);
if (atomicSeq == null)
return null;
- return new PlatformAtomicSequence(platformCtx, atomicSeq);
+ return proxy(new PlatformAtomicSequence(platformCtx, atomicSeq));
}
/** {@inheritDoc} */
- @Override public PlatformTarget atomicReference(String name, long memPtr, boolean create) throws IgniteException {
- return PlatformAtomicReference.createInstance(platformCtx, name, memPtr, create);
+ @Override public PlatformTargetProxy atomicReference(String name, long memPtr, boolean create)
+ throws IgniteException {
+ PlatformAtomicReference ref = PlatformAtomicReference.createInstance(platformCtx, name, memPtr, create);
+
+ return ref != null ? proxy(ref) : null;
}
/** {@inheritDoc} */
@@ -427,7 +431,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget createNearCache(@Nullable String cacheName, long memPtr) {
+ @Override public PlatformTargetProxy createNearCache(@Nullable String cacheName, long memPtr) {
NearCacheConfiguration cfg = getNearCacheConfiguration(memPtr);
IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createNearCache(cacheName, cfg);
@@ -436,7 +440,7 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget getOrCreateNearCache(@Nullable String cacheName, long memPtr) {
+ @Override public PlatformTargetProxy getOrCreateNearCache(@Nullable String cacheName, long memPtr) {
NearCacheConfiguration cfg = getNearCacheConfiguration(memPtr);
IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateNearCache(cacheName, cfg);
@@ -447,8 +451,8 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
/**
* Creates new platform cache.
*/
- private PlatformTarget createPlatformCache(IgniteCacheProxy cache) {
- return new PlatformCache(platformCtx, cache, false, cacheExts);
+ private PlatformTargetProxy createPlatformCache(IgniteCacheProxy cache) {
+ return proxy(new PlatformCache(platformCtx, cache, false, cacheExts));
}
/** {@inheritDoc} */
@@ -504,8 +508,8 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/** {@inheritDoc} */
- @Override public PlatformTarget binaryProcessor() {
- return new PlatformBinaryProcessor(platformCtx);
+ @Override public PlatformTargetProxy binaryProcessor() {
+ return proxy(new PlatformBinaryProcessor(platformCtx));
}
/**
@@ -580,6 +584,13 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf
}
/**
+ * Wraps target in a proxy.
+ */
+ private PlatformTargetProxy proxy(PlatformTarget target) {
+ return new PlatformTargetProxyImpl(target, platformCtx);
+ }
+
+ /**
* Store and manager pair.
*/
private static class StoreInfo {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
index 805fd5e..5d234dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.processors.platform;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.jetbrains.annotations.Nullable;
/**
@@ -27,94 +29,89 @@ import org.jetbrains.annotations.Nullable;
@SuppressWarnings("UnusedDeclaration")
public interface PlatformTarget {
/**
- * Operation accepting long value and returning long value.
+ * Process IN operation accepting a long value and returning a long value.
*
- * @param type Operation type.
+ * @param type Type.
* @param val Value.
* @return Result.
- * @throws Exception If case of failure.
+ * @throws IgniteCheckedException In case of exception.
*/
- public long inLongOutLong(int type, long val) throws Exception;
+ long processInLongOutLong(int type, long val) throws IgniteCheckedException;
/**
- * Operation accepting memory stream and returning long value.
+ * Process IN operation accepting a binary reader and returning a long value.
*
- * @param type Operation type.
- * @param memPtr Memory pointer.
+ * @param type Type.
+ * @param reader Binary reader.
* @return Result.
- * @throws Exception If case of failure.
+ * @throws IgniteCheckedException In case of exception.
*/
- public long inStreamOutLong(int type, long memPtr) throws Exception;
+ long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException;
/**
- * Operation accepting memory stream and returning object.
+ * Process IN operation accepting a binary reader and the platform memory, and returning a long value.
*
- * @param type Operation type.
- * @param memPtr Memory pointer.
+ * @param type Type.
+ * @param reader Binary reader.
* @return Result.
- * @throws Exception If case of failure.
+ * @throws IgniteCheckedException In case of exception.
*/
- public Object inStreamOutObject(int type, long memPtr) throws Exception;
+ long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException;
/**
- * Operation accepting one memory stream and returning result to another memory stream.
+ * Process IN-OUT operation.
*
- * @param type Operation type.
- * @param inMemPtr Input memory pointer.
- * @param outMemPtr Output memory pointer.
- * @throws Exception In case of failure.
+ * @param type Type.
+ * @param reader Binary reader.
+ * @param writer Binary writer.
+ * @throws IgniteCheckedException In case of exception.
*/
- public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception;
+ void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ throws IgniteCheckedException;
/**
- * Operation accepting an object and a memory stream and returning result to another memory stream and an object.
+ * Process IN-OUT operation accepting a binary reader and returning a target object.
*
- * @param type Operation type.
- * @param arg Argument (optional).
- * @param inMemPtr Input memory pointer.
- * @param outMemPtr Output memory pointer.
- * @return Result.
- * @throws Exception In case of failure.
+ * @param type Type.
+ * @param reader Binary reader.
+ * @throws IgniteCheckedException In case of exception.
*/
- public Object inObjectStreamOutObjectStream(int type, @Nullable Object arg, long inMemPtr, long outMemPtr)
- throws Exception;
+ PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException;
/**
- * Operation returning result to memory stream.
+ * Process IN-OUT operation accepting a target argument, a binary reader and a binary writer; returns a target.
*
- * @param type Operation type.
- * @param memPtr Memory pointer.
- * @throws Exception In case of failure.
+ * @param type Type.
+ * @param arg Argument.
+ * @param reader Binary reader.
+ * @param writer Binary writer.
+ * @throws IgniteCheckedException In case of exception.
*/
- public void outStream(int type, long memPtr) throws Exception;
+ PlatformTarget processInObjectStreamOutObjectStream(int type, @Nullable PlatformTarget arg, BinaryRawReaderEx reader,
+ BinaryRawWriterEx writer) throws IgniteCheckedException;
/**
- * Operation returning object result.
+ * Process OUT operation accepting a binary writer for the result.
*
- * @param type Operation type.
- * @return Result.
- * @throws Exception If failed.
+ * @param type Type.
+ * @param writer Binary writer.
+ * @throws IgniteCheckedException In case of exception.
*/
- public Object outObject(int type) throws Exception;
+ void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException;
/**
- * Start listening for the future.
+ * Process OUT operation returning a target object.
*
- * @param futId Future ID.
- * @param typ Result type.
- * @throws IgniteCheckedException In case of failure.
+ * @param type Type.
+ * @throws IgniteCheckedException In case of exception.
*/
- @SuppressWarnings("UnusedDeclaration")
- public void listenFuture(final long futId, int typ) throws Exception;
+ PlatformTarget processOutObject(int type) throws IgniteCheckedException;
/**
- * Start listening for the future for specific operation type.
+ * Convert caught exception.
*
- * @param futId Future ID.
- * @param typ Result type.
- * @param opId Operation ID required to pick correct result writer.
- * @throws IgniteCheckedException In case of failure.
+ * @param e Exception to convert.
+ * @return Converted exception.
*/
- @SuppressWarnings("UnusedDeclaration")
- public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception;
+ Exception convertException(Exception e);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
new file mode 100644
index 0000000..a4f2a56
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Platform target that is invoked via JNI and propagates calls to underlying {@link PlatformTarget}.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public interface PlatformTargetProxy {
+ /**
+ * Operation accepting long value and returning long value.
+ *
+ * @param type Operation type.
+ * @param val Value.
+ * @return Result.
+ * @throws Exception In case of failure.
+ */
+ long inLongOutLong(int type, long val) throws Exception;
+
+ /**
+ * Operation accepting memory stream and returning long value.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @return Result.
+ * @throws Exception In case of failure.
+ */
+ long inStreamOutLong(int type, long memPtr) throws Exception;
+
+ /**
+ * Operation accepting memory stream and returning object.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @return Result.
+ * @throws Exception In case of failure.
+ */
+ Object inStreamOutObject(int type, long memPtr) throws Exception;
+
+ /**
+ * Operation accepting one memory stream and returning result to another memory stream.
+ *
+ * @param type Operation type.
+ * @param inMemPtr Input memory pointer.
+ * @param outMemPtr Output memory pointer.
+ * @throws Exception In case of failure.
+ */
+ void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception;
+
+ /**
+ * Operation accepting an object and a memory stream and returning result to another memory stream and an object.
+ *
+ * @param type Operation type.
+ * @param arg Argument (optional).
+ * @param inMemPtr Input memory pointer.
+ * @param outMemPtr Output memory pointer.
+ * @return Result.
+ * @throws Exception In case of failure.
+ */
+ Object inObjectStreamOutObjectStream(int type, @Nullable Object arg, long inMemPtr, long outMemPtr)
+ throws Exception;
+
+ /**
+ * Operation returning result to memory stream.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @throws Exception In case of failure.
+ */
+ void outStream(int type, long memPtr) throws Exception;
+
+ /**
+ * Operation returning object result.
+ *
+ * @param type Operation type.
+ * @return Result.
+ * @throws Exception If failed.
+ */
+ Object outObject(int type) throws Exception;
+
+ /**
+ * Start listening for the future.
+ *
+ * @param futId Future ID.
+ * @param typ Result type.
+ * @throws Exception In case of failure.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ void listenFuture(final long futId, int typ) throws Exception;
+
+ /**
+ * Start listening for the future for specific operation type.
+ *
+ * @param futId Future ID.
+ * @param typ Result type.
+ * @param opId Operation ID required to pick correct result writer.
+ * @throws Exception In case of failure.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ void listenFutureForOperation(final long futId, int typ, int opId) throws Exception;
+
+ /**
+ * Returns the underlying target.
+ *
+ * @return Underlying target.
+ */
+ PlatformTarget unwrap();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
new file mode 100644
index 0000000..25a4de8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+
+/**
+ * Platform target that is invoked via JNI and propagates calls to underlying {@link PlatformTarget}.
+ */
+public class PlatformTargetProxyImpl implements PlatformTargetProxy {
+ /** Platform context used to resolve memory pointers and to create binary readers and writers. */
+ protected final PlatformContext platformCtx;
+
+ /** Underlying target that every call is delegated to. */
+ private final PlatformTarget target;
+
+ public PlatformTargetProxyImpl(PlatformTarget target, PlatformContext platformCtx) {
+ assert platformCtx != null;
+ assert target != null;
+
+ this.platformCtx = platformCtx;
+ this.target = target;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long inLongOutLong(int type, long val) throws Exception {
+ try {
+ return target.processInLongOutLong(type, val);
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long inStreamOutLong(int type, long memPtr) throws Exception {
+ try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
+ BinaryRawReaderEx reader = platformCtx.reader(mem);
+
+ return target.processInStreamOutLong(type, reader, mem);
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object inStreamOutObject(int type, long memPtr) throws Exception {
+ try (PlatformMemory mem = memPtr != 0 ? platformCtx.memory().get(memPtr) : null) {
+ BinaryRawReaderEx reader = mem != null ? platformCtx.reader(mem) : null;
+
+ return wrapProxy(target.processInStreamOutObject(type, reader));
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void outStream(int type, long memPtr) throws Exception {
+ try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
+ PlatformOutputStream out = mem.output();
+
+ BinaryRawWriterEx writer = platformCtx.writer(out);
+
+ target.processOutStream(type, writer);
+
+ out.synchronize();
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object outObject(int type) throws Exception {
+ try {
+ return wrapProxy(target.processOutObject(type));
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception {
+ try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) {
+ BinaryRawReaderEx reader = platformCtx.reader(inMem);
+
+ try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) {
+ PlatformOutputStream out = outMem.output();
+
+ BinaryRawWriterEx writer = platformCtx.writer(out);
+
+ target.processInStreamOutStream(type, reader, writer);
+
+ out.synchronize();
+ }
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object inObjectStreamOutObjectStream(int type, Object arg, long inMemPtr, long outMemPtr)
+ throws Exception {
+ PlatformMemory inMem = null;
+ PlatformMemory outMem = null;
+
+ try {
+ BinaryRawReaderEx reader = null;
+
+ if (inMemPtr != 0) {
+ inMem = platformCtx.memory().get(inMemPtr);
+
+ reader = platformCtx.reader(inMem);
+ }
+
+ PlatformOutputStream out = null;
+ BinaryRawWriterEx writer = null;
+
+ if (outMemPtr != 0) {
+ outMem = platformCtx.memory().get(outMemPtr);
+
+ out = outMem.output();
+
+ writer = platformCtx.writer(out);
+ }
+
+ PlatformTarget res = target.processInObjectStreamOutObjectStream(type, unwrapProxy(arg), reader, writer);
+
+ if (out != null)
+ out.synchronize();
+
+ return wrapProxy(res);
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ finally {
+ try {
+ if (inMem != null)
+ inMem.close();
+ }
+ finally {
+ if (outMem != null)
+ outMem.close();
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void listenFuture(final long futId, int typ) throws Exception {
+ PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, target);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception {
+ PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), target);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformTarget unwrap() {
+ return target;
+ }
+
+ /**
+ * @return Future writer for the given operation ID, obtained from the target cast to {@link PlatformAsyncTarget}.
+ */
+ private PlatformFutureUtils.Writer futureWriter(int opId) {
+ return ((PlatformAsyncTarget)target).futureWriter(opId);
+ }
+
+ /**
+ * @return Current future of the target, which is cast to {@link PlatformAsyncTarget}.
+ */
+ private IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ return ((PlatformAsyncTarget)target).currentFuture();
+ }
+
+ /**
+ * Wraps a target in a new proxy that shares this proxy's context; {@code null} is passed through.
+ *
+ * @param obj Target to wrap, may be {@code null}.
+ * @return New proxy, or {@code null} if {@code obj} is {@code null}.
+ */
+ private Object wrapProxy(PlatformTarget obj) {
+ return obj == null ? null : new PlatformTargetProxyImpl(obj, platformCtx);
+ }
+
+ /**
+ * Extracts the underlying target from a proxy; {@code null} is passed through.
+ *
+ * @param obj Proxy to unwrap; must be a {@link PlatformTargetProxyImpl} or {@code null}.
+ * @return Underlying target, or {@code null} if {@code obj} is {@code null}.
+ */
+ private PlatformTarget unwrapProxy(Object obj) {
+ return obj == null ? null : ((PlatformTargetProxyImpl)obj).target;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
index 1bb577e..3c00abc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
@@ -49,7 +49,7 @@ public class PlatformBinaryProcessor extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
if (type == OP_PUT_META) {
platformCtx.processMetadata(reader);
@@ -60,7 +60,7 @@ public class PlatformBinaryProcessor extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
if (type == OP_GET_ALL_META)
platformCtx.writeAllMetadata(writer);
else
@@ -68,7 +68,7 @@ public class PlatformBinaryProcessor extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader,
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader,
BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_GET_META: {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index aec3703..aee317e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformNativeException;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryProxy;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor;
@@ -400,7 +401,7 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem)
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem)
throws IgniteCheckedException {
try {
switch (type) {
@@ -824,7 +825,7 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader)
+ @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader)
throws IgniteCheckedException {
switch (type) {
case OP_QRY_SQL:
@@ -903,7 +904,7 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_GET_NAME:
writer.writeObject(cache.getName());
@@ -940,7 +941,7 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
switch (type) {
case OP_WITH_ASYNC: {
if (cache.isAsync())
@@ -983,7 +984,7 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_SIZE: {
CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes((int)val);
@@ -1121,12 +1122,12 @@ public class PlatformCache extends PlatformAbstractTarget {
}
/** <inheritDoc /> */
- @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+ @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl) cacheAsync.future()).internalFuture();
}
/** <inheritDoc /> */
- @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
+ @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) {
if (opId == OP_GET_ALL)
return WRITER_GET_ALL;
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
index 292caea..4c11cc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
@@ -47,7 +47,7 @@ public class PlatformCacheIterator extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_NEXT:
if (iter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
index 12df188..e24345c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
@@ -117,7 +117,7 @@ public class PlatformAffinity extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
switch (type) {
case OP_PARTITION:
return aff.partition(reader.readObjectDetached());
@@ -168,7 +168,7 @@ public class PlatformAffinity extends PlatformAbstractTarget {
/** {@inheritDoc} */
@SuppressWarnings({"IfMayBeConditional", "ConstantConditions"})
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_PRIMARY_PARTITIONS: {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
index 8076a19..2d3cada 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
@@ -279,7 +280,11 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl
? new PlatformAffinityFunctionTarget(ctx, baseFunc)
: null;
- ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTarget);
+ PlatformTargetProxyImpl baseTargetProxy = baseTarget != null
+ ? new PlatformTargetProxyImpl(baseTarget, ctx)
+ : null;
+
+ ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTargetProxy);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java
index 8a07b33..342e726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java
@@ -71,7 +71,7 @@ public class PlatformAffinityFunctionTarget extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
if (type == OP_PARTITION)
return baseFunc.partition(reader.readObjectDetached());
else if (type == OP_REMOVE_NODE) {
@@ -84,7 +84,7 @@ public class PlatformAffinityFunctionTarget extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
if (type == OP_ASSIGN_PARTITIONS) {
AffinityFunctionContext affCtx = currentAffCtx.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
index 6a259ca..f201425 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
@@ -71,7 +71,7 @@ public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTar
}
/** {@inheritDoc} */
- @Override protected void processOutStream(int type, final BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, final BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_GET_BATCH: {
assert iter != null : "iterator() has not been called";
@@ -136,7 +136,7 @@ public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTar
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_ITERATOR:
iter = cursor.iterator();
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java
index 04f17ff..27d784a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.platform.cache.query;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
/**
* Proxy that implements PlatformTarget.
@@ -41,7 +42,7 @@ public class PlatformContinuousQueryProxy extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override public Object outObject(int type) throws Exception {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
return qry.getInitialQueryCursor();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index f21861e..c77f501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.platform.callback;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
import org.apache.ignite.internal.util.GridStripedSpinBusyLock;
/**
@@ -429,7 +429,7 @@ public class PlatformCallbackGateway {
* @param memPtr Stream pointer.
* @param keepBinary Binary flag.
*/
- public void dataStreamerStreamReceiverInvoke(long ptr, Object cache, long memPtr, boolean keepBinary) {
+ public void dataStreamerStreamReceiverInvoke(long ptr, PlatformTargetProxy cache, long memPtr, boolean keepBinary) {
enter();
try {
@@ -995,7 +995,7 @@ public class PlatformCallbackGateway {
* @param baseFunc Optional func for base calls.
* @return Affinity function pointer.
*/
- public long affinityFunctionInit(long memPtr, PlatformAffinityFunctionTarget baseFunc) {
+ public long affinityFunctionInit(long memPtr, PlatformTargetProxy baseFunc) {
enter();
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
index 50c4c28..9d60ec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.processors.platform.callback;
-import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget;
+import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
/**
* Platform callback utility methods. Implemented in target platform. All methods in this class must be
@@ -226,7 +226,7 @@ public class PlatformCallbackUtils {
* @param memPtr Stream pointer.
* @param keepBinary Binary flag.
*/
- static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, Object cache, long memPtr,
+ static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, PlatformTargetProxy cache, long memPtr,
boolean keepBinary);
/**
@@ -504,7 +504,7 @@ public class PlatformCallbackUtils {
* @param baseFunc Optional func for base calls.
* @return Affinity function pointer.
*/
- static native long affinityFunctionInit(long envPtr, long memPtr, PlatformAffinityFunctionTarget baseFunc);
+ static native long affinityFunctionInit(long envPtr, long memPtr, PlatformTargetProxy baseFunc);
/**
* Gets the partition from affinity function.
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
index dc73468..f49f477 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.cache.PlatformCache;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -120,7 +121,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
/** {@inheritDoc} */
@SuppressWarnings("deprecation")
- @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
switch (type) {
case OP_METRICS:
platformCtx.writeClusterMetrics(writer, prj.metrics());
@@ -134,7 +135,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
/** {@inheritDoc} */
@SuppressWarnings({"ConstantConditions", "deprecation"})
- @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_METRICS_FILTERED: {
@@ -217,7 +218,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
switch (type) {
case OP_PING_NODE:
return pingNode(reader.readUuid()) ? TRUE : FALSE;
@@ -228,7 +229,8 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+ @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader)
+ throws IgniteCheckedException {
switch (type) {
case OP_FOR_NODE_IDS: {
Collection<UUID> ids = PlatformUtils.readCollection(reader);
@@ -272,8 +274,8 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processInObjectStreamOutObjectStream(
- int type, @Nullable Object arg, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+ @Override public PlatformTarget processInObjectStreamOutObjectStream(
+ int type, @Nullable PlatformTarget arg, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
throws IgniteCheckedException {
switch (type) {
case OP_FOR_OTHERS: {
@@ -289,7 +291,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+ @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException {
switch (type) {
case OP_FOR_REMOTES:
return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRemotes());
@@ -314,7 +316,7 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
- @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_RESET_METRICS: {
assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.