You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2017/07/28 07:06:33 UTC
[3/3] ignite git commit: IGNITE-5769 Abstract away .NET->Java calls
IGNITE-5769 Abstract away .NET->Java calls
This closes #2352
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89bba2fa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89bba2fa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89bba2fa
Branch: refs/heads/master
Commit: 89bba2fa2c423d5713c8412ba0069b869005694c
Parents: 47fea40
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Jul 28 10:06:16 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Jul 28 10:06:16 2017 +0300
----------------------------------------------------------------------
.../platform/PlatformTargetProxy.java | 11 +
.../platform/PlatformTargetProxyImpl.java | 79 +-
.../plugin/PlatformTestPluginTarget.java | 7 +-
.../cpp/jni/include/ignite/jni/exports.h | 1 +
.../platforms/cpp/jni/include/ignite/jni/java.h | 5 +-
modules/platforms/cpp/jni/project/vs/module.def | 1 +
modules/platforms/cpp/jni/src/exports.cpp | 4 +
modules/platforms/cpp/jni/src/java.cpp | 19 +-
.../Plugin/PluginTest.cs | 13 +-
.../Apache.Ignite.Core.Tests/TestUtils.cs | 7 +-
.../Apache.Ignite.Core.csproj | 5 +-
.../dotnet/Apache.Ignite.Core/Ignition.cs | 9 +-
.../Impl/Binary/BinaryProcessor.cs | 6 +-
.../Impl/Binary/BinaryWriterExtensions.cs | 107 ++
.../Cache/Affinity/PlatformAffinityFunction.cs | 7 +-
.../Impl/Cache/CacheAffinityImpl.cs | 18 +-
.../Impl/Cache/CacheEnumerator.cs | 8 +-
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 113 +-
.../Impl/Cache/Query/AbstractQueryCursor.cs | 12 +-
.../Continuous/ContinuousQueryHandleImpl.cs | 12 +-
.../Impl/Cache/Query/FieldsQueryCursor.cs | 6 +-
.../Impl/Cache/Query/QueryCursor.cs | 5 +-
.../Impl/Cluster/ClusterGroupImpl.cs | 76 +-
.../Impl/Common/DelegateTypeDescriptor.cs | 9 +-
.../Impl/Common/Listenable.cs | 8 +-
.../Impl/Compute/ComputeImpl.cs | 12 +-
.../Impl/DataStructures/AtomicLong.cs | 9 +-
.../Impl/DataStructures/AtomicReference.cs | 8 +-
.../Impl/DataStructures/AtomicSequence.cs | 9 +-
.../Impl/Datastream/DataStreamerImpl.cs | 8 +-
.../Impl/Datastream/StreamReceiverHolder.cs | 13 +-
.../Apache.Ignite.Core/Impl/Events/Events.cs | 11 +-
.../Impl/IPlatformTargetInternal.cs | 102 ++
.../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 48 +-
.../Impl/Messaging/Messaging.cs | 10 +-
.../Impl/PlatformDisposableTargetAdapter.cs | 75 ++
.../Impl/PlatformJniTarget.cs | 536 +++++++++
.../Apache.Ignite.Core/Impl/PlatformTarget.cs | 1086 ------------------
.../Impl/PlatformTargetAdapter.cs | 534 +++++++++
.../Impl/Services/Services.cs | 19 +-
.../Impl/Transactions/TransactionsImpl.cs | 29 +-
.../Impl/Unmanaged/IgniteJniNativeMethods.cs | 3 +
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 10 +-
.../Impl/Unmanaged/UnmanagedUtils.cs | 7 +
.../Interop/IPlatformTarget.cs | 15 +
45 files changed, 1690 insertions(+), 1402 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
index 1ee57cb..29de311 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
@@ -105,6 +105,17 @@ public interface PlatformTargetProxy {
void inStreamAsync(int type, long memPtr) throws Exception;
/**
+ * Asynchronous operation accepting memory stream and returning PlatformListenableTarget.
+ * Supports cancellable async operations.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @return Result.
+ * @throws Exception In case of failure.
+ */
+ Object inStreamOutObjectAsync(int type, long memPtr) throws Exception;
+
+ /**
* Returns the underlying target.
*
* @return Underlying target.
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
index 44044b1..b472275 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
+import org.apache.ignite.internal.processors.platform.utils.PlatformListenableTarget;
import org.apache.ignite.lang.IgniteFuture;
/**
@@ -109,37 +111,16 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy {
/** {@inheritDoc} */
@Override public void inStreamAsync(int type, long memPtr) throws Exception {
- try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
- BinaryRawReaderEx reader = platformCtx.reader(mem);
-
- long futId = reader.readLong();
- int futTyp = reader.readInt();
-
- final PlatformAsyncResult res = target.processInStreamAsync(type, reader);
-
- if (res == null)
- throw new IgniteException("PlatformTarget.processInStreamAsync should not return null.");
-
- IgniteFuture fut = res.future();
+ inStreamOutListenableAsync(type, memPtr);
+ }
- if (fut == null)
- throw new IgniteException("PlatformAsyncResult.future() should not return null.");
+ /** {@inheritDoc} */
+ @Override public Object inStreamOutObjectAsync(int type, long memPtr) throws Exception {
+ PlatformListenable listenable = inStreamOutListenableAsync(type, memPtr);
- PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() {
- /** {@inheritDoc} */
- @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) {
- res.write(writer, obj);
- }
+ PlatformListenableTarget target = new PlatformListenableTarget(listenable, platformCtx);
- /** {@inheritDoc} */
- @Override public boolean canWrite(Object obj, Throwable err) {
- return err == null;
- }
- }, target);
- }
- catch (Exception e) {
- throw target.convertException(e);
- }
+ return wrapProxy(target);
}
/** {@inheritDoc} */
@@ -234,4 +215,46 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy {
private PlatformTarget unwrapProxy(Object obj) {
return obj == null ? null : ((PlatformTargetProxyImpl)obj).target;
}
+
+ /**
+ * Performs asynchronous operation.
+ *
+ * @param type Type.
+ * @param memPtr Stream pointer.
+ * @return Listenable.
+ * @throws Exception On error.
+ */
+ private PlatformListenable inStreamOutListenableAsync(int type, long memPtr) throws Exception {
+ try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
+ BinaryRawReaderEx reader = platformCtx.reader(mem);
+
+ long futId = reader.readLong();
+ int futTyp = reader.readInt();
+
+ final PlatformAsyncResult res = target.processInStreamAsync(type, reader);
+
+ if (res == null)
+ throw new IgniteException("PlatformTarget.processInStreamAsync should not return null.");
+
+ IgniteFuture fut = res.future();
+
+ if (fut == null)
+ throw new IgniteException("PlatformAsyncResult.future() should not return null.");
+
+ return PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() {
+ /** {@inheritDoc} */
+ @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) {
+ res.write(writer, obj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canWrite(Object obj, Throwable err) {
+ return err == null;
+ }
+ }, target);
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
index 7e69425..8c1cbe9 100644
--- a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
@@ -146,7 +146,12 @@ class PlatformTestPluginTarget implements PlatformTarget {
case 1: {
// Async upper case.
final String val = reader.readString();
- final GridFutureAdapter<String> fa = new GridFutureAdapter<>();
+
+ final GridFutureAdapter<String> fa = new GridFutureAdapter<String>() {
+ @Override public boolean cancel() throws IgniteCheckedException {
+ return onCancelled();
+ }
+ };
new Thread(new Runnable() {
@Override public void run() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/include/ignite/jni/exports.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
index ea0c32a..0580d19 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
@@ -38,6 +38,7 @@ extern "C" {
void IGNITE_CALL IgniteTargetOutStream(gcj::JniContext* ctx, void* obj, int opType, long long memPtr);
void* IGNITE_CALL IgniteTargetOutObject(gcj::JniContext* ctx, void* obj, int opType);
void IGNITE_CALL IgniteTargetInStreamAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr);
+ void* IGNITE_CALL IgniteTargetInStreamOutObjectAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr);
void* IGNITE_CALL IgniteAcquire(gcj::JniContext* ctx, void* obj);
void IGNITE_CALL IgniteRelease(void* obj);
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index c170a5b..c713e81 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -173,9 +173,6 @@ namespace ignite
jmethodID m_PlatformIgnition_stop;
jmethodID m_PlatformIgnition_stopAll;
- jclass c_PlatformProcessor;
- jmethodID m_PlatformProcessor_releaseStart;
-
jclass c_PlatformTarget;
jmethodID m_PlatformTarget_inLongOutLong;
jmethodID m_PlatformTarget_inStreamOutLong;
@@ -183,6 +180,7 @@ namespace ignite
jmethodID m_PlatformTarget_outStream;
jmethodID m_PlatformTarget_outObject;
jmethodID m_PlatformTarget_inStreamAsync;
+ jmethodID m_PlatformTarget_inStreamOutObjectAsync;
jmethodID m_PlatformTarget_inStreamOutStream;
jmethodID m_PlatformTarget_inObjectStreamOutObjectStream;
@@ -325,6 +323,7 @@ namespace ignite
void TargetOutStream(jobject obj, int opType, long long memPtr, JniErrorInfo* errInfo = NULL);
jobject TargetOutObject(jobject obj, int opType, JniErrorInfo* errInfo = NULL);
void TargetInStreamAsync(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
+ jobject TargetInStreamOutObjectAsync(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
jobject CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
jobject CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/project/vs/module.def
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def
index 53e7e42..1407f82 100644
--- a/modules/platforms/cpp/jni/project/vs/module.def
+++ b/modules/platforms/cpp/jni/project/vs/module.def
@@ -12,6 +12,7 @@ IgniteTargetInStreamOutStream @20
IgniteTargetInObjectStreamOutObjectStream @21
IgniteTargetInLongOutLong @24
IgniteTargetInStreamAsync @25
+IgniteTargetInStreamOutObjectAsync @26
IgniteAcquire @80
IgniteRelease @81
IgniteThrowToJava @82
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/src/exports.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/exports.cpp b/modules/platforms/cpp/jni/src/exports.cpp
index 9b7defd..aeb68ab 100644
--- a/modules/platforms/cpp/jni/src/exports.cpp
+++ b/modules/platforms/cpp/jni/src/exports.cpp
@@ -74,6 +74,10 @@ extern "C" {
ctx->TargetInStreamAsync(static_cast<jobject>(obj), opType, memPtr);
}
+ void* IGNITE_CALL IgniteTargetInStreamOutObjectAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr) {
+ return ctx->TargetInStreamOutObjectAsync(static_cast<jobject>(obj), opType, memPtr);
+ }
+
void* IGNITE_CALL IgniteAcquire(gcj::JniContext* ctx, void* obj) {
return ctx->Acquire(static_cast<jobject>(obj));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index 7eadec0..ac4ba63 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -221,9 +221,6 @@ namespace ignite
const char* C_PLATFORM_NO_CALLBACK_EXCEPTION = "org/apache/ignite/internal/processors/platform/PlatformNoCallbackException";
- const char* C_PLATFORM_PROCESSOR = "org/apache/ignite/internal/processors/platform/PlatformProcessor";
- JniMethod M_PLATFORM_PROCESSOR_RELEASE_START = JniMethod("releaseStart", "()V", false);
-
const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTargetProxy";
JniMethod M_PLATFORM_TARGET_IN_LONG_OUT_LONG = JniMethod("inLongOutLong", "(IJ)J", false);
JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_LONG = JniMethod("inStreamOutLong", "(IJ)J", false);
@@ -233,6 +230,7 @@ namespace ignite
JniMethod M_PLATFORM_TARGET_OUT_STREAM = JniMethod("outStream", "(IJ)V", false);
JniMethod M_PLATFORM_TARGET_OUT_OBJECT = JniMethod("outObject", "(I)Ljava/lang/Object;", false);
JniMethod M_PLATFORM_TARGET_IN_STREAM_ASYNC = JniMethod("inStreamAsync", "(IJ)V", false);
+ JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_OBJECT_ASYNC = JniMethod("inStreamOutObjectAsync", "(IJ)Ljava/lang/Object;", false);
const char* C_PLATFORM_CALLBACK_UTILS = "org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils";
@@ -449,9 +447,6 @@ namespace ignite
m_PlatformIgnition_stop = FindMethod(env, c_PlatformIgnition, M_PLATFORM_IGNITION_STOP);
m_PlatformIgnition_stopAll = FindMethod(env, c_PlatformIgnition, M_PLATFORM_IGNITION_STOP_ALL);
- c_PlatformProcessor = FindClass(env, C_PLATFORM_PROCESSOR);
- m_PlatformProcessor_releaseStart = FindMethod(env, c_PlatformProcessor, M_PLATFORM_PROCESSOR_RELEASE_START);
-
c_PlatformTarget = FindClass(env, C_PLATFORM_TARGET);
m_PlatformTarget_inLongOutLong = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_LONG_OUT_LONG);
m_PlatformTarget_inStreamOutLong = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_LONG);
@@ -461,6 +456,7 @@ namespace ignite
m_PlatformTarget_inStreamOutStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_STREAM);
m_PlatformTarget_inObjectStreamOutObjectStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_OBJECT_STREAM);
m_PlatformTarget_inStreamAsync = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_ASYNC);
+ m_PlatformTarget_inStreamOutObjectAsync = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_OBJECT_ASYNC);
c_PlatformUtils = FindClass(env, C_PLATFORM_UTILS);
m_PlatformUtils_reallocate = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_REALLOC);
@@ -473,7 +469,6 @@ namespace ignite
void JniMembers::Destroy(JNIEnv* env) {
DeleteClass(env, c_IgniteException);
DeleteClass(env, c_PlatformIgnition);
- DeleteClass(env, c_PlatformProcessor);
DeleteClass(env, c_PlatformTarget);
DeleteClass(env, c_PlatformUtils);
}
@@ -894,6 +889,16 @@ namespace ignite
ExceptionCheck(env, err);
}
+ jobject JniContext::TargetInStreamOutObjectAsync(jobject obj, int opType, long long memPtr, JniErrorInfo* err) {
+ JNIEnv* env = Attach();
+
+ jobject res = env->CallObjectMethod(obj, jvm->GetMembers().m_PlatformTarget_inStreamOutObjectAsync, opType, memPtr);
+
+ ExceptionCheck(env, err);
+
+ return LocalToGlobal(env, res);
+ }
+
jobject JniContext::CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* err) {
JNIEnv* env = Attach();
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
index 00b1cca..1cb2fae 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
@@ -21,6 +21,8 @@ namespace Apache.Ignite.Core.Tests.Plugin
using System.Collections.Generic;
using System.IO;
using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Interop;
@@ -142,13 +144,22 @@ namespace Apache.Ignite.Core.Tests.Plugin
Assert.IsTrue(task.IsCompleted);
Assert.AreEqual("FOO", asyncRes);
+ // Async operation with cancellation.
+ var cts = new CancellationTokenSource();
+ task = target.DoOutOpAsync(1, w => w.WriteString("foo"), r => r.ReadString(), cts.Token);
+ Assert.IsFalse(task.IsCompleted);
+ cts.Cancel();
+ Assert.IsTrue(task.IsCanceled);
+ var aex = Assert.Throws<AggregateException>(() => { asyncRes = task.Result; });
+ Assert.IsInstanceOf<TaskCanceledException>(aex.GetBaseException());
+
// Async operation with exception in entry point.
Assert.Throws<TestIgnitePluginException>(() => target.DoOutOpAsync<object>(2, null, null));
// Async operation with exception in future.
var errTask = target.DoOutOpAsync<object>(3, null, null);
Assert.IsFalse(errTask.IsCompleted);
- var aex = Assert.Throws<AggregateException>(() => errTask.Wait());
+ aex = Assert.Throws<AggregateException>(() => errTask.Wait());
Assert.IsInstanceOf<IgniteException>(aex.InnerExceptions.Single());
// Throws custom mapped exception.
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
index 6e0a497..4b171b0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
@@ -365,12 +365,11 @@ namespace Apache.Ignite.Core.Tests
};
var proc = System.Diagnostics.Process.Start(procStart);
-
Assert.IsNotNull(proc);
- Console.WriteLine(proc.StandardOutput.ReadToEnd());
- Console.WriteLine(proc.StandardError.ReadToEnd());
- Assert.IsTrue(proc.WaitForExit(15000));
+ IgniteProcess.AttachProcessConsoleReader(proc);
+
+ Assert.IsTrue(proc.WaitForExit(19000));
Assert.AreEqual(0, proc.ExitCode);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 76132c3..c444ed0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -99,7 +99,10 @@
<Compile Include="Cache\IMemoryMetrics.cs" />
<Compile Include="Common\ExceptionFactory.cs" />
<Compile Include="Configuration\Package-Info.cs" />
+ <Compile Include="Impl\IPlatformTargetInternal.cs" />
<Compile Include="Impl\PersistentStore\PersistentStoreMetrics.cs" />
+ <Compile Include="Impl\PlatformDisposableTargetAdapter.cs" />
+ <Compile Include="Impl\PlatformJniTarget.cs" />
<Compile Include="PersistentStore\IPersistentStoreMetrics.cs" />
<Compile Include="PersistentStore\Package-Info.cs" />
<Compile Include="PersistentStore\PersistentStoreConfiguration.cs" />
@@ -385,7 +388,7 @@
<Compile Include="Impl\Ignite.cs" />
<Compile Include="Impl\IgniteManager.cs" />
<Compile Include="Impl\Log\JavaLogger.cs" />
- <Compile Include="Impl\PlatformTarget.cs" />
+ <Compile Include="Impl\PlatformTargetAdapter.cs" />
<Compile Include="Impl\IgniteUtils.cs" />
<Compile Include="Impl\Handle\Handle.cs" />
<Compile Include="Impl\Handle\HandleRegistry.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
index 44ebef3..568eea7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
@@ -239,7 +239,7 @@ namespace Apache.Ignite.Core
// 3. Create startup object which will guide us through the rest of the process.
_startup = new Startup(cfg, cbs);
- IUnmanagedTarget interopProc = null;
+ PlatformJniTarget interopProc = null;
try
{
@@ -249,7 +249,7 @@ namespace Apache.Ignite.Core
// 5. At this point start routine is finished. We expect STARTUP object to have all necessary data.
var node = _startup.Ignite;
- interopProc = node.InteropProcessor;
+ interopProc = (PlatformJniTarget)node.InteropProcessor;
var javaLogger = log as JavaLogger;
if (javaLogger != null)
@@ -279,7 +279,7 @@ namespace Apache.Ignite.Core
// 2. Stop Ignite node if it was started.
if (interopProc != null)
- UU.IgnitionStop(interopProc.Context, gridName, true);
+ UU.IgnitionStop(interopProc.Target.Context, gridName, true);
// 3. Throw error further (use startup error if exists because it is more precise).
if (_startup.Error != null)
@@ -466,7 +466,8 @@ namespace Apache.Ignite.Core
if (Nodes.ContainsKey(new NodeKey(name)))
throw new IgniteException("Ignite with the same name already started: " + name);
- _startup.Ignite = new Ignite(_startup.Configuration, _startup.Name, interopProc, _startup.Marshaller,
+ _startup.Ignite = new Ignite(_startup.Configuration, _startup.Name,
+ new PlatformJniTarget(interopProc, _startup.Marshaller), _startup.Marshaller,
_startup.LifecycleHandlers, _startup.Callbacks);
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs
index b8937c9..69056b3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs
@@ -21,12 +21,11 @@ namespace Apache.Ignite.Core.Impl.Binary
using System.Diagnostics;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Impl.Binary.Metadata;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Binary metadata processor, delegates to PlatformBinaryProcessor in Java.
/// </summary>
- internal class BinaryProcessor : PlatformTarget
+ internal class BinaryProcessor : PlatformTargetAdapter
{
/// <summary>
/// Op codes.
@@ -46,8 +45,7 @@ namespace Apache.Ignite.Core.Impl.Binary
/// Initializes a new instance of the <see cref="BinaryProcessor"/> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
- public BinaryProcessor(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+ public BinaryProcessor(IPlatformTargetInternal target) : base(target)
{
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs
index 64bfa35..3dc8a96 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs
@@ -18,6 +18,8 @@
namespace Apache.Ignite.Core.Impl.Binary
{
using System;
+ using System.Collections.Generic;
+ using System.IO;
using Apache.Ignite.Core.Binary;
/// <summary>
@@ -75,5 +77,110 @@ namespace Apache.Ignite.Core.Impl.Binary
writer.WriteBoolean(false);
}
+ /// <summary>
+ /// Write collection.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ /// <param name="vals">Values.</param>
+ /// <param name="selector">A transform function to apply to each element.</param>
+ /// <returns>The same writer for chaining.</returns>
+ private static void WriteCollection<T1, T2>(this BinaryWriter writer, ICollection<T1> vals,
+ Func<T1, T2> selector)
+ {
+ writer.WriteInt(vals.Count);
+
+ if (selector == null)
+ {
+ foreach (var val in vals)
+ writer.Write(val);
+ }
+ else
+ {
+ foreach (var val in vals)
+ writer.Write(selector(val));
+ }
+ }
+
+ /// <summary>
+ /// Write enumerable.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ /// <param name="vals">Values.</param>
+ /// <returns>The same writer for chaining.</returns>
+ public static void WriteEnumerable<T>(this BinaryWriter writer, IEnumerable<T> vals)
+ {
+ WriteEnumerable<T, T>(writer, vals, null);
+ }
+
+ /// <summary>
+ /// Write enumerable.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ /// <param name="vals">Values.</param>
+ /// <param name="selector">A transform function to apply to each element.</param>
+ /// <returns>The same writer for chaining.</returns>
+ public static void WriteEnumerable<T1, T2>(this BinaryWriter writer, IEnumerable<T1> vals,
+ Func<T1, T2> selector)
+ {
+ var col = vals as ICollection<T1>;
+
+ if (col != null)
+ {
+ WriteCollection(writer, col, selector);
+ return;
+ }
+
+ var stream = writer.Stream;
+
+ var pos = stream.Position;
+
+ stream.Seek(4, SeekOrigin.Current);
+
+ var size = 0;
+
+ if (selector == null)
+ {
+ foreach (var val in vals)
+ {
+ writer.Write(val);
+
+ size++;
+ }
+ }
+ else
+ {
+ foreach (var val in vals)
+ {
+ writer.Write(selector(val));
+
+ size++;
+ }
+ }
+
+ stream.WriteInt(pos, size);
+ }
+
+ /// <summary>
+ /// Write dictionary.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ /// <param name="vals">Values.</param>
+ public static void WriteDictionary<T1, T2>(this BinaryWriter writer, IEnumerable<KeyValuePair<T1, T2>> vals)
+ {
+ var pos = writer.Stream.Position;
+ writer.WriteInt(0); // Reserve count.
+
+ int cnt = 0;
+
+ foreach (var pair in vals)
+ {
+ writer.Write(pair.Key);
+ writer.Write(pair.Value);
+
+ cnt++;
+ }
+
+ writer.Stream.WriteInt(pos, cnt);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
index d335804..08c31a6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
@@ -21,13 +21,11 @@ namespace Apache.Ignite.Core.Impl.Cache.Affinity
using System.Collections.Generic;
using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Cluster;
- using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Affinity function that delegates to Java.
/// </summary>
- internal class PlatformAffinityFunction : PlatformTarget, IAffinityFunction
+ internal class PlatformAffinityFunction : PlatformTargetAdapter, IAffinityFunction
{
/** Opcodes. */
private enum Op
@@ -41,8 +39,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Affinity
/// Initializes a new instance of the <see cref="PlatformAffinityFunction"/> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
- public PlatformAffinityFunction(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+ public PlatformAffinityFunction(IPlatformTargetInternal target) : base(target)
{
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs
index f09a119..a2bba29 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs
@@ -19,19 +19,16 @@ namespace Apache.Ignite.Core.Impl.Cache
{
using System;
using System.Collections.Generic;
- using System.Diagnostics;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Unmanaged;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Cache affinity implementation.
/// </summary>
- internal class CacheAffinityImpl : PlatformTarget, ICacheAffinity
+ internal class CacheAffinityImpl : PlatformTargetAdapter, ICacheAffinity
{
/** */
private const int OpAffinityKey = 1;
@@ -88,17 +85,12 @@ namespace Apache.Ignite.Core.Impl.Cache
/// Initializes a new instance of the <see cref="CacheAffinityImpl" /> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="keepBinary">Keep binary flag.</param>
- /// <param name="ignite">Grid.</param>
- public CacheAffinityImpl(IUnmanagedTarget target, Marshaller marsh, bool keepBinary,
- Ignite ignite) : base(target, marsh)
+ public CacheAffinityImpl(IPlatformTargetInternal target, bool keepBinary) : base(target)
{
_keepBinary = keepBinary;
- Debug.Assert(ignite != null);
-
- _ignite = ignite;
+ _ignite = target.Marshaller.Ignite;
}
/** <inheritDoc /> */
@@ -182,7 +174,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutInOp(OpMapKeysToNodes, w => WriteEnumerable(w, keys),
+ return DoOutInOp(OpMapKeysToNodes, w => w.WriteEnumerable(keys),
reader => ReadDictionary(reader, ReadNode, r => (IList<TK>) r.ReadCollectionAsList<TK>()));
}
@@ -214,7 +206,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(parts, "parts");
return DoOutInOp(OpMapPartitionsToNodes,
- w => WriteEnumerable(w, parts),
+ w => w.WriteEnumerable(parts),
reader => ReadDictionary(reader, r => r.ReadInt(), ReadNode));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs
index e2b8350..2860bb6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs
@@ -21,14 +21,12 @@ namespace Apache.Ignite.Core.Impl.Cache
using System.Collections;
using System.Collections.Generic;
using Apache.Ignite.Core.Cache;
- using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Real cache enumerator communicating with Java.
/// </summary>
- internal class CacheEnumerator<TK, TV> : PlatformDisposableTarget, IEnumerator<ICacheEntry<TK, TV>>
+ internal class CacheEnumerator<TK, TV> : PlatformDisposableTargetAdapter, IEnumerator<ICacheEntry<TK, TV>>
{
/** Operation: next value. */
private const int OpNext = 1;
@@ -43,10 +41,8 @@ namespace Apache.Ignite.Core.Impl.Cache
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="keepBinary">Keep binary flag.</param>
- public CacheEnumerator(IUnmanagedTarget target, Marshaller marsh, bool keepBinary) :
- base(target, marsh)
+ public CacheEnumerator(IPlatformTargetInternal target, bool keepBinary) : base(target)
{
_keepBinary = keepBinary;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index e6b2408..5789c8f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -20,7 +20,6 @@ namespace Apache.Ignite.Core.Impl.Cache
using System;
using System.Collections;
using System.Collections.Generic;
- using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
@@ -38,13 +37,12 @@ namespace Apache.Ignite.Core.Impl.Cache
using Apache.Ignite.Core.Impl.Cluster;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Transactions;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Native cache wrapper.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
- internal class CacheImpl<TK, TV> : PlatformTarget, ICache<TK, TV>, ICacheInternal, ICacheLockInternal
+ internal class CacheImpl<TK, TV> : PlatformTargetAdapter, ICache<TK, TV>, ICacheInternal, ICacheLockInternal
{
/** Ignite instance. */
private readonly Ignite _ignite;
@@ -64,31 +62,32 @@ namespace Apache.Ignite.Core.Impl.Cache
/** Transaction manager. */
private readonly CacheTransactionManager _txManager;
+ /** Pre-allocated delegate. */
+ private readonly Func<IBinaryStream, Exception> _readException;
+
/// <summary>
/// Constructor.
/// </summary>
- /// <param name="grid">Grid.</param>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="flagSkipStore">Skip store flag.</param>
/// <param name="flagKeepBinary">Keep binary flag.</param>
/// <param name="flagNoRetries">No-retries mode flag.</param>
/// <param name="flagPartitionRecover">Partition recover mode flag.</param>
- public CacheImpl(Ignite grid, IUnmanagedTarget target, Marshaller marsh,
+ public CacheImpl(IPlatformTargetInternal target,
bool flagSkipStore, bool flagKeepBinary, bool flagNoRetries, bool flagPartitionRecover)
- : base(target, marsh)
+ : base(target)
{
- Debug.Assert(grid != null);
-
- _ignite = grid;
+ _ignite = target.Marshaller.Ignite;
_flagSkipStore = flagSkipStore;
_flagKeepBinary = flagKeepBinary;
_flagNoRetries = flagNoRetries;
_flagPartitionRecover = flagPartitionRecover;
_txManager = GetConfiguration().AtomicityMode == CacheAtomicityMode.Transactional
- ? new CacheTransactionManager(grid.GetTransactions())
+ ? new CacheTransactionManager(_ignite.GetTransactions())
: null;
+
+ _readException = stream => ReadException(Marshaller.StartUnmarshal(stream));
}
/** <inheritDoc /> */
@@ -172,7 +171,7 @@ namespace Apache.Ignite.Core.Impl.Cache
if (_flagSkipStore)
return this;
- return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithSkipStore), Marshaller,
+ return new CacheImpl<TK, TV>(DoOutOpObject((int) CacheOp.WithSkipStore),
true, _flagKeepBinary, true, _flagPartitionRecover);
}
@@ -196,7 +195,7 @@ namespace Apache.Ignite.Core.Impl.Cache
return result;
}
- return new CacheImpl<TK1, TV1>(_ignite, DoOutOpObject((int) CacheOp.WithKeepBinary), Marshaller,
+ return new CacheImpl<TK1, TV1>(DoOutOpObject((int) CacheOp.WithKeepBinary),
_flagSkipStore, true, _flagNoRetries, _flagPartitionRecover);
}
@@ -207,7 +206,7 @@ namespace Apache.Ignite.Core.Impl.Cache
var cache0 = DoOutOpObject((int)CacheOp.WithExpiryPolicy, w => ExpiryPolicySerializer.WritePolicy(w, plc));
- return new CacheImpl<TK, TV>(_ignite, cache0, Marshaller, _flagSkipStore, _flagKeepBinary,
+ return new CacheImpl<TK, TV>(cache0, _flagSkipStore, _flagKeepBinary,
_flagNoRetries, _flagPartitionRecover);
}
@@ -220,7 +219,7 @@ namespace Apache.Ignite.Core.Impl.Cache
/** <inheritDoc /> */
public void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args)
{
- DoOutInOpX((int) CacheOp.LoadCache, writer => WriteLoadCacheData(writer, p, args), ReadException);
+ DoOutInOpX((int) CacheOp.LoadCache, writer => WriteLoadCacheData(writer, p, args), _readException);
}
/** <inheritDoc /> */
@@ -232,7 +231,7 @@ namespace Apache.Ignite.Core.Impl.Cache
/** <inheritDoc /> */
public void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args)
{
- DoOutInOpX((int) CacheOp.LocLoadCache, writer => WriteLoadCacheData(writer, p, args), ReadException);
+ DoOutInOpX((int) CacheOp.LocLoadCache, writer => WriteLoadCacheData(writer, p, args), _readException);
}
/** <inheritDoc /> */
@@ -281,7 +280,7 @@ namespace Apache.Ignite.Core.Impl.Cache
return DoOutOpAsync(CacheOp.LoadAll, writer =>
{
writer.WriteBoolean(replaceExistingValues);
- WriteEnumerable(writer, keys);
+ writer.WriteEnumerable(keys);
});
}
@@ -306,7 +305,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutOp(CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys));
+ return DoOutOp(CacheOp.ContainsKeys, writer => writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -314,7 +313,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutOpAsync<bool>(CacheOp.ContainsKeysAsync, writer => WriteEnumerable(writer, keys));
+ return DoOutOpAsync<bool>(CacheOp.ContainsKeysAsync, writer => writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -342,7 +341,7 @@ namespace Apache.Ignite.Core.Impl.Cache
w.WriteInt(EncodePeekModes(modes));
},
(s, r) => r == True ? new CacheResult<TV>(Unmarshal<TV>(s)) : new CacheResult<TV>(),
- ReadException);
+ _readException);
value = res.Success ? res.Value : default(TV);
@@ -375,7 +374,7 @@ namespace Apache.Ignite.Core.Impl.Cache
throw GetKeyNotFoundException();
return Unmarshal<TV>(stream);
- }, ReadException);
+ }, _readException);
}
/** <inheritDoc /> */
@@ -418,9 +417,9 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(keys, "keys");
return DoOutInOpX((int) CacheOp.GetAll,
- writer => WriteEnumerable(writer, keys),
+ writer => writer.WriteEnumerable(keys),
(s, r) => r == True ? ReadGetAllDictionary(Marshaller.StartUnmarshal(s, _flagKeepBinary)) : null,
- ReadException);
+ _readException);
}
/** <inheritDoc /> */
@@ -428,7 +427,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutOpAsync(CacheOp.GetAllAsync, w => WriteEnumerable(w, keys), r => ReadGetAllDictionary(r));
+ return DoOutOpAsync(CacheOp.GetAllAsync, w => w.WriteEnumerable(keys), r => ReadGetAllDictionary(r));
}
/** <inheritdoc /> */
@@ -631,7 +630,7 @@ namespace Apache.Ignite.Core.Impl.Cache
StartTx();
- DoOutOp(CacheOp.PutAll, writer => WriteDictionary(writer, vals));
+ DoOutOp(CacheOp.PutAll, writer => writer.WriteDictionary(vals));
}
/** <inheritDoc /> */
@@ -641,7 +640,7 @@ namespace Apache.Ignite.Core.Impl.Cache
StartTx();
- return DoOutOpAsync(CacheOp.PutAllAsync, writer => WriteDictionary(writer, vals));
+ return DoOutOpAsync(CacheOp.PutAllAsync, writer => writer.WriteDictionary(vals));
}
/** <inheritdoc /> */
@@ -649,7 +648,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp(CacheOp.LocEvict, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.LocEvict, writer => writer.WriteEnumerable(keys));
}
/** <inheritdoc /> */
@@ -685,7 +684,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp(CacheOp.ClearAll, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.ClearAll, writer => writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -693,7 +692,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutOpAsync(CacheOp.ClearAllAsync, writer => WriteEnumerable(writer, keys));
+ return DoOutOpAsync(CacheOp.ClearAllAsync, writer => writer.WriteEnumerable(keys));
}
/** <inheritdoc /> */
@@ -709,7 +708,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp(CacheOp.LocalClearAll, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.LocalClearAll, writer => writer.WriteEnumerable(keys));
}
/** <inheritdoc /> */
@@ -761,7 +760,7 @@ namespace Apache.Ignite.Core.Impl.Cache
StartTx();
- DoOutOp(CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.RemoveAll, writer => writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -771,7 +770,7 @@ namespace Apache.Ignite.Core.Impl.Cache
StartTx();
- return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => WriteEnumerable(writer, keys));
+ return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -843,7 +842,7 @@ namespace Apache.Ignite.Core.Impl.Cache
writer.Write(holder);
},
(input, res) => res == True ? Unmarshal<TRes>(input) : default(TRes),
- ReadException);
+ _readException);
}
/** <inheritDoc /> */
@@ -891,10 +890,12 @@ namespace Apache.Ignite.Core.Impl.Cache
return DoOutInOpX((int) CacheOp.InvokeAll,
writer =>
{
- WriteEnumerable(writer, keys);
+ writer.WriteEnumerable(keys);
writer.Write(holder);
},
- (input, res) => res == True ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary)): null, ReadException);
+ (input, res) => res == True
+ ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary))
+ : null, _readException);
}
/** <inheritDoc /> */
@@ -912,7 +913,7 @@ namespace Apache.Ignite.Core.Impl.Cache
return DoOutOpAsync(CacheOp.InvokeAllAsync,
writer =>
{
- WriteEnumerable(writer, keys);
+ writer.WriteEnumerable(keys);
writer.Write(holder);
},
input => ReadInvokeAllResults<TRes>(input));
@@ -931,7 +932,7 @@ namespace Apache.Ignite.Core.Impl.Cache
},
(input, res) => res == True
? readFunc(Marshaller.StartUnmarshal(input))
- : default(T), ReadException);
+ : default(T), _readException);
}
/** <inheritdoc /> */
@@ -940,7 +941,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
return DoOutInOpX((int) CacheOp.Lock, w => w.Write(key),
- (stream, res) => new CacheLock(stream.ReadInt(), this), ReadException);
+ (stream, res) => new CacheLock(stream.ReadInt(), this), _readException);
}
/** <inheritdoc /> */
@@ -948,8 +949,8 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutInOpX((int) CacheOp.LockAll, w => WriteEnumerable(w, keys),
- (stream, res) => new CacheLock(stream.ReadInt(), this), ReadException);
+ return DoOutInOpX((int) CacheOp.LockAll, w => w.WriteEnumerable(keys),
+ (stream, res) => new CacheLock(stream.ReadInt(), this), _readException);
}
/** <inheritdoc /> */
@@ -1011,7 +1012,7 @@ namespace Apache.Ignite.Core.Impl.Cache
if (_flagNoRetries)
return this;
- return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithNoRetries), Marshaller,
+ return new CacheImpl<TK, TV>(DoOutOpObject((int) CacheOp.WithNoRetries),
_flagSkipStore, _flagKeepBinary, true, _flagPartitionRecover);
}
@@ -1021,7 +1022,7 @@ namespace Apache.Ignite.Core.Impl.Cache
if (_flagPartitionRecover)
return this;
- return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithPartitionRecover), Marshaller,
+ return new CacheImpl<TK, TV>(DoOutOpObject((int) CacheOp.WithPartitionRecover),
_flagSkipStore, _flagKeepBinary, _flagNoRetries, true);
}
@@ -1092,7 +1093,7 @@ namespace Apache.Ignite.Core.Impl.Cache
writer.WriteString(qry.Schema); // Schema
});
- return new FieldsQueryCursor<T>(cursor, Marshaller, _flagKeepBinary, readerFunc);
+ return new FieldsQueryCursor<T>(cursor, _flagKeepBinary, readerFunc);
}
/** <inheritDoc /> */
@@ -1102,7 +1103,7 @@ namespace Apache.Ignite.Core.Impl.Cache
var cursor = DoOutOpObject((int) qry.OpId, writer => qry.Write(writer, IsKeepBinary));
- return new QueryCursor<TK, TV>(cursor, Marshaller, _flagKeepBinary);
+ return new QueryCursor<TK, TV>(cursor, _flagKeepBinary);
}
/** <inheritdoc /> */
@@ -1168,10 +1169,10 @@ namespace Apache.Ignite.Core.Impl.Cache
{
var target = DoOutOpObject((int) CacheOp.LocIterator, (IBinaryStream s) => s.WriteInt(peekModes));
- return new CacheEnumerator<TK, TV>(target, Marshaller, _flagKeepBinary);
+ return new CacheEnumerator<TK, TV>(target, _flagKeepBinary);
}
- return new CacheEnumerator<TK, TV>(DoOutOpObject((int) CacheOp.Iterator), Marshaller, _flagKeepBinary);
+ return new CacheEnumerator<TK, TV>(DoOutOpObject((int) CacheOp.Iterator), _flagKeepBinary);
}
#endregion
@@ -1228,14 +1229,6 @@ namespace Apache.Ignite.Core.Impl.Cache
}
/// <summary>
- /// Reads the exception.
- /// </summary>
- private Exception ReadException(IBinaryStream stream)
- {
- return ReadException(Marshaller.StartUnmarshal(stream));
- }
-
- /// <summary>
/// Reads the exception, either in binary wrapper form, or as a pair of strings.
/// </summary>
/// <param name="reader">The stream.</param>
@@ -1315,7 +1308,7 @@ namespace Apache.Ignite.Core.Impl.Cache
return DoOutInOpX((int) op, w =>
{
w.Write(x);
- }, ReadException);
+ }, _readException);
}
/// <summary>
@@ -1327,7 +1320,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
w.Write(x);
w.Write(y);
- }, ReadException);
+ }, _readException);
}
/// <summary>
@@ -1340,7 +1333,7 @@ namespace Apache.Ignite.Core.Impl.Cache
w.Write(x);
w.Write(y);
w.Write(z);
- }, ReadException);
+ }, _readException);
}
/// <summary>
@@ -1348,7 +1341,7 @@ namespace Apache.Ignite.Core.Impl.Cache
/// </summary>
private bool DoOutOp(CacheOp op, Action<BinaryWriter> write)
{
- return DoOutInOpX((int) op, write, ReadException);
+ return DoOutInOpX((int) op, write, _readException);
}
/// <summary>
@@ -1359,7 +1352,7 @@ namespace Apache.Ignite.Core.Impl.Cache
return DoOutInOpX((int)cacheOp,
w => w.Write(x),
(stream, res) => res == True ? new CacheResult<TV>(Unmarshal<TV>(stream)) : new CacheResult<TV>(),
- ReadException);
+ _readException);
}
/// <summary>
@@ -1374,7 +1367,7 @@ namespace Apache.Ignite.Core.Impl.Cache
w.Write(y);
},
(stream, res) => res == True ? new CacheResult<TV>(Unmarshal<TV>(stream)) : new CacheResult<TV>(),
- ReadException);
+ _readException);
}
/** <inheritdoc /> */
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs
index 95c6a36..8e4985e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs
@@ -23,13 +23,11 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
- using Apache.Ignite.Core.Impl.Unmanaged;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Abstract query cursor implementation.
/// </summary>
- internal abstract class AbstractQueryCursor<T> : PlatformDisposableTarget, IQueryCursor<T>, IEnumerator<T>
+ internal abstract class AbstractQueryCursor<T> : PlatformDisposableTargetAdapter, IQueryCursor<T>, IEnumerator<T>
{
/** */
private const int OpGetAll = 1;
@@ -65,10 +63,8 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="keepBinary">Keep binary flag.</param>
- protected AbstractQueryCursor(IUnmanagedTarget target, Marshaller marsh, bool keepBinary) :
- base(target, marsh)
+ protected AbstractQueryCursor(IPlatformTargetInternal target, bool keepBinary) : base(target)
{
_keepBinary = keepBinary;
}
@@ -88,7 +84,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
throw new InvalidOperationException("Failed to get all entries because GetAll() " +
"method has already been called.");
- var res = DoInOp<IList<T>>(OpGetAll, ConvertGetAll);
+ var res = DoInOp(OpGetAll, ConvertGetAll);
_getAllCalled = true;
@@ -216,7 +212,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
/// </summary>
private void RequestBatch()
{
- _batch = DoInOp<T[]>(OpGetBatch, ConvertGetBatch);
+ _batch = DoInOp(OpGetBatch, ConvertGetBatch);
_batchPos = 0;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs
index 6139d8b..ff5c434 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs
@@ -28,8 +28,6 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Resource;
- using Apache.Ignite.Core.Impl.Unmanaged;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
using CQU = ContinuousQueryUtils;
/// <summary>
@@ -67,7 +65,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
private readonly long _hnd;
/** Native query. */
- private readonly IUnmanagedTarget _nativeQry;
+ private readonly IPlatformTargetInternal _nativeQry;
/** Initial query cursor. */
private volatile IQueryCursor<ICacheEntry<TK, TV>> _initialQueryCursor;
@@ -84,7 +82,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
/// <param name="createTargetCb">The initialization callback.</param>
/// <param name="initialQry">The initial query.</param>
public ContinuousQueryHandleImpl(ContinuousQuery<TK, TV> qry, Marshaller marsh, bool keepBinary,
- Func<Action<BinaryWriter>, IUnmanagedTarget> createTargetCb, QueryBase initialQry)
+ Func<Action<BinaryWriter>, IPlatformTargetInternal> createTargetCb, QueryBase initialQry)
{
_marsh = marsh;
_keepBinary = keepBinary;
@@ -138,10 +136,10 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
});
// 4. Initial query.
- var nativeInitialQryCur = UU.TargetOutObject(_nativeQry, 0);
+ var nativeInitialQryCur = _nativeQry.OutObjectInternal(0);
_initialQueryCursor = nativeInitialQryCur == null
? null
- : new QueryCursor<TK, TV>(nativeInitialQryCur, _marsh, _keepBinary);
+ : new QueryCursor<TK, TV>(nativeInitialQryCur, _keepBinary);
}
catch (Exception)
{
@@ -225,7 +223,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
try
{
- UU.TargetInLongOutLong(_nativeQry, 0, 0);
+ _nativeQry.InLongOutLong(0, 0);
}
finally
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
index d928418..9d021dc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
@@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
using System.Diagnostics.CodeAnalysis;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Cursor for entry-based queries.
@@ -36,12 +35,11 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaler.</param>
/// <param name="keepBinary">Keep poratble flag.</param>
/// <param name="readerFunc">The reader function.</param>
- public FieldsQueryCursor(IUnmanagedTarget target, Marshaller marsh, bool keepBinary,
+ public FieldsQueryCursor(IPlatformTargetInternal target, bool keepBinary,
Func<IBinaryRawReader, int, T> readerFunc)
- : base(target, marsh, keepBinary)
+ : base(target, keepBinary)
{
Debug.Assert(readerFunc != null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs
index 5a46915..bc3cdb6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs
@@ -20,7 +20,6 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
using System.Diagnostics.CodeAnalysis;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Cursor for entry-based queries.
@@ -31,10 +30,8 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaler.</param>
/// <param name="keepBinary">Keep poratble flag.</param>
- public QueryCursor(IUnmanagedTarget target, Marshaller marsh,
- bool keepBinary) : base(target, marsh, keepBinary)
+ public QueryCursor(IPlatformTargetInternal target, bool keepBinary) : base(target, keepBinary)
{
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
index 30afe57..678fb03 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
@@ -37,16 +37,14 @@ namespace Apache.Ignite.Core.Impl.Cluster
using Apache.Ignite.Core.Impl.Messaging;
using Apache.Ignite.Core.Impl.PersistentStore;
using Apache.Ignite.Core.Impl.Services;
- using Apache.Ignite.Core.Impl.Unmanaged;
using Apache.Ignite.Core.Messaging;
using Apache.Ignite.Core.PersistentStore;
using Apache.Ignite.Core.Services;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Ignite projection implementation.
/// </summary>
- internal class ClusterGroupImpl : PlatformTarget, IClusterGroup
+ internal class ClusterGroupImpl : PlatformTargetAdapter, IClusterGroup
{
/** Attribute: platform. */
private const string AttrPlatform = "org.apache.ignite.platform";
@@ -175,13 +173,12 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="ignite">Grid.</param>
/// <param name="pred">Predicate.</param>
[SuppressMessage("Microsoft.Performance", "CA1805:DoNotInitializeUnnecessarily")]
- public ClusterGroupImpl(IUnmanagedTarget target, Ignite ignite, Func<IClusterNode, bool> pred)
- : base(target, ignite.Marshaller)
+ public ClusterGroupImpl(IPlatformTargetInternal target, Func<IClusterNode, bool> pred)
+ : base(target)
{
- _ignite = ignite;
+ _ignite = target.Marshaller.Ignite;
_pred = pred;
_comp = new Lazy<ICompute>(() => CreateCompute());
@@ -207,7 +204,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </summary>
private ICompute CreateCompute()
{
- return new Compute(new ComputeImpl(DoOutOpObject(OpGetCompute), Marshaller, this, false));
+ return new Compute(new ComputeImpl(DoOutOpObject(OpGetCompute), this, false));
}
/** <inheritDoc /> */
@@ -252,10 +249,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
{
Debug.Assert(items != null);
- IUnmanagedTarget prj = DoOutOpObject(OpForNodeIds, writer =>
- {
- WriteEnumerable(writer, items, func);
- });
+ var prj = DoOutOpObject(OpForNodeIds, writer => writer.WriteEnumerable(items, func));
return GetClusterGroup(prj);
}
@@ -265,7 +259,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
{
var newPred = _pred == null ? p : node => _pred(node) && p(node);
- return new ClusterGroupImpl(Target, _ignite, newPred);
+ return new ClusterGroupImpl(Target, newPred);
}
/** <inheritDoc /> */
@@ -278,7 +272,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
writer.WriteString(name);
writer.WriteString(val);
};
- IUnmanagedTarget prj = DoOutOpObject(OpForAttribute, action);
+ var prj = DoOutOpObject(OpForAttribute, action);
return GetClusterGroup(prj);
}
@@ -293,7 +287,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </returns>
private IClusterGroup ForCacheNodes(string name, int op)
{
- IUnmanagedTarget prj = DoOutOpObject(op, writer =>
+ var prj = DoOutOpObject(op, writer =>
{
writer.WriteString(name);
});
@@ -336,7 +330,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
{
IgniteArgumentCheck.NotNull(node, "node");
- IUnmanagedTarget prj = DoOutOpObject(OpForHost, writer =>
+ var prj = DoOutOpObject(OpForHost, writer =>
{
writer.WriteGuid(node.Id);
});
@@ -404,15 +398,14 @@ namespace Apache.Ignite.Core.Impl.Cluster
return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
});
}
- return DoOutInOp(OpMetricsFiltered, writer =>
- {
- WriteEnumerable(writer, GetNodes().Select(node => node.Id));
- }, stream =>
- {
- IBinaryRawReader reader = Marshaller.StartUnmarshal(stream, false);
+ return DoOutInOp(OpMetricsFiltered,
+ writer => writer.WriteEnumerable(GetNodes().Select(node => node.Id)),
+ stream =>
+ {
+ IBinaryRawReader reader = Marshaller.StartUnmarshal(stream, false);
- return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
- });
+ return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
+ });
}
/** <inheritDoc /> */
@@ -426,7 +419,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </summary>
private IMessaging CreateMessaging()
{
- return new Messaging(DoOutOpObject(OpGetMessaging), Marshaller, this);
+ return new Messaging(DoOutOpObject(OpGetMessaging), this);
}
/** <inheritDoc /> */
@@ -440,7 +433,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </summary>
private IEvents CreateEvents()
{
- return new Events(DoOutOpObject(OpGetEvents), Marshaller, this);
+ return new Events(DoOutOpObject(OpGetEvents), this);
}
/** <inheritDoc /> */
@@ -454,7 +447,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </summary>
private IServices CreateServices()
{
- return new Services(DoOutOpObject(OpGetServices), Marshaller, this, false, false);
+ return new Services(DoOutOpObject(OpGetServices), this, false, false);
}
/// <summary>
@@ -665,9 +658,9 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </summary>
/// <param name="prj">Native projection.</param>
/// <returns>New cluster group.</returns>
- private IClusterGroup GetClusterGroup(IUnmanagedTarget prj)
+ private IClusterGroup GetClusterGroup(IPlatformTargetInternal prj)
{
- return new ClusterGroupImpl(prj, _ignite, _pred);
+ return new ClusterGroupImpl(prj, _pred);
}
/// <summary>
@@ -678,29 +671,30 @@ namespace Apache.Ignite.Core.Impl.Cluster
{
long oldTopVer = Interlocked.Read(ref _topVer);
- List<IClusterNode> newNodes = null;
-
- DoOutInOp(OpNodes, writer =>
+ var res = Target.InStreamOutStream(OpNodes, writer =>
{
writer.WriteLong(oldTopVer);
- }, input =>
+ }, reader =>
{
- BinaryReader reader = Marshaller.StartUnmarshal(input);
-
if (reader.ReadBoolean())
{
// Topology has been updated.
long newTopVer = reader.ReadLong();
+ var newNodes = IgniteUtils.ReadNodes((BinaryReader) reader, _pred);
- newNodes = IgniteUtils.ReadNodes(reader, _pred);
-
- UpdateTopology(newTopVer, newNodes);
+ return Tuple.Create(newTopVer, newNodes);
}
+
+ return null;
});
- if (newNodes != null)
- return newNodes;
-
+ if (res != null)
+ {
+ UpdateTopology(res.Item1, res.Item2);
+
+ return res.Item2;
+ }
+
// No topology changes.
Debug.Assert(_nodes != null, "At least one topology update should have occurred.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
index 4cd0678..cc12caa 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
@@ -28,7 +28,6 @@ namespace Apache.Ignite.Core.Impl.Common
using Apache.Ignite.Core.Impl.Cache;
using Apache.Ignite.Core.Impl.Cache.Query.Continuous;
using Apache.Ignite.Core.Impl.Datastream;
- using Apache.Ignite.Core.Impl.Unmanaged;
using Apache.Ignite.Core.Messaging;
/// <summary>
@@ -66,7 +65,7 @@ namespace Apache.Ignite.Core.Impl.Common
private readonly Action<object> _computeJobCancel;
/** */
- private readonly Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool> _streamReceiver;
+ private readonly Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> _streamReceiver;
/** */
private readonly Func<object, object> _streamTransformerCtor;
@@ -163,7 +162,7 @@ namespace Apache.Ignite.Core.Impl.Common
/// </summary>
/// <param name="type">Type.</param>
/// <returns>Precompiled invocator delegate.</returns>
- public static Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool> GetStreamReceiver(Type type)
+ public static Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> GetStreamReceiver(Type type)
{
return Get(type)._streamReceiver;
}
@@ -313,12 +312,12 @@ namespace Apache.Ignite.Core.Impl.Common
.MakeGenericMethod(iface.GetGenericArguments());
_streamReceiver = DelegateConverter
- .CompileFunc<Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool>>(
+ .CompileFunc<Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool>>(
typeof (StreamReceiverHolder),
method,
new[]
{
- iface, typeof (Ignite), typeof (IUnmanagedTarget), typeof (IBinaryStream),
+ iface, typeof (Ignite), typeof (IPlatformTargetInternal), typeof (IBinaryStream),
typeof (bool)
},
new[] {true, false, false, false, false, false});
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
index 6da98ab..8566d0b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
@@ -17,13 +17,10 @@
namespace Apache.Ignite.Core.Impl.Common
{
- using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
-
/// <summary>
/// Platform listenable.
/// </summary>
- internal class Listenable : PlatformTarget
+ internal class Listenable : PlatformTargetAdapter
{
/** */
private const int OpCancel = 1;
@@ -32,8 +29,7 @@ namespace Apache.Ignite.Core.Impl.Common
/// Initializes a new instance of the <see cref="Listenable"/> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
- public Listenable(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+ public Listenable(IPlatformTargetInternal target) : base(target)
{
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
index cace7b2..06f9ad4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -34,13 +34,12 @@ namespace Apache.Ignite.Core.Impl.Compute
using Apache.Ignite.Core.Impl.Cluster;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Compute.Closure;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Compute implementation.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
- internal class ComputeImpl : PlatformTarget
+ internal class ComputeImpl : PlatformTargetAdapter
{
/** */
private const int OpAffinity = 1;
@@ -76,11 +75,10 @@ namespace Apache.Ignite.Core.Impl.Compute
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="prj">Projection.</param>
/// <param name="keepBinary">Binary flag.</param>
- public ComputeImpl(IUnmanagedTarget target, Marshaller marsh, ClusterGroupImpl prj, bool keepBinary)
- : base(target, marsh)
+ public ComputeImpl(IPlatformTargetInternal target, ClusterGroupImpl prj, bool keepBinary)
+ : base(target)
{
_prj = prj;
@@ -194,7 +192,7 @@ namespace Apache.Ignite.Core.Impl.Compute
var future = holder.Future;
- future.SetTarget(new Listenable(futTarget, Marshaller));
+ future.SetTarget(new Listenable(futTarget));
return future;
}
@@ -551,7 +549,7 @@ namespace Apache.Ignite.Core.Impl.Compute
writeAction(writer);
});
- holder.Future.SetTarget(new Listenable(futTarget, Marshaller));
+ holder.Future.SetTarget(new Listenable(futTarget));
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs
index 0c4bf84..f797408 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs
@@ -19,16 +19,12 @@ namespace Apache.Ignite.Core.Impl.DataStructures
{
using System.Diagnostics;
using Apache.Ignite.Core.DataStructures;
- using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
- using Apache.Ignite.Core.Impl.Unmanaged;
-
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Atomic long wrapper.
/// </summary>
- internal sealed class AtomicLong : PlatformTarget, IAtomicLong
+ internal sealed class AtomicLong : PlatformTargetAdapter, IAtomicLong
{
/** */
private readonly string _name;
@@ -50,9 +46,8 @@ namespace Apache.Ignite.Core.Impl.DataStructures
/// Initializes a new instance of the <see cref="AtomicLong"/> class.
/// </summary>
/// <param name="target">The target.</param>
- /// <param name="marsh">The marshaller.</param>
/// <param name="name">The name.</param>
- public AtomicLong(IUnmanagedTarget target, Marshaller marsh, string name) : base(target, marsh)
+ public AtomicLong(IPlatformTargetInternal target, string name) : base(target)
{
Debug.Assert(!string.IsNullOrEmpty(name));
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs
index 4ca4b24..76515a2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs
@@ -19,13 +19,11 @@ namespace Apache.Ignite.Core.Impl.DataStructures
{
using System.Diagnostics;
using Apache.Ignite.Core.DataStructures;
- using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Atomic reference.
/// </summary>
- internal class AtomicReference<T> : PlatformTarget, IAtomicReference<T>
+ internal class AtomicReference<T> : PlatformTargetAdapter, IAtomicReference<T>
{
/** Opcodes. */
private enum Op
@@ -41,8 +39,8 @@ namespace Apache.Ignite.Core.Impl.DataStructures
private readonly string _name;
/** <inheritDoc /> */
- public AtomicReference(IUnmanagedTarget target, Marshaller marsh, string name)
- : base(target, marsh)
+ public AtomicReference(IPlatformTargetInternal target, string name)
+ : base(target)
{
Debug.Assert(!string.IsNullOrEmpty(name));
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs
index f7fc6b7..dd079ef 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs
@@ -19,13 +19,11 @@ namespace Apache.Ignite.Core.Impl.DataStructures
{
using System.Diagnostics;
using Apache.Ignite.Core.DataStructures;
- using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Atomic long wrapper.
/// </summary>
- internal sealed class AtomicSequence: PlatformTarget, IAtomicSequence
+ internal sealed class AtomicSequence: PlatformTargetAdapter, IAtomicSequence
{
/** */
private readonly string _name;
@@ -46,10 +44,9 @@ namespace Apache.Ignite.Core.Impl.DataStructures
/// Initializes a new instance of the <see cref="Apache.Ignite.Core.Impl.DataStructures.AtomicLong"/> class.
/// </summary>
/// <param name="target">The target.</param>
- /// <param name="marsh">The marshaller.</param>
/// <param name="name">The name.</param>
- public AtomicSequence(IUnmanagedTarget target, Marshaller marsh, string name)
- : base(target, marsh)
+ public AtomicSequence(IPlatformTargetInternal target, string name)
+ : base(target)
{
Debug.Assert(!string.IsNullOrEmpty(name));
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
index 96e58d4..fb2df01 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
@@ -26,8 +26,6 @@ namespace Apache.Ignite.Core.Impl.Datastream
using Apache.Ignite.Core.Datastream;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Unmanaged;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Data streamer internal interface to get rid of generics.
@@ -45,7 +43,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
/// <summary>
/// Data streamer implementation.
/// </summary>
- internal class DataStreamerImpl<TK, TV> : PlatformDisposableTarget, IDataStreamer, IDataStreamer<TK, TV>
+ internal class DataStreamerImpl<TK, TV> : PlatformDisposableTargetAdapter, IDataStreamer, IDataStreamer<TK, TV>
{
#pragma warning disable 0420
@@ -141,8 +139,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
/// <param name="marsh">Marshaller.</param>
/// <param name="cacheName">Cache name.</param>
/// <param name="keepBinary">Binary flag.</param>
- public DataStreamerImpl(IUnmanagedTarget target, Marshaller marsh, string cacheName, bool keepBinary)
- : base(target, marsh)
+ public DataStreamerImpl(IPlatformTargetInternal target, Marshaller marsh, string cacheName, bool keepBinary)
+ : base(target)
{
_cacheName = cacheName;
_keepBinary = keepBinary;