You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/28 07:32:52 UTC
[1/8] ignite git commit: Added docker file for 2.1 version.
Repository: ignite
Updated Branches:
refs/heads/ignite-5578 cf8035199 -> 68e4e4510
Added docker file for 2.1 version.
Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7a6af695
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7a6af695
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7a6af695
Branch: refs/heads/ignite-5578
Commit: 7a6af69563bfdb9dd7043c94c82a60b8e260d595
Parents: e9a0d69
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Jul 27 18:26:29 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Jul 27 18:26:29 2017 +0300
----------------------------------------------------------------------
modules/docker/2.1.0/Dockerfile | 46 ++++++++++++++++++++++++++++++++
modules/docker/2.1.0/run.sh | 51 ++++++++++++++++++++++++++++++++++++
modules/docker/Dockerfile | 2 +-
3 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a6af695/modules/docker/2.1.0/Dockerfile
----------------------------------------------------------------------
diff --git a/modules/docker/2.1.0/Dockerfile b/modules/docker/2.1.0/Dockerfile
new file mode 100644
index 0000000..6a0eecd
--- /dev/null
+++ b/modules/docker/2.1.0/Dockerfile
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Start from a Java image.
+FROM java:8
+
+# Ignite version
+ENV IGNITE_VERSION 2.1.0
+
+# Ignite home
+ENV IGNITE_HOME /opt/ignite/apache-ignite-fabric-${IGNITE_VERSION}-bin
+
+# Do not rely on anything provided by the base image(s); install required packages explicitly (a no-op if they are already installed).
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ unzip \
+ curl \
+ && rm -rf /var/lib/apt/lists/*
+
+WORKDIR /opt/ignite
+
+RUN curl https://dist.apache.org/repos/dist/release/ignite/${IGNITE_VERSION}/apache-ignite-fabric-${IGNITE_VERSION}-bin.zip -o ignite.zip \
+ && unzip ignite.zip \
+ && rm ignite.zip
+
+# Copy sh files and set permission
+COPY ./run.sh $IGNITE_HOME/
+
+RUN chmod +x $IGNITE_HOME/run.sh
+
+CMD $IGNITE_HOME/run.sh
+
+EXPOSE 11211 47100 47500 49112
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a6af695/modules/docker/2.1.0/run.sh
----------------------------------------------------------------------
diff --git a/modules/docker/2.1.0/run.sh b/modules/docker/2.1.0/run.sh
new file mode 100644
index 0000000..3aafc30
--- /dev/null
+++ b/modules/docker/2.1.0/run.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if [ ! -z "$OPTION_LIBS" ]; then
+ IFS=, LIBS_LIST=("$OPTION_LIBS")
+
+ for lib in ${LIBS_LIST[@]}; do
+ cp -r $IGNITE_HOME/libs/optional/"$lib"/* \
+ $IGNITE_HOME/libs/
+ done
+fi
+
+if [ ! -z "$EXTERNAL_LIBS" ]; then
+ IFS=, LIBS_LIST=("$EXTERNAL_LIBS")
+
+ for lib in ${LIBS_LIST[@]}; do
+ echo $lib >> temp
+ done
+
+ wget -i temp -P $IGNITE_HOME/libs
+
+ rm temp
+fi
+
+QUIET=""
+
+if [ "$IGNITE_QUIET" = "false" ]; then
+ QUIET="-v"
+fi
+
+if [ -z $CONFIG_URI ]; then
+ $IGNITE_HOME/bin/ignite.sh $QUIET
+else
+ $IGNITE_HOME/bin/ignite.sh $QUIET $CONFIG_URI
+fi
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a6af695/modules/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/modules/docker/Dockerfile b/modules/docker/Dockerfile
index 2292981..6a0eecd 100644
--- a/modules/docker/Dockerfile
+++ b/modules/docker/Dockerfile
@@ -19,7 +19,7 @@
FROM java:8
# Ignite version
-ENV IGNITE_VERSION 2.0.0
+ENV IGNITE_VERSION 2.1.0
# Ignite home
ENV IGNITE_HOME /opt/ignite/apache-ignite-fabric-${IGNITE_VERSION}-bin
[3/8] ignite git commit: IGNITE-5771: Added Ignite::SetActive() for C++
Posted by sb...@apache.org.
IGNITE-5771: Added Ignite::SetActive() for C++
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/47fea40b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/47fea40b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/47fea40b
Branch: refs/heads/ignite-5578
Commit: 47fea40b07857a08727ac2e062cb9a3e0f464fdc
Parents: 2941392
Author: Igor Sapego <is...@gridgain.com>
Authored: Thu Jul 27 19:39:51 2017 +0300
Committer: Igor Sapego <is...@gridgain.com>
Committed: Thu Jul 27 19:39:51 2017 +0300
----------------------------------------------------------------------
.../cpp/core-test/src/cluster_test.cpp | 13 ++++++
.../platforms/cpp/core/include/ignite/ignite.h | 15 +++++++
.../ignite/impl/cluster/cluster_group_impl.h | 15 +++++++
.../cpp/core/include/ignite/impl/ignite_impl.h | 46 ++++++++++----------
modules/platforms/cpp/core/src/ignite.cpp | 10 +++++
.../src/impl/cluster/cluster_group_impl.cpp | 26 ++++++++++-
.../platforms/cpp/core/src/impl/ignite_impl.cpp | 24 ++++++++++
7 files changed, 124 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/47fea40b/modules/platforms/cpp/core-test/src/cluster_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/cluster_test.cpp b/modules/platforms/cpp/core-test/src/cluster_test.cpp
index e9d6728..4ee3f39 100644
--- a/modules/platforms/cpp/core-test/src/cluster_test.cpp
+++ b/modules/platforms/cpp/core-test/src/cluster_test.cpp
@@ -83,4 +83,17 @@ BOOST_AUTO_TEST_CASE(IgniteImplForServers)
BOOST_REQUIRE(clusterGroup.Get()->ForServers().IsValid());
}
+BOOST_AUTO_TEST_CASE(IgniteSetActive)
+{
+ BOOST_REQUIRE(node.IsActive());
+
+ node.SetActive(false);
+
+ BOOST_REQUIRE(!node.IsActive());
+
+ node.SetActive(true);
+
+ BOOST_REQUIRE(node.IsActive());
+}
+
BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/47fea40b/modules/platforms/cpp/core/include/ignite/ignite.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/ignite.h b/modules/platforms/cpp/core/include/ignite/ignite.h
index 07134a1..e953b8f 100644
--- a/modules/platforms/cpp/core/include/ignite/ignite.h
+++ b/modules/platforms/cpp/core/include/ignite/ignite.h
@@ -182,6 +182,21 @@ namespace ignite
}
/**
+ * Check if the Ignite grid is active.
+ *
+ * @return True if grid is active and false otherwise.
+ */
+ bool IsActive();
+
+ /**
+ * Change Ignite grid state to active or inactive.
+ *
+ * @param active If true start activation process. If false start
+ * deactivation process.
+ */
+ void SetActive(bool active);
+
+ /**
* Get transactions.
*
* This method should only be used on the valid instance.
http://git-wip-us.apache.org/repos/asf/ignite/blob/47fea40b/modules/platforms/cpp/core/include/ignite/impl/cluster/cluster_group_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cluster/cluster_group_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cluster/cluster_group_impl.h
index 3cfd700..d81e899 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cluster/cluster_group_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cluster/cluster_group_impl.h
@@ -71,6 +71,21 @@ namespace ignite
*/
SP_ComputeImpl GetCompute();
+ /**
+ * Check if the Ignite grid is active.
+ *
+ * @return True if grid is active and false otherwise.
+ */
+ bool IsActive();
+
+ /**
+ * Change Ignite grid state to active or inactive.
+ *
+ * @param active If true start activation process. If false start
+ * deactivation process.
+ */
+ void SetActive(bool active);
+
private:
IGNITE_NO_COPY_ASSIGNMENT(ClusterGroupImpl);
http://git-wip-us.apache.org/repos/asf/ignite/blob/47fea40b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
index d1763c4..5461d1c 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
@@ -182,6 +182,27 @@ namespace ignite
*/
SP_ComputeImpl GetCompute();
+ /**
+ * Check if the Ignite grid is active.
+ *
+ * @return True if grid is active and false otherwise.
+ */
+ bool IsActive()
+ {
+ return prjImpl.Get()->IsActive();
+ }
+
+ /**
+ * Change Ignite grid state to active or inactive.
+ *
+ * @param active If true start activation process. If false start
+ * deactivation process.
+ */
+ void SetActive(bool active)
+ {
+ prjImpl.Get()->SetActive(active);
+ }
+
private:
/**
* Get transactions internal call.
@@ -215,30 +236,7 @@ namespace ignite
* @param err Error.
* @param op Operation code.
*/
- cache::CacheImpl* GetOrCreateCache(const char* name, IgniteError& err, int32_t op)
- {
- SharedPointer<InteropMemory> mem = env.Get()->AllocateMemory();
- InteropMemory* mem0 = mem.Get();
- InteropOutputStream out(mem0);
- BinaryWriterImpl writer(&out, env.Get()->GetTypeManager());
- BinaryRawWriter rawWriter(&writer);
-
- rawWriter.WriteString(name);
-
- out.Synchronize();
-
- jobject cacheJavaRef = InStreamOutObject(op, *mem0, err);
-
- if (!cacheJavaRef)
- {
- return NULL;
- }
-
- char* name0 = common::CopyChars(name);
-
- return new cache::CacheImpl(name0, env, cacheJavaRef);
- }
-
+ cache::CacheImpl* GetOrCreateCache(const char* name, IgniteError& err, int32_t op);
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/47fea40b/modules/platforms/cpp/core/src/ignite.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/ignite.cpp b/modules/platforms/cpp/core/src/ignite.cpp
index 9c42f1d..6eaae01 100644
--- a/modules/platforms/cpp/core/src/ignite.cpp
+++ b/modules/platforms/cpp/core/src/ignite.cpp
@@ -45,6 +45,16 @@ namespace ignite
return impl.Get()->GetConfiguration();
}
+ bool Ignite::IsActive()
+ {
+ return impl.Get()->IsActive();
+ }
+
+ void Ignite::SetActive(bool active)
+ {
+ impl.Get()->SetActive(active);
+ }
+
transactions::Transactions Ignite::GetTransactions()
{
using ignite::common::concurrent::SharedPointer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/47fea40b/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp b/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
index c34e828..91f9d30 100644
--- a/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
@@ -30,7 +30,11 @@ namespace ignite
{
enum Type
{
- FOR_SERVERS = 23
+ FOR_SERVERS = 23,
+
+ SET_ACTIVE = 28,
+
+ IS_ACTIVE = 29
};
};
@@ -61,6 +65,26 @@ namespace ignite
return computeImpl;
}
+ bool ClusterGroupImpl::IsActive()
+ {
+ IgniteError err;
+
+ int64_t res = OutInOpLong(Command::IS_ACTIVE, 0, err);
+
+ IgniteError::ThrowIfNeeded(err);
+
+ return res == 1;
+ }
+
+ void ClusterGroupImpl::SetActive(bool active)
+ {
+ IgniteError err;
+
+ int64_t res = OutInOpLong(Command::SET_ACTIVE, active ? 1 : 0, err);
+
+ IgniteError::ThrowIfNeeded(err);
+ }
+
SP_ClusterGroupImpl ClusterGroupImpl::FromTarget(jobject javaRef)
{
return SP_ClusterGroupImpl(new ClusterGroupImpl(GetEnvironmentPointer(), javaRef));
http://git-wip-us.apache.org/repos/asf/ignite/blob/47fea40b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
index f7ff185..f2132d4 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
@@ -91,5 +91,29 @@ namespace ignite
return res;
}
+
+ cache::CacheImpl* IgniteImpl::GetOrCreateCache(const char* name, IgniteError& err, int32_t op)
+ {
+ SharedPointer<InteropMemory> mem = env.Get()->AllocateMemory();
+ InteropMemory* mem0 = mem.Get();
+ InteropOutputStream out(mem0);
+ BinaryWriterImpl writer(&out, env.Get()->GetTypeManager());
+ BinaryRawWriter rawWriter(&writer);
+
+ rawWriter.WriteString(name);
+
+ out.Synchronize();
+
+ jobject cacheJavaRef = InStreamOutObject(op, *mem0, err);
+
+ if (!cacheJavaRef)
+ {
+ return NULL;
+ }
+
+ char* name0 = common::CopyChars(name);
+
+ return new cache::CacheImpl(name0, env, cacheJavaRef);
+ }
}
}
[8/8] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-5578
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-5578
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/68e4e451
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/68e4e451
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/68e4e451
Branch: refs/heads/ignite-5578
Commit: 68e4e451097bf17fea0d5ae16b5b66c152e6ab7e
Parents: cf80351 b698bbf
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 28 10:32:36 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 28 10:32:36 2017 +0300
----------------------------------------------------------------------
.../eventstorage/GridEventStorageManager.java | 14 +
.../GridDhtPartitionsExchangeFuture.java | 18 +-
.../cache/query/GridCacheQueryManager.java | 24 +-
.../platform/PlatformTargetProxy.java | 11 +
.../platform/PlatformTargetProxyImpl.java | 79 +-
.../IgniteCacheBinaryObjectsScanSelfTest.java | 9 +-
...acheBinaryObjectsScanWithEventsSelfTest.java | 30 +
.../plugin/PlatformTestPluginTarget.java | 7 +-
modules/docker/2.1.0/Dockerfile | 46 +
modules/docker/2.1.0/run.sh | 51 +
modules/docker/Dockerfile | 2 +-
.../IgniteBinaryCacheQueryTestSuite.java | 2 +
.../cpp/core-test/src/cluster_test.cpp | 13 +
.../platforms/cpp/core/include/ignite/ignite.h | 15 +
.../ignite/impl/cluster/cluster_group_impl.h | 15 +
.../cpp/core/include/ignite/impl/ignite_impl.h | 46 +-
modules/platforms/cpp/core/src/ignite.cpp | 10 +
.../src/impl/cluster/cluster_group_impl.cpp | 26 +-
.../platforms/cpp/core/src/impl/ignite_impl.cpp | 24 +
.../cpp/jni/include/ignite/jni/exports.h | 1 +
.../platforms/cpp/jni/include/ignite/jni/java.h | 5 +-
modules/platforms/cpp/jni/project/vs/module.def | 1 +
modules/platforms/cpp/jni/src/exports.cpp | 4 +
modules/platforms/cpp/jni/src/java.cpp | 19 +-
.../Plugin/PluginTest.cs | 13 +-
.../Apache.Ignite.Core.Tests/TestUtils.cs | 7 +-
.../Apache.Ignite.Core.csproj | 5 +-
.../dotnet/Apache.Ignite.Core/Ignition.cs | 9 +-
.../Impl/Binary/BinaryProcessor.cs | 6 +-
.../Impl/Binary/BinaryWriterExtensions.cs | 107 ++
.../Cache/Affinity/PlatformAffinityFunction.cs | 7 +-
.../Impl/Cache/CacheAffinityImpl.cs | 18 +-
.../Impl/Cache/CacheEnumerator.cs | 8 +-
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 113 +-
.../Impl/Cache/Query/AbstractQueryCursor.cs | 12 +-
.../Continuous/ContinuousQueryHandleImpl.cs | 12 +-
.../Impl/Cache/Query/FieldsQueryCursor.cs | 6 +-
.../Impl/Cache/Query/QueryCursor.cs | 5 +-
.../Impl/Cluster/ClusterGroupImpl.cs | 76 +-
.../Impl/Common/DelegateTypeDescriptor.cs | 9 +-
.../Impl/Common/Listenable.cs | 8 +-
.../Impl/Compute/ComputeImpl.cs | 12 +-
.../Impl/DataStructures/AtomicLong.cs | 9 +-
.../Impl/DataStructures/AtomicReference.cs | 8 +-
.../Impl/DataStructures/AtomicSequence.cs | 9 +-
.../Impl/Datastream/DataStreamerImpl.cs | 8 +-
.../Impl/Datastream/StreamReceiverHolder.cs | 13 +-
.../Apache.Ignite.Core/Impl/Events/Events.cs | 11 +-
.../Impl/IPlatformTargetInternal.cs | 102 ++
.../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 48 +-
.../Impl/Messaging/Messaging.cs | 10 +-
.../Impl/PlatformDisposableTargetAdapter.cs | 75 ++
.../Impl/PlatformJniTarget.cs | 536 +++++++++
.../Apache.Ignite.Core/Impl/PlatformTarget.cs | 1086 ------------------
.../Impl/PlatformTargetAdapter.cs | 534 +++++++++
.../Impl/Services/Services.cs | 19 +-
.../Impl/Transactions/TransactionsImpl.cs | 29 +-
.../Impl/Unmanaged/IgniteJniNativeMethods.cs | 3 +
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 10 +-
.../Impl/Unmanaged/UnmanagedUtils.cs | 7 +
.../Interop/IPlatformTarget.cs | 15 +
61 files changed, 1994 insertions(+), 1443 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/68e4e451/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ebd305a,52a74ab..e2e62ba
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -563,41 -490,18 +563,41 @@@ public class GridDhtPartitionsExchangeF
initCachesOnLocalJoin();
}
+ if (newCrd) {
- IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(this);
++ IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(this, false);
+
+ if (fut != null)
+ fut.get();
++
++ cctx.exchange().coordinatorInitialized();
+ }
+
- exchange = CU.clientNode(discoEvt.eventNode()) ?
- onClientNodeEvent(crdNode) :
- onServerNodeEvent(crdNode);
+ if (exchCtx.mergeExchanges()) {
+ if (localJoinExchange()) {
+ if (cctx.kernalContext().clientNode()) {
+ onClientNodeEvent(crdNode);
+
+ exchange = ExchangeType.CLIENT;
+ }
+ else {
+ onServerNodeEvent(crdNode);
+
+ exchange = ExchangeType.ALL;
+ }
+ }
+ else {
+ if (CU.clientNode(discoEvt.eventNode()))
+ exchange = onClientNodeEvent(crdNode);
+ else
+ exchange = cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
+ }
+ }
+ else {
+ exchange = CU.clientNode(discoEvt.eventNode()) ? onClientNodeEvent(crdNode) :
+ onServerNodeEvent(crdNode);
+ }
}
- if (newCrd) {
- IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(this, false);
-
- if (fut != null)
- fut.get();
-
- cctx.exchange().coordinatorInitialized();
- }
-
updateTopologies(crdNode);
switch (exchange) {
[5/8] ignite git commit: IGNITE-5769 Abstract away .NET->Java calls
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
index c91334d..b717d14 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/StreamReceiverHolder.cs
@@ -27,7 +27,6 @@ namespace Apache.Ignite.Core.Impl.Datastream
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Cache;
using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Binary wrapper for <see cref="IStreamReceiver{TK,TV}"/>.
@@ -44,7 +43,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
private readonly object _rcv;
/** Invoker delegate. */
- private readonly Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool> _invoke;
+ private readonly Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> _invoke;
/// <summary>
/// Initializes a new instance of the <see cref="StreamReceiverHolder"/> class.
@@ -77,7 +76,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
/// <param name="rcv">Receiver.</param>
/// <param name="invoke">Invoke delegate.</param>
public StreamReceiverHolder(object rcv,
- Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool> invoke)
+ Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> invoke)
{
Debug.Assert(rcv != null);
Debug.Assert(invoke != null);
@@ -109,7 +108,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
/// <param name="cache">Cache.</param>
/// <param name="stream">Stream.</param>
/// <param name="keepBinary">Binary flag.</param>
- public void Receive(Ignite grid, IUnmanagedTarget cache, IBinaryStream stream, bool keepBinary)
+ public void Receive(Ignite grid, IPlatformTargetInternal cache, IBinaryStream stream, bool keepBinary)
{
Debug.Assert(grid != null);
Debug.Assert(cache != null);
@@ -126,8 +125,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
/// <param name="cache">Cache.</param>
/// <param name="stream">Stream.</param>
/// <param name="keepBinary">Binary flag.</param>
- public static void InvokeReceiver<TK, TV>(IStreamReceiver<TK, TV> receiver, Ignite grid, IUnmanagedTarget cache,
- IBinaryStream stream, bool keepBinary)
+ public static void InvokeReceiver<TK, TV>(IStreamReceiver<TK, TV> receiver, Ignite grid,
+ IPlatformTargetInternal cache, IBinaryStream stream, bool keepBinary)
{
var reader = grid.Marshaller.StartUnmarshal(stream, keepBinary);
@@ -138,7 +137,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
for (var i = 0; i < size; i++)
entries.Add(new CacheEntry<TK, TV>(reader.ReadObject<TK>(), reader.ReadObject<TV>()));
- receiver.Receive(grid.GetCache<TK, TV>(cache, keepBinary), entries);
+ receiver.Receive(Ignite.GetCache<TK, TV>(cache, keepBinary), entries);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
index eb454d6..3c7363e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Events/Events.cs
@@ -30,13 +30,11 @@ namespace Apache.Ignite.Core.Impl.Events
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Handle;
- using Apache.Ignite.Core.Impl.Unmanaged;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Ignite events.
/// </summary>
- internal sealed class Events : PlatformTarget, IEvents
+ internal sealed class Events : PlatformTargetAdapter, IEvents
{
/// <summary>
/// Opcodes.
@@ -66,15 +64,14 @@ namespace Apache.Ignite.Core.Impl.Events
/** Cluster group. */
private readonly IClusterGroup _clusterGroup;
-
+
/// <summary>
/// Initializes a new instance of the <see cref="Events" /> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="clusterGroup">Cluster group.</param>
- public Events(IUnmanagedTarget target, Marshaller marsh, IClusterGroup clusterGroup)
- : base(target, marsh)
+ public Events(IPlatformTargetInternal target, IClusterGroup clusterGroup)
+ : base(target)
{
Debug.Assert(clusterGroup != null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IPlatformTargetInternal.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IPlatformTargetInternal.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IPlatformTargetInternal.cs
new file mode 100644
index 0000000..23174b4
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IPlatformTargetInternal.cs
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl
+{
+ using System;
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Binary.IO;
+ using Apache.Ignite.Core.Interop;
+
+ /// <summary>
+ /// Extended platform target interface with methods that operate on internal entities (streams and targets).
+ /// </summary>
+ internal interface IPlatformTargetInternal : IPlatformTarget, IDisposable
+ {
+ /// <summary>
+ /// Gets the marshaller.
+ /// </summary>
+ Marshaller Marshaller { get; }
+
+ /// <summary>
+ /// Performs InStreamOutLong operation.
+ /// </summary>
+ /// <param name="type">Operation type code.</param>
+ /// <param name="writeAction">Write action.</param>
+ /// <returns>Result.</returns>
+ long InStreamOutLong(int type, Action<IBinaryStream> writeAction);
+
+ /// <summary>
+ /// Performs InStreamOutLong operation with stream reuse.
+ /// </summary>
+ /// <param name="type">Operation type code.</param>
+ /// <param name="writeAction">Write action.</param>
+ /// <param name="readAction">Read action.</param>
+ /// <param name="readErrorAction">Error action.</param>
+ /// <returns>
+ /// Result.
+ /// </returns>
+ T InStreamOutLong<T>(int type, Action<IBinaryStream> writeAction, Func<IBinaryStream, long, T> readAction,
+ Func<IBinaryStream, Exception> readErrorAction);
+
+ /// <summary>
+ /// Performs InStreamOutStream operation.
+ /// </summary>
+ /// <typeparam name="T">Result type.</typeparam>
+ /// <param name="type">Operation type code.</param>
+ /// <param name="writeAction">Write action.</param>
+ /// <param name="readAction">Read action.</param>
+ /// <returns>Result.</returns>
+ T InStreamOutStream<T>(int type, Action<IBinaryStream> writeAction, Func<IBinaryStream, T> readAction);
+
+ /// <summary>
+ /// Performs InStreamOutObject operation.
+ /// </summary>
+ /// <param name="type">Operation type code.</param>
+ /// <param name="writeAction">Write action.</param>
+ /// <returns>Result.</returns>
+ IPlatformTargetInternal InStreamOutObject(int type, Action<IBinaryStream> writeAction);
+
+ /// <summary>
+ /// Performs InObjectStreamOutObjectStream operation.
+ /// </summary>
+ /// <typeparam name="T">Result type.</typeparam>
+ /// <param name="type">Operation type code.</param>
+ /// <param name="arg">Target argument.</param>
+ /// <param name="writeAction">Write action.</param>
+ /// <param name="readAction">Read action.</param>
+ /// <param name="arg">Target argument.</param>
+ T InObjectStreamOutObjectStream<T>(int type, Action<IBinaryStream> writeAction,
+ Func<IBinaryStream, IPlatformTargetInternal, T> readAction, IPlatformTargetInternal arg);
+
+ /// <summary>
+ /// Performs OutStream operation.
+ /// </summary>
+ /// <typeparam name="T">Result type.</typeparam>
+ /// <param name="type">Operation type code.</param>
+ /// <param name="readAction">Read action.</param>
+ /// <returns>Result.</returns>
+ T OutStream<T>(int type, Func<IBinaryStream, T> readAction);
+
+ /// <summary>
+ /// Performs the OutObject operation.
+ /// </summary>
+ /// <param name="type">Operation type code.</param>
+ /// <returns>Result.</returns>
+ IPlatformTargetInternal OutObjectInternal(int type);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
index 715776e..aae6ce7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
@@ -28,6 +28,7 @@ namespace Apache.Ignite.Core.Impl
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Compute;
using Apache.Ignite.Core.Datastream;
using Apache.Ignite.Core.DataStructures;
@@ -54,7 +55,7 @@ namespace Apache.Ignite.Core.Impl
/// <summary>
/// Native Ignite wrapper.
/// </summary>
- internal class Ignite : PlatformTarget, IIgnite, ICluster
+ internal class Ignite : PlatformTargetAdapter, IIgnite, ICluster
{
/// <summary>
/// Operation codes for PlatformProcessorImpl calls.
@@ -92,7 +93,7 @@ namespace Apache.Ignite.Core.Impl
private readonly string _name;
/** Unmanaged node. */
- private readonly IUnmanagedTarget _proc;
+ private readonly IPlatformTargetInternal _proc;
/** Marshaller. */
private readonly Marshaller _marsh;
@@ -138,8 +139,8 @@ namespace Apache.Ignite.Core.Impl
/// <param name="marsh">Marshaller.</param>
/// <param name="lifecycleHandlers">Lifecycle beans.</param>
/// <param name="cbs">Callbacks.</param>
- public Ignite(IgniteConfiguration cfg, string name, IUnmanagedTarget proc, Marshaller marsh,
- IList<LifecycleHandlerHolder> lifecycleHandlers, UnmanagedCallbacks cbs) : base(proc, marsh)
+ public Ignite(IgniteConfiguration cfg, string name, IPlatformTargetInternal proc, Marshaller marsh,
+ IList<LifecycleHandlerHolder> lifecycleHandlers, UnmanagedCallbacks cbs) : base(proc)
{
Debug.Assert(cfg != null);
Debug.Assert(proc != null);
@@ -156,17 +157,17 @@ namespace Apache.Ignite.Core.Impl
marsh.Ignite = this;
- _prj = new ClusterGroupImpl(DoOutOpObject((int) Op.GetClusterGroup), this, null);
+ _prj = new ClusterGroupImpl(Target.OutObjectInternal((int) Op.GetClusterGroup), null);
_binary = new Binary.Binary(marsh);
- _binaryProc = new BinaryProcessor(DoOutOpObject((int) Op.GetBinaryProcessor), marsh);
+ _binaryProc = new BinaryProcessor(DoOutOpObject((int) Op.GetBinaryProcessor));
cbs.Initialize(this);
// Grid is not completely started here, can't initialize interop transactions right away.
_transactions = new Lazy<TransactionsImpl>(
- () => new TransactionsImpl(DoOutOpObject((int) Op.GetTransactions), marsh, GetLocalNode().Id));
+ () => new TransactionsImpl(DoOutOpObject((int) Op.GetTransactions), GetLocalNode().Id));
// Set reconnected task to completed state for convenience.
_clientReconnectTaskCompletionSource.SetResult(false);
@@ -380,7 +381,14 @@ namespace Apache.Ignite.Core.Impl
/// <param name="cancel">Cancel flag.</param>
internal unsafe void Stop(bool cancel)
{
- UU.IgnitionStop(_proc.Context, Name, cancel);
+ var jniTarget = _proc as PlatformJniTarget;
+
+ if (jniTarget == null)
+ {
+ throw new IgniteException("Ignition.Stop is not supported in thin client.");
+ }
+
+ UU.IgnitionStop(jniTarget.Target.Context, Name, cancel);
_cbs.Cleanup();
}
@@ -507,9 +515,9 @@ namespace Apache.Ignite.Core.Impl
/// <returns>
/// New instance of cache wrapping specified native cache.
/// </returns>
- public ICache<TK, TV> GetCache<TK, TV>(IUnmanagedTarget nativeCache, bool keepBinary = false)
+ public static ICache<TK, TV> GetCache<TK, TV>(IPlatformTargetInternal nativeCache, bool keepBinary = false)
{
- return new CacheImpl<TK, TV>(this, nativeCache, _marsh, false, keepBinary, false, false);
+ return new CacheImpl<TK, TV>(nativeCache, false, keepBinary, false, false);
}
/** <inheritdoc /> */
@@ -585,7 +593,7 @@ namespace Apache.Ignite.Core.Impl
var aff = DoOutOpObject((int) Op.GetAffinity, w => w.WriteString(cacheName));
- return new CacheAffinityImpl(aff, _marsh, false, this);
+ return new CacheAffinityImpl(aff, false);
}
/** <inheritdoc /> */
@@ -627,7 +635,7 @@ namespace Apache.Ignite.Core.Impl
if (nativeLong == null)
return null;
- return new AtomicLong(nativeLong, Marshaller, name);
+ return new AtomicLong(nativeLong, name);
}
/** <inheritdoc /> */
@@ -645,7 +653,7 @@ namespace Apache.Ignite.Core.Impl
if (nativeSeq == null)
return null;
- return new AtomicSequence(nativeSeq, Marshaller, name);
+ return new AtomicSequence(nativeSeq, name);
}
/** <inheritdoc /> */
@@ -660,7 +668,7 @@ namespace Apache.Ignite.Core.Impl
w.WriteBoolean(create);
});
- return refTarget == null ? null : new AtomicReference<T>(refTarget, Marshaller, name);
+ return refTarget == null ? null : new AtomicReference<T>(refTarget, name);
}
/** <inheritdoc /> */
@@ -685,7 +693,7 @@ namespace Apache.Ignite.Core.Impl
/** <inheritdoc /> */
public ICollection<string> GetCacheNames()
{
- return OutStream((int) Op.GetCacheNames, r =>
+ return Target.OutStream((int) Op.GetCacheNames, r =>
{
var res = new string[r.ReadInt()];
@@ -848,7 +856,7 @@ namespace Apache.Ignite.Core.Impl
/// <summary>
/// Gets the interop processor.
/// </summary>
- internal IUnmanagedTarget InteropProcessor
+ internal IPlatformTargetInternal InteropProcessor
{
get { return _proc; }
}
@@ -891,7 +899,7 @@ namespace Apache.Ignite.Core.Impl
/// </summary>
internal void ProcessorReleaseStart()
{
- InLongOutLong((int) Op.ReleaseStart, 0);
+ Target.InLongOutLong((int) Op.ReleaseStart, 0);
}
/// <summary>
@@ -899,7 +907,7 @@ namespace Apache.Ignite.Core.Impl
/// </summary>
internal bool LoggerIsLevelEnabled(LogLevel logLevel)
{
- return InLongOutLong((int) Op.LoggerIsLevelEnabled, (long) logLevel) == True;
+ return Target.InLongOutLong((int) Op.LoggerIsLevelEnabled, (long) logLevel) == True;
}
/// <summary>
@@ -907,7 +915,7 @@ namespace Apache.Ignite.Core.Impl
/// </summary>
internal void LoggerLog(LogLevel level, string msg, string category, string err)
{
- InStreamOutLong((int) Op.LoggerLog, w =>
+ Target.InStreamOutLong((int) Op.LoggerLog, w =>
{
w.WriteInt((int) level);
w.WriteString(msg);
@@ -921,7 +929,7 @@ namespace Apache.Ignite.Core.Impl
/// </summary>
internal IPlatformTarget GetExtension(int id)
{
- return InStreamOutObject((int) Op.GetExtension, w => w.WriteInt(id));
+ return ((IPlatformTarget) Target).InStreamOutObject((int) Op.GetExtension, w => w.WriteInt(id));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
index 1b43438..e17bcbf 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Messaging/Messaging.cs
@@ -29,13 +29,12 @@ namespace Apache.Ignite.Core.Impl.Messaging
using Apache.Ignite.Core.Impl.Collections;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Resource;
- using Apache.Ignite.Core.Impl.Unmanaged;
using Apache.Ignite.Core.Messaging;
/// <summary>
/// Messaging functionality.
/// </summary>
- internal class Messaging : PlatformTarget, IMessaging
+ internal class Messaging : PlatformTargetAdapter, IMessaging
{
/// <summary>
/// Opcodes.
@@ -67,10 +66,9 @@ namespace Apache.Ignite.Core.Impl.Messaging
/// Initializes a new instance of the <see cref="Messaging" /> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="prj">Cluster group.</param>
- public Messaging(IUnmanagedTarget target, Marshaller marsh, IClusterGroup prj)
- : base(target, marsh)
+ public Messaging(IPlatformTargetInternal target, IClusterGroup prj)
+ : base(target)
{
Debug.Assert(prj != null);
@@ -102,7 +100,7 @@ namespace Apache.Ignite.Core.Impl.Messaging
{
writer.Write(topic);
- WriteEnumerable(writer, messages.OfType<object>());
+ writer.WriteEnumerable(messages.OfType<object>());
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs
new file mode 100644
index 0000000..f884c40
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformDisposableTargetAdapter.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl
+{
+ using System;
+
+    /// <summary>
+    /// <see cref="PlatformTargetAdapter"/> with the <see cref="IDisposable"/> pattern:
+    /// the first call to <see cref="Dispose()"/> runs <see cref="Dispose(bool)"/>, later calls are no-ops.
+    /// </summary>
+    internal abstract class PlatformDisposableTargetAdapter : PlatformTargetAdapter, IDisposable
+    {
+        /** Serializes disposal. A private object is used instead of "this" so that code
+         *  which locks on the instance cannot contend or deadlock with Dispose. */
+        private readonly object _disposeSyncRoot = new object();
+
+        /** Disposed flag. Volatile so ThrowIfDisposed can read it without taking the lock. */
+        private volatile bool _disposed;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="target">Target.</param>
+        protected PlatformDisposableTargetAdapter(IPlatformTargetInternal target) : base(target)
+        {
+            // No-op.
+        }
+
+        /** <inheritdoc /> */
+        public void Dispose()
+        {
+            lock (_disposeSyncRoot)
+            {
+                if (_disposed)
+                    return;
+
+                Dispose(true);
+
+                GC.SuppressFinalize(this);
+
+                // Set last: if Dispose(true) throws, the instance is not marked as disposed.
+                _disposed = true;
+            }
+        }
+
+        /// <summary>
+        /// Releases unmanaged and - optionally - managed resources.
+        /// The base implementation disposes the underlying target regardless of <paramref name="disposing"/>.
+        /// </summary>
+        /// <param name="disposing">
+        /// <c>true</c> when called from Dispose; <c>false</c> when called from finalizer.
+        /// </param>
+        protected virtual void Dispose(bool disposing)
+        {
+            Target.Dispose();
+        }
+
+        /// <summary>
+        /// Throws <see cref="ObjectDisposedException"/> if this instance has been disposed.
+        /// </summary>
+        /// <exception cref="ObjectDisposedException">This instance has already been disposed.</exception>
+        protected void ThrowIfDisposed()
+        {
+            if (_disposed)
+                throw new ObjectDisposedException(GetType().Name, "Object has been disposed.");
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformJniTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformJniTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformJniTarget.cs
new file mode 100644
index 0000000..725c112
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformJniTarget.cs
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Diagnostics.CodeAnalysis;
+ using System.IO;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Binary.IO;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Memory;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using Apache.Ignite.Core.Interop;
+ using BinaryReader = Apache.Ignite.Core.Impl.Binary.BinaryReader;
+ using BinaryWriter = Apache.Ignite.Core.Impl.Binary.BinaryWriter;
+ using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+ /// <summary>
+ /// <see cref="IPlatformTargetInternal"/> implementation that forwards operations to an unmanaged target.
+ /// </summary>
+ internal class PlatformJniTarget : IPlatformTargetInternal
+ {
+ /** Primitive CLR types and the <see cref="FutureType"/> that GetFuture looks up for them. */
+ private static readonly Dictionary<Type, FutureType> IgniteFutureTypeMap
+ = new Dictionary<Type, FutureType>
+ {
+ {typeof(bool), FutureType.Bool},
+ {typeof(byte), FutureType.Byte},
+ {typeof(char), FutureType.Char},
+ {typeof(double), FutureType.Double},
+ {typeof(float), FutureType.Float},
+ {typeof(int), FutureType.Int},
+ {typeof(long), FutureType.Long},
+ {typeof(short), FutureType.Short}
+ };
+
+ /** Unmanaged target. */
+ private readonly IUnmanagedTarget _target;
+
+ /** Marshaller. */
+ private readonly Marshaller _marsh;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="PlatformJniTarget"/> class.
+        /// </summary>
+        /// <param name="target">Unmanaged target to wrap; expected to be non-null.</param>
+        /// <param name="marsh">Marshaller used by the writer/reader based operations; expected to be non-null.</param>
+        public PlatformJniTarget(IUnmanagedTarget target, Marshaller marsh)
+        {
+            Debug.Assert(target != null);
+            _target = target;
+
+            Debug.Assert(marsh != null);
+            _marsh = marsh;
+        }
+
+ /// <summary>
+ /// Gets the unmanaged target that was passed to the constructor.
+ /// </summary>
+ public IUnmanagedTarget Target
+ {
+ get { return _target; }
+ }
+
+ /** <inheritdoc /> */
+ // Returns the marshaller that was passed to the constructor.
+ public Marshaller Marshaller { get { return _marsh; } }
+
+        /** <inheritdoc /> */
+        public long InStreamOutLong(int type, Action<IBinaryStream> writeAction)
+        {
+            using (var outStream = IgniteManager.Memory.Allocate().GetStream())
+            {
+                // The caller writes raw data; no marshaller is involved in this overload.
+                writeAction(outStream);
+
+                // SynchronizeOutput yields the value handed to the unmanaged call as the input memory.
+                var memPtr = outStream.SynchronizeOutput();
+
+                return UU.TargetInStreamOutLong(_target, type, memPtr);
+            }
+        }
+
+        /** <inheritdoc /> */
+        public IPlatformTargetInternal InStreamOutObject(int type, Action<IBinaryStream> writeAction)
+        {
+            using (var outStream = IgniteManager.Memory.Allocate().GetStream())
+            {
+                writeAction(outStream);
+
+                var memPtr = outStream.SynchronizeOutput();
+
+                // GetPlatformTarget maps a null unmanaged result to null and otherwise wraps it
+                // in a new PlatformJniTarget that shares this instance's marshaller.
+                return GetPlatformTarget(UU.TargetInStreamOutObject(_target, type, memPtr));
+            }
+        }
+
+        /** <inheritdoc /> */
+        public IPlatformTargetInternal OutObjectInternal(int type)
+        {
+            // A null unmanaged result is returned as null; anything else is wrapped.
+            var unmanagedResult = UU.TargetOutObject(_target, type);
+
+            return GetPlatformTarget(unmanagedResult);
+        }
+
+        /** <inheritdoc /> */
+        public T OutStream<T>(int type, Func<IBinaryStream, T> readAction)
+        {
+            using (var inStream = IgniteManager.Memory.Allocate().GetStream())
+            {
+                // The unmanaged side receives the stream's memory pointer for its output.
+                UU.TargetOutStream(_target, type, inStream.MemoryPointer);
+
+                // Synchronize the managed stream before the caller reads from it.
+                inStream.SynchronizeInput();
+
+                var result = readAction(inStream);
+
+                return result;
+            }
+        }
+
+        /** <inheritdoc /> */
+        public TR InStreamOutStream<TR>(int type, Action<IBinaryStream> writeAction,
+            Func<IBinaryStream, TR> readAction)
+        {
+            // Request stream is allocated first and disposed last, as in nested using blocks.
+            using (var requestStream = IgniteManager.Memory.Allocate().GetStream())
+            {
+                using (var responseStream = IgniteManager.Memory.Allocate().GetStream())
+                {
+                    writeAction(requestStream);
+
+                    var requestPtr = requestStream.SynchronizeOutput();
+
+                    UU.TargetInStreamOutStream(_target, type, requestPtr, responseStream.MemoryPointer);
+
+                    // Synchronize the response stream before the caller reads from it.
+                    responseStream.SynchronizeInput();
+
+                    return readAction(responseStream);
+                }
+            }
+        }
+
+        /** <inheritdoc /> */
+        public TR InStreamOutLong<TR>(int type, Action<IBinaryStream> outAction, Func<IBinaryStream, long, TR> inAction,
+            Func<IBinaryStream, Exception> readErrorAction)
+        {
+            Debug.Assert(readErrorAction != null);
+
+            using (var stream = IgniteManager.Memory.Allocate().GetStream())
+            {
+                outAction(stream);
+
+                var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput());
+
+                var failed = res == PlatformTargetAdapter.Error;
+
+                // Quick path for void operations: success and nothing to read back.
+                if (!failed && inAction == null)
+                    return default(TR);
+
+                // The same stream is reused for the response: synchronize it and rewind to the start.
+                stream.SynchronizeInput();
+
+                stream.Seek(0, SeekOrigin.Begin);
+
+                if (failed)
+                    throw readErrorAction(stream);
+
+                // inAction is non-null here: the success-with-null case returned above.
+                return inAction(stream, res);
+            }
+        }
+
+        /** <inheritdoc /> */
+        public unsafe TR InObjectStreamOutObjectStream<TR>(int type, Action<IBinaryStream> writeAction,
+            Func<IBinaryStream, IPlatformTargetInternal, TR> readAction, IPlatformTargetInternal arg)
+        {
+            PlatformMemoryStream outStream = null;
+            long outPtr = 0;
+
+            PlatformMemoryStream inStream = null;
+            long inPtr = 0;
+
+            try
+            {
+                // Input is optional: outPtr stays 0 when there is nothing to write.
+                if (writeAction != null)
+                {
+                    outStream = IgniteManager.Memory.Allocate().GetStream();
+                    writeAction(outStream);
+                    outPtr = outStream.SynchronizeOutput();
+                }
+
+                // Output is optional: inPtr stays 0 when the caller does not read a result.
+                if (readAction != null)
+                {
+                    inStream = IgniteManager.Memory.Allocate().GetStream();
+                    inPtr = inStream.MemoryPointer;
+                }
+
+                // GetTargetPtr maps a null argument to a null pointer, matching the
+                // IBinaryRawWriter-based overload, instead of failing with NullReferenceException.
+                var res = UU.TargetInObjectStreamOutObjectStream(_target, type, GetTargetPtr(arg), outPtr, inPtr);
+
+                if (readAction == null)
+                    return default(TR);
+
+                inStream.SynchronizeInput();
+
+                // A null unmanaged result is passed to readAction as null.
+                return readAction(inStream, GetPlatformTarget(res));
+            }
+            finally
+            {
+                // Nested finally: outStream is disposed even if disposing inStream throws.
+                try
+                {
+                    if (inStream != null)
+                        inStream.Dispose();
+                }
+                finally
+                {
+                    if (outStream != null)
+                        outStream.Dispose();
+                }
+            }
+        }
+
+ /// <summary>
+ /// Finish marshaling by delegating to <see cref="Marshaller.FinishMarshal"/> on this target's marshaller.
+ /// </summary>
+ /// <param name="writer">Writer that was obtained from the marshaller's StartMarshal.</param>
+ private void FinishMarshal(BinaryWriter writer)
+ {
+ _marsh.FinishMarshal(writer);
+ }
+
+        /// <summary>
+        /// Creates a future, registers it in the handle registry and starts listening.
+        /// The unmanaged target returned by <paramref name="listenAction"/> is attached to the future.
+        /// </summary>
+        /// <typeparam name="T">Future result type</typeparam>
+        /// <param name="listenAction">The listen action; receives the future handle and the future type code.</param>
+        /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
+        /// <param name="convertFunc">The function to read future result from stream.</param>
+        /// <returns>Created future.</returns>
+        private Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false,
+            Func<BinaryReader, T> convertFunc = null)
+        {
+            var resultType = typeof(T);
+
+            var futType = FutureType.Object;
+
+            // NOTE(review): for a primitive type that is missing from the map, TryGetValue overwrites
+            // futType with default(FutureType) rather than leaving FutureType.Object - confirm this is intended.
+            if (resultType.IsPrimitive)
+                IgniteFutureTypeMap.TryGetValue(resultType, out futType);
+
+            // A converter is needed for object results and whenever a custom read function is supplied.
+            var needsConverter = convertFunc != null || futType == FutureType.Object;
+
+            var fut = needsConverter
+                ? new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc))
+                : new Future<T>();
+
+            var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
+
+            IUnmanagedTarget listenTarget;
+
+            try
+            {
+                listenTarget = listenAction(futHnd, (int) futType);
+            }
+            catch (Exception)
+            {
+                // Listening never started: release the handle before propagating the failure.
+                _marsh.Ignite.HandleRegistry.Release(futHnd);
+
+                throw;
+            }
+
+            var listenable = new Listenable(new PlatformJniTarget(listenTarget, _marsh));
+
+            fut.SetTarget(listenable);
+
+            return fut;
+        }
+
+        /// <summary>
+        /// Creates a future, registers it in the handle registry and starts listening.
+        /// Unlike the overload taking a function, no target is attached to the returned future.
+        /// </summary>
+        /// <typeparam name="T">Future result type</typeparam>
+        /// <param name="listenAction">The listen action; receives the future handle and the future type code.</param>
+        /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
+        /// <param name="convertFunc">The function to read future result from stream.</param>
+        /// <returns>Created future.</returns>
+        private Future<T> GetFuture<T>(Action<long, int> listenAction, bool keepBinary = false,
+            Func<BinaryReader, T> convertFunc = null)
+        {
+            var resultType = typeof(T);
+
+            var futType = FutureType.Object;
+
+            // NOTE(review): for a primitive type that is missing from the map, TryGetValue overwrites
+            // futType with default(FutureType) rather than leaving FutureType.Object - confirm this is intended.
+            if (resultType.IsPrimitive)
+                IgniteFutureTypeMap.TryGetValue(resultType, out futType);
+
+            // A converter is needed for object results and whenever a custom read function is supplied.
+            var needsConverter = convertFunc != null || futType == FutureType.Object;
+
+            var fut = needsConverter
+                ? new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc))
+                : new Future<T>();
+
+            var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
+
+            try
+            {
+                listenAction(futHnd, (int) futType);
+            }
+            catch (Exception)
+            {
+                // Listening never started: release the handle before propagating the failure.
+                _marsh.Ignite.HandleRegistry.Release(futHnd);
+
+                throw;
+            }
+
+            return fut;
+        }
+
+ #region IPlatformTarget
+
+ /** <inheritdoc /> */
+ // Passes the operation code and value straight to the unmanaged target; no stream is allocated.
+ public long InLongOutLong(int type, long val)
+ {
+ return UU.TargetInLongOutLong(_target, type, val);
+ }
+
+        /** <inheritdoc /> */
+        public long InStreamOutLong(int type, Action<IBinaryRawWriter> writeAction)
+        {
+            using (var outStream = IgniteManager.Memory.Allocate().GetStream())
+            {
+                // The caller writes through a marshaller-backed writer; marshaling is
+                // finished before the stream is handed to the unmanaged side.
+                var binaryWriter = _marsh.StartMarshal(outStream);
+
+                writeAction(binaryWriter);
+
+                _marsh.FinishMarshal(binaryWriter);
+
+                var memPtr = outStream.SynchronizeOutput();
+
+                return UU.TargetInStreamOutLong(_target, type, memPtr);
+            }
+        }
+
+        /** <inheritdoc /> */
+        public T InStreamOutStream<T>(int type, Action<IBinaryRawWriter> writeAction,
+            Func<IBinaryRawReader, T> readAction)
+        {
+            // Request stream is allocated first and disposed last, as in nested using blocks.
+            using (var requestStream = IgniteManager.Memory.Allocate().GetStream())
+            {
+                using (var responseStream = IgniteManager.Memory.Allocate().GetStream())
+                {
+                    var binaryWriter = _marsh.StartMarshal(requestStream);
+
+                    writeAction(binaryWriter);
+
+                    _marsh.FinishMarshal(binaryWriter);
+
+                    var requestPtr = requestStream.SynchronizeOutput();
+
+                    UU.TargetInStreamOutStream(_target, type, requestPtr, responseStream.MemoryPointer);
+
+                    responseStream.SynchronizeInput();
+
+                    // The response is exposed to the caller through a marshaller-backed reader.
+                    var binaryReader = _marsh.StartUnmarshal(responseStream);
+
+                    return readAction(binaryReader);
+                }
+            }
+        }
+
+        /** <inheritdoc /> */
+        public IPlatformTarget InStreamOutObject(int type, Action<IBinaryRawWriter> writeAction)
+        {
+            using (var outStream = IgniteManager.Memory.Allocate().GetStream())
+            {
+                var binaryWriter = _marsh.StartMarshal(outStream);
+
+                writeAction(binaryWriter);
+
+                _marsh.FinishMarshal(binaryWriter);
+
+                var unmanagedResult = UU.TargetInStreamOutObject(_target, type, outStream.SynchronizeOutput());
+
+                // A null unmanaged result is returned as null; anything else is wrapped.
+                return GetPlatformTarget(unmanagedResult);
+            }
+        }
+
+        /** <inheritdoc /> */
+        public unsafe T InObjectStreamOutObjectStream<T>(int type, IPlatformTarget arg,
+            Action<IBinaryRawWriter> writeAction, Func<IBinaryRawReader, IPlatformTarget, T> readAction)
+        {
+            PlatformMemoryStream requestStream = null;
+            PlatformMemoryStream responseStream = null;
+
+            // Zero pointers tell the unmanaged call that the corresponding stream is absent.
+            long requestPtr = 0;
+            long responsePtr = 0;
+
+            try
+            {
+                if (writeAction != null)
+                {
+                    requestStream = IgniteManager.Memory.Allocate().GetStream();
+
+                    var binaryWriter = _marsh.StartMarshal(requestStream);
+                    writeAction(binaryWriter);
+                    _marsh.FinishMarshal(binaryWriter);
+
+                    requestPtr = requestStream.SynchronizeOutput();
+                }
+
+                if (readAction != null)
+                {
+                    responseStream = IgniteManager.Memory.Allocate().GetStream();
+                    responsePtr = responseStream.MemoryPointer;
+                }
+
+                // GetTargetPtr yields a null pointer for a null argument.
+                var unmanagedResult = UU.TargetInObjectStreamOutObjectStream(_target, type, GetTargetPtr(arg),
+                    requestPtr, responsePtr);
+
+                if (readAction == null)
+                    return default(T);
+
+                responseStream.SynchronizeInput();
+
+                var binaryReader = _marsh.StartUnmarshal(responseStream);
+
+                return readAction(binaryReader, GetPlatformTarget(unmanagedResult));
+            }
+            finally
+            {
+                // Nested finally: the request stream is disposed even if disposing the response stream throws.
+                try
+                {
+                    if (responseStream != null)
+                        responseStream.Dispose();
+                }
+                finally
+                {
+                    if (requestStream != null)
+                        requestStream.Dispose();
+                }
+            }
+        }
+
+        /** <inheritdoc /> */
+        public T OutStream<T>(int type, Func<IBinaryRawReader, T> readAction)
+        {
+            using (var inStream = IgniteManager.Memory.Allocate().GetStream())
+            {
+                UU.TargetOutStream(_target, type, inStream.MemoryPointer);
+
+                inStream.SynchronizeInput();
+
+                // The result is exposed to the caller through a marshaller-backed reader.
+                var binaryReader = _marsh.StartUnmarshal(inStream);
+
+                return readAction(binaryReader);
+            }
+        }
+
+ /** <inheritdoc /> */
+ // Same operation as OutObjectInternal; only the declared return type differs.
+ public IPlatformTarget OutObject(int type)
+ {
+ return OutObjectInternal(type);
+ }
+
+        /** <inheritdoc /> */
+        public Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction = null,
+            Func<IBinaryRawReader, T> readAction = null)
+        {
+            // Adapt the optional raw-reader callback to the BinaryReader-based converter of GetFuture.
+            Func<BinaryReader, T> convertFunc = null;
+
+            if (readAction != null)
+                convertFunc = r => readAction(r);
+
+            Action<long, int> listenAction = (futId, futType) =>
+            {
+                using (var outStream = IgniteManager.Memory.Allocate().GetStream())
+                {
+                    // Future handle and type code are written first, directly to the stream.
+                    outStream.WriteLong(futId);
+                    outStream.WriteInt(futType);
+
+                    // Optional payload follows, written through the marshaller.
+                    if (writeAction != null)
+                    {
+                        var binaryWriter = _marsh.StartMarshal(outStream);
+
+                        writeAction(binaryWriter);
+
+                        _marsh.FinishMarshal(binaryWriter);
+                    }
+
+                    UU.TargetInStreamAsync(_target, type, outStream.SynchronizeOutput());
+                }
+            };
+
+            return GetFuture(listenAction, false, convertFunc).Task;
+        }
+
+        /** <inheritdoc /> */
+        public Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction,
+            Func<IBinaryRawReader, T> readAction, CancellationToken cancellationToken)
+        {
+            // Adapt the optional raw-reader callback to the BinaryReader-based converter of GetFuture.
+            Func<BinaryReader, T> convertFunc = null;
+
+            if (readAction != null)
+                convertFunc = r => readAction(r);
+
+            // This variant returns the unmanaged target from the listen action, which GetFuture
+            // attaches to the future before the task is obtained with the cancellation token.
+            Func<long, int, IUnmanagedTarget> listenAction = (futId, futType) =>
+            {
+                using (var outStream = IgniteManager.Memory.Allocate().GetStream())
+                {
+                    // Future handle and type code are written first, directly to the stream.
+                    outStream.WriteLong(futId);
+                    outStream.WriteInt(futType);
+
+                    // Optional payload follows, written through the marshaller.
+                    if (writeAction != null)
+                    {
+                        var binaryWriter = _marsh.StartMarshal(outStream);
+
+                        writeAction(binaryWriter);
+
+                        _marsh.FinishMarshal(binaryWriter);
+                    }
+
+                    return UU.TargetInStreamOutObjectAsync(_target, type, outStream.SynchronizeOutput());
+                }
+            };
+
+            return GetFuture(listenAction, false, convertFunc).GetTask(cancellationToken);
+        }
+
+        /// <summary>
+        /// Wraps an unmanaged target in a new <see cref="PlatformJniTarget"/> that shares this instance's marshaller.
+        /// </summary>
+        /// <param name="target">Unmanaged target, may be null.</param>
+        /// <returns>Wrapper for the target, or null when <paramref name="target"/> is null.</returns>
+        private IPlatformTargetInternal GetPlatformTarget(IUnmanagedTarget target)
+        {
+            if (target == null)
+                return null;
+
+            return new PlatformJniTarget(target, _marsh);
+        }
+
+        /// <summary>
+        /// Gets the raw pointer of the unmanaged target behind the given platform target.
+        /// </summary>
+        /// <param name="target">Platform target; must be a <see cref="PlatformJniTarget"/> when not null.</param>
+        /// <returns>Target pointer, or a null pointer when <paramref name="target"/> is null.</returns>
+        private static unsafe void* GetTargetPtr(IPlatformTarget target)
+        {
+            if (target == null)
+                return null;
+
+            // The direct cast fails with InvalidCastException for any other implementation.
+            var jniTarget = (PlatformJniTarget) target;
+
+            return jniTarget._target.Target;
+        }
+
+ #endregion
+
+        /** <inheritdoc /> */
+        [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly",
+            Justification = "There is no finalizer.")]
+        public void Dispose()
+        {
+            // Defensive check: the constructor only verifies the target with Debug.Assert.
+            if (_target == null)
+                return;
+
+            _target.Dispose();
+        }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
deleted file mode 100644
index 474af0e..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ /dev/null
@@ -1,1086 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl
-{
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Diagnostics.CodeAnalysis;
- using System.IO;
- using System.Threading.Tasks;
- using Apache.Ignite.Core.Binary;
- using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Binary.IO;
- using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Memory;
- using Apache.Ignite.Core.Impl.Unmanaged;
- using Apache.Ignite.Core.Interop;
- using BinaryReader = Apache.Ignite.Core.Impl.Binary.BinaryReader;
- using BinaryWriter = Apache.Ignite.Core.Impl.Binary.BinaryWriter;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
-
- /// <summary>
- /// Base class for interop targets.
- /// </summary>
- [SuppressMessage("ReSharper", "LocalVariableHidesMember")]
- internal class PlatformTarget : IPlatformTarget
- {
- /** */
- protected const int False = 0;
-
- /** */
- protected const int True = 1;
-
- /** */
- protected const int Error = -1;
-
- /** */
- public const int OpNone = -2;
-
- /** */
- private static readonly Dictionary<Type, FutureType> IgniteFutureTypeMap
- = new Dictionary<Type, FutureType>
- {
- {typeof(bool), FutureType.Bool},
- {typeof(byte), FutureType.Byte},
- {typeof(char), FutureType.Char},
- {typeof(double), FutureType.Double},
- {typeof(float), FutureType.Float},
- {typeof(int), FutureType.Int},
- {typeof(long), FutureType.Long},
- {typeof(short), FutureType.Short}
- };
-
- /** Unmanaged target. */
- private readonly IUnmanagedTarget _target;
-
- /** Marshaller. */
- private readonly Marshaller _marsh;
-
- /// <summary>
- /// Constructor.
- /// </summary>
- /// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
- public PlatformTarget(IUnmanagedTarget target, Marshaller marsh)
- {
- Debug.Assert(target != null);
- Debug.Assert(marsh != null);
-
- _target = target;
- _marsh = marsh;
- }
-
- /// <summary>
- /// Unmanaged target.
- /// </summary>
- internal IUnmanagedTarget Target
- {
- get { return _target; }
- }
-
- /// <summary>
- /// Marshaller.
- /// </summary>
- internal Marshaller Marshaller
- {
- get { return _marsh; }
- }
-
- #region Static Helpers
-
- /// <summary>
- /// Write collection.
- /// </summary>
- /// <param name="writer">Writer.</param>
- /// <param name="vals">Values.</param>
- /// <returns>The same writer for chaining.</returns>
- protected static BinaryWriter WriteCollection<T>(BinaryWriter writer, ICollection<T> vals)
- {
- return WriteCollection<T, T>(writer, vals, null);
- }
-
- /// <summary>
- /// Write nullable collection.
- /// </summary>
- /// <param name="writer">Writer.</param>
- /// <param name="vals">Values.</param>
- /// <returns>The same writer for chaining.</returns>
- protected static BinaryWriter WriteNullableCollection<T>(BinaryWriter writer, ICollection<T> vals)
- {
- return WriteNullable(writer, vals, WriteCollection);
- }
-
- /// <summary>
- /// Write collection.
- /// </summary>
- /// <param name="writer">Writer.</param>
- /// <param name="vals">Values.</param>
- /// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>The same writer for chaining.</returns>
- protected static BinaryWriter WriteCollection<T1, T2>(BinaryWriter writer,
- ICollection<T1> vals, Func<T1, T2> selector)
- {
- writer.WriteInt(vals.Count);
-
- if (selector == null)
- {
- foreach (var val in vals)
- writer.Write(val);
- }
- else
- {
- foreach (var val in vals)
- writer.Write(selector(val));
- }
-
- return writer;
- }
-
- /// <summary>
- /// Write enumerable.
- /// </summary>
- /// <param name="writer">Writer.</param>
- /// <param name="vals">Values.</param>
- /// <returns>The same writer for chaining.</returns>
- protected static BinaryWriter WriteEnumerable<T>(BinaryWriter writer, IEnumerable<T> vals)
- {
- return WriteEnumerable<T, T>(writer, vals, null);
- }
-
- /// <summary>
- /// Write enumerable.
- /// </summary>
- /// <param name="writer">Writer.</param>
- /// <param name="vals">Values.</param>
- /// <param name="selector">A transform function to apply to each element.</param>
- /// <returns>The same writer for chaining.</returns>
- protected static BinaryWriter WriteEnumerable<T1, T2>(BinaryWriter writer,
- IEnumerable<T1> vals, Func<T1, T2> selector)
- {
- var col = vals as ICollection<T1>;
-
- if (col != null)
- return WriteCollection(writer, col, selector);
-
- var stream = writer.Stream;
-
- var pos = stream.Position;
-
- stream.Seek(4, SeekOrigin.Current);
-
- var size = 0;
-
- if (selector == null)
- {
- foreach (var val in vals)
- {
- writer.Write(val);
-
- size++;
- }
- }
- else
- {
- foreach (var val in vals)
- {
- writer.Write(selector(val));
-
- size++;
- }
- }
-
- stream.WriteInt(pos, size);
-
- return writer;
- }
-
- /// <summary>
- /// Write dictionary.
- /// </summary>
- /// <param name="writer">Writer.</param>
- /// <param name="vals">Values.</param>
- protected static void WriteDictionary<T1, T2>(BinaryWriter writer, IEnumerable<KeyValuePair<T1, T2>> vals)
- {
- var pos = writer.Stream.Position;
- writer.WriteInt(0); // Reserve count.
-
- int cnt = 0;
-
- foreach (var pair in vals)
- {
- writer.Write(pair.Key);
- writer.Write(pair.Value);
-
- cnt++;
- }
-
- writer.Stream.WriteInt(pos, cnt);
- }
-
- /// <summary>
- /// Write a nullable item.
- /// </summary>
- /// <param name="writer">Writer.</param>
- /// <param name="item">Item.</param>
- /// <param name="writeItem">Write action to perform on item when it is not null.</param>
- /// <returns>The same writer for chaining.</returns>
- private static BinaryWriter WriteNullable<T>(BinaryWriter writer, T item,
- Func<BinaryWriter, T, BinaryWriter> writeItem) where T : class
- {
- if (item == null)
- {
- writer.WriteBoolean(false);
-
- return writer;
- }
-
- writer.WriteBoolean(true);
-
- return writeItem(writer, item);
- }
-
- #endregion
-
- #region OUT operations
-
- /// <summary>
- /// Perform out operation.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="action">Action to be performed on the stream.</param>
- /// <returns></returns>
- protected long DoOutOp(int type, Action<IBinaryStream> action)
- {
- using (var stream = IgniteManager.Memory.Allocate().GetStream())
- {
- action(stream);
-
- return UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput());
- }
- }
-
- /// <summary>
- /// Perform out operation.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="action">Action to be performed on the stream.</param>
- /// <returns></returns>
- protected long DoOutOp(int type, Action<BinaryWriter> action)
- {
- using (var stream = IgniteManager.Memory.Allocate().GetStream())
- {
- var writer = _marsh.StartMarshal(stream);
-
- action(writer);
-
- FinishMarshal(writer);
-
- return UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput());
- }
- }
-
- /// <summary>
- /// Perform out operation.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="action">Action to be performed on the stream.</param>
- /// <returns>Resulting object.</returns>
- protected IUnmanagedTarget DoOutOpObject(int type, Action<BinaryWriter> action)
- {
- using (var stream = IgniteManager.Memory.Allocate().GetStream())
- {
- var writer = _marsh.StartMarshal(stream);
-
- action(writer);
-
- FinishMarshal(writer);
-
- return UU.TargetInStreamOutObject(_target, type, stream.SynchronizeOutput());
- }
- }
-
- /// <summary>
- /// Perform out operation.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="action">Action to be performed on the stream.</param>
- /// <returns>Resulting object.</returns>
- protected IUnmanagedTarget DoOutOpObject(int type, Action<IBinaryStream> action)
- {
- using (var stream = IgniteManager.Memory.Allocate().GetStream())
- {
- action(stream);
-
- return UU.TargetInStreamOutObject(_target, type, stream.SynchronizeOutput());
- }
- }
-
- /// <summary>
- /// Perform out operation.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <returns>Resulting object.</returns>
- protected IUnmanagedTarget DoOutOpObject(int type)
- {
- return UU.TargetOutObject(_target, type);
- }
-
- /// <summary>
- /// Perform simple output operation accepting single argument.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="val1">Value.</param>
- /// <returns>Result.</returns>
- protected long DoOutOp<T1>(int type, T1 val1)
- {
- return DoOutOp(type, writer =>
- {
- writer.Write(val1);
- });
- }
-
- /// <summary>
- /// Perform simple output operation accepting two arguments.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="val1">Value 1.</param>
- /// <param name="val2">Value 2.</param>
- /// <returns>Result.</returns>
- protected long DoOutOp<T1, T2>(int type, T1 val1, T2 val2)
- {
- return DoOutOp(type, writer =>
- {
- writer.Write(val1);
- writer.Write(val2);
- });
- }
-
- /// <summary>
- /// Perform simple output operation accepting three arguments.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="val1">Value 1.</param>
- /// <param name="val2">Value 2.</param>
- /// <param name="val3">Value 3.</param>
- /// <returns>Result.</returns>
- protected long DoOutOp<T1, T2, T3>(int type, T1 val1, T2 val2, T3 val3)
- {
- return DoOutOp(type, writer =>
- {
- writer.Write(val1);
- writer.Write(val2);
- writer.Write(val3);
- });
- }
-
- #endregion
-
- #region IN operations
-
- /// <summary>
- /// Perform in operation.
- /// </summary>
- /// <param name="type">Type.</param>
- /// <param name="action">Action.</param>
- protected void DoInOp(int type, Action<IBinaryStream> action)
- {
- using (var stream = IgniteManager.Memory.Allocate().GetStream())
- {
- UU.TargetOutStream(_target, type, stream.MemoryPointer);
-
- stream.SynchronizeInput();
-
- action(stream);
- }
- }
-
- /// <summary>
- /// Perform in operation.
- /// </summary>
- /// <param name="type">Type.</param>
- /// <param name="action">Action.</param>
- /// <returns>Result.</returns>
- protected T DoInOp<T>(int type, Func<IBinaryStream, T> action)
- {
- using (var stream = IgniteManager.Memory.Allocate().GetStream())
- {
- UU.TargetOutStream(_target, type, stream.MemoryPointer);
-
- stream.SynchronizeInput();
-
- return action(stream);
- }
- }
-
- /// <summary>
- /// Perform simple in operation returning immediate result.
- /// </summary>
- /// <param name="type">Type.</param>
- /// <returns>Result.</returns>
- protected T DoInOp<T>(int type)
- {
- using (var stream = IgniteManager.Memory.Allocate().GetStream())
- {
- UU.TargetOutStream(_target, type, stream.MemoryPointer);
-
- stream.SynchronizeInput();
-
- return Unmarshal<T>(stream);
- }
- }
-
- #endregion
-
- #region OUT-IN operations
-
- /// <summary>
- /// Perform out-in operation.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="outAction">Out action.</param>
- /// <param name="inAction">In action.</param>
- protected void DoOutInOp(int type, Action<BinaryWriter> outAction, Action<IBinaryStream> inAction)
- {
- using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().GetStream())
- {
- using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().GetStream())
- {
- BinaryWriter writer = _marsh.StartMarshal(outStream);
-
- outAction(writer);
-
- FinishMarshal(writer);
-
- UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer);
-
- inStream.SynchronizeInput();
-
- inAction(inStream);
- }
- }
- }
-
- /// <summary>
- /// Perform out-in operation.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="outAction">Out action.</param>
- /// <param name="inAction">In action.</param>
- /// <returns>Result.</returns>
- protected TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, TR> inAction)
- {
- using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().GetStream())
- {
- using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().GetStream())
- {
- BinaryWriter writer = _marsh.StartMarshal(outStream);
-
- outAction(writer);
-
- FinishMarshal(writer);
-
- UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer);
-
- inStream.SynchronizeInput();
-
- return inAction(inStream);
- }
- }
- }
-
- /// <summary>
- /// Perform out-in operation with a single stream.
- /// </summary>
- /// <typeparam name="TR">The type of the r.</typeparam>
- /// <param name="type">Operation type.</param>
- /// <param name="outAction">Out action.</param>
- /// <param name="inAction">In action.</param>
- /// <param name="inErrorAction">The action to read an error.</param>
- /// <returns>
- /// Result.
- /// </returns>
- protected TR DoOutInOpX<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, long, TR> inAction,
- Func<IBinaryStream, Exception> inErrorAction)
- {
- Debug.Assert(inErrorAction != null);
-
- using (var stream = IgniteManager.Memory.Allocate().GetStream())
- {
- var writer = _marsh.StartMarshal(stream);
-
- outAction(writer);
-
- FinishMarshal(writer);
-
- var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput());
-
- if (res != Error && inAction == null)
- return default(TR); // quick path for void operations
-
- stream.SynchronizeInput();
-
- stream.Seek(0, SeekOrigin.Begin);
-
- if (res != Error)
- return inAction != null ? inAction(stream, res) : default(TR);
-
- throw inErrorAction(stream);
- }
- }
-
- /// <summary>
- /// Perform out-in operation with a single stream.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="outAction">Out action.</param>
- /// <param name="inErrorAction">The action to read an error.</param>
- /// <returns>
- /// Result.
- /// </returns>
- protected bool DoOutInOpX(int type, Action<BinaryWriter> outAction,
- Func<IBinaryStream, Exception> inErrorAction)
- {
- Debug.Assert(inErrorAction != null);
-
- using (var stream = IgniteManager.Memory.Allocate().GetStream())
- {
- var writer = _marsh.StartMarshal(stream);
-
- outAction(writer);
-
- FinishMarshal(writer);
-
- var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput());
-
- if (res != Error)
- return res == True;
-
- stream.SynchronizeInput();
-
- stream.Seek(0, SeekOrigin.Begin);
-
- throw inErrorAction(stream);
- }
- }
-
- /// <summary>
- /// Perform out-in operation.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="outAction">Out action.</param>
- /// <param name="inAction">In action.</param>
- /// <param name="arg">Argument.</param>
- /// <returns>Result.</returns>
- protected unsafe TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction,
- Func<IBinaryStream, IUnmanagedTarget, TR> inAction, void* arg)
- {
- PlatformMemoryStream outStream = null;
- long outPtr = 0;
-
- PlatformMemoryStream inStream = null;
- long inPtr = 0;
-
- try
- {
- if (outAction != null)
- {
- outStream = IgniteManager.Memory.Allocate().GetStream();
- var writer = _marsh.StartMarshal(outStream);
- outAction(writer);
- FinishMarshal(writer);
- outPtr = outStream.SynchronizeOutput();
- }
-
- if (inAction != null)
- {
- inStream = IgniteManager.Memory.Allocate().GetStream();
- inPtr = inStream.MemoryPointer;
- }
-
- var res = UU.TargetInObjectStreamOutObjectStream(_target, type, arg, outPtr, inPtr);
-
- if (inAction == null)
- return default(TR);
-
- inStream.SynchronizeInput();
-
- return inAction(inStream, res);
-
- }
- finally
- {
- try
- {
- if (inStream != null)
- inStream.Dispose();
-
- }
- finally
- {
- if (outStream != null)
- outStream.Dispose();
- }
- }
- }
-
- /// <summary>
- /// Perform out-in operation.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="outAction">Out action.</param>
- /// <returns>Result.</returns>
- protected TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction)
- {
- using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().GetStream())
- {
- using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().GetStream())
- {
- BinaryWriter writer = _marsh.StartMarshal(outStream);
-
- outAction(writer);
-
- FinishMarshal(writer);
-
- UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer);
-
- inStream.SynchronizeInput();
-
- return Unmarshal<TR>(inStream);
- }
- }
- }
-
- /// <summary>
- /// Perform simple out-in operation accepting single argument.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="val">Value.</param>
- /// <returns>Result.</returns>
- protected TR DoOutInOp<T1, TR>(int type, T1 val)
- {
- using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().GetStream())
- {
- using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().GetStream())
- {
- BinaryWriter writer = _marsh.StartMarshal(outStream);
-
- writer.WriteObject(val);
-
- FinishMarshal(writer);
-
- UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer);
-
- inStream.SynchronizeInput();
-
- return Unmarshal<TR>(inStream);
- }
- }
- }
-
- /// <summary>
- /// Perform simple out-in operation accepting two arguments.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="val1">Value.</param>
- /// <param name="val2">Value.</param>
- /// <returns>Result.</returns>
- protected TR DoOutInOp<T1, T2, TR>(int type, T1 val1, T2 val2)
- {
- using (PlatformMemoryStream outStream = IgniteManager.Memory.Allocate().GetStream())
- {
- using (PlatformMemoryStream inStream = IgniteManager.Memory.Allocate().GetStream())
- {
- BinaryWriter writer = _marsh.StartMarshal(outStream);
-
- writer.WriteObject(val1);
- writer.WriteObject(val2);
-
- FinishMarshal(writer);
-
- UU.TargetInStreamOutStream(_target, type, outStream.SynchronizeOutput(), inStream.MemoryPointer);
-
- inStream.SynchronizeInput();
-
- return Unmarshal<TR>(inStream);
- }
- }
- }
-
- /// <summary>
- /// Perform simple out-in operation accepting two arguments.
- /// </summary>
- /// <param name="type">Operation type.</param>
- /// <param name="val">Value.</param>
- /// <returns>Result.</returns>
- protected long DoOutInOp(int type, long val = 0)
- {
- return UU.TargetInLongOutLong(_target, type, val);
- }
-
- #endregion
-
- #region Async operations
-
- /// <summary>
- /// Performs async operation.
- /// </summary>
- /// <param name="type">The type code.</param>
- /// <param name="writeAction">The write action.</param>
- /// <returns>Task for async operation</returns>
- protected Task DoOutOpAsync(int type, Action<BinaryWriter> writeAction = null)
- {
- return DoOutOpAsync<object>(type, writeAction);
- }
-
- /// <summary>
- /// Performs async operation.
- /// </summary>
- /// <typeparam name="T">Type of the result.</typeparam>
- /// <param name="type">The type code.</param>
- /// <param name="writeAction">The write action.</param>
- /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
- /// <param name="convertFunc">The function to read future result from stream.</param>
- /// <returns>Task for async operation</returns>
- protected Task<T> DoOutOpAsync<T>(int type, Action<BinaryWriter> writeAction = null, bool keepBinary = false,
- Func<BinaryReader, T> convertFunc = null)
- {
- return GetFuture((futId, futType) => DoOutOp(type, w =>
- {
- if (writeAction != null)
- writeAction(w);
- w.WriteLong(futId);
- w.WriteInt(futType);
- }), keepBinary, convertFunc).Task;
- }
-
- /// <summary>
- /// Performs async operation.
- /// </summary>
- /// <typeparam name="T">Type of the result.</typeparam>
- /// <param name="type">The type code.</param>
- /// <param name="writeAction">The write action.</param>
- /// <returns>Future for async operation</returns>
- protected Future<T> DoOutOpObjectAsync<T>(int type, Action<IBinaryRawWriter> writeAction)
- {
- return GetFuture<T>((futId, futType) => DoOutOpObject(type, w =>
- {
- writeAction(w);
- w.WriteLong(futId);
- w.WriteInt(futType);
- }));
- }
-
- /// <summary>
- /// Performs async operation.
- /// </summary>
- /// <typeparam name="TR">Type of the result.</typeparam>
- /// <typeparam name="T1">The type of the first arg.</typeparam>
- /// <param name="type">The type code.</param>
- /// <param name="val1">First arg.</param>
- /// <returns>
- /// Task for async operation
- /// </returns>
- protected Task<TR> DoOutOpAsync<T1, TR>(int type, T1 val1)
- {
- return GetFuture<TR>((futId, futType) => DoOutOp(type, w =>
- {
- w.WriteObject(val1);
- w.WriteLong(futId);
- w.WriteInt(futType);
- })).Task;
- }
-
- /// <summary>
- /// Performs async operation.
- /// </summary>
- /// <typeparam name="TR">Type of the result.</typeparam>
- /// <typeparam name="T1">The type of the first arg.</typeparam>
- /// <typeparam name="T2">The type of the second arg.</typeparam>
- /// <param name="type">The type code.</param>
- /// <param name="val1">First arg.</param>
- /// <param name="val2">Second arg.</param>
- /// <returns>
- /// Task for async operation
- /// </returns>
- protected Task<TR> DoOutOpAsync<T1, T2, TR>(int type, T1 val1, T2 val2)
- {
- return GetFuture<TR>((futId, futType) => DoOutOp(type, w =>
- {
- w.WriteObject(val1);
- w.WriteObject(val2);
- w.WriteLong(futId);
- w.WriteInt(futType);
- })).Task;
- }
-
- #endregion
-
- #region Miscelanneous
-
- /// <summary>
- /// Finish marshaling.
- /// </summary>
- /// <param name="writer">Writer.</param>
- internal void FinishMarshal(BinaryWriter writer)
- {
- _marsh.FinishMarshal(writer);
- }
-
- /// <summary>
- /// Unmarshal object using the given stream.
- /// </summary>
- /// <param name="stream">Stream.</param>
- /// <returns>Unmarshalled object.</returns>
- protected virtual T Unmarshal<T>(IBinaryStream stream)
- {
- return _marsh.Unmarshal<T>(stream);
- }
-
- /// <summary>
- /// Creates a future and starts listening.
- /// </summary>
- /// <typeparam name="T">Future result type</typeparam>
- /// <param name="listenAction">The listen action.</param>
- /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
- /// <param name="convertFunc">The function to read future result from stream.</param>
- /// <returns>Created future.</returns>
- private Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false,
- Func<BinaryReader, T> convertFunc = null)
- {
- var futType = FutureType.Object;
-
- var type = typeof(T);
-
- if (type.IsPrimitive)
- IgniteFutureTypeMap.TryGetValue(type, out futType);
-
- var fut = convertFunc == null && futType != FutureType.Object
- ? new Future<T>()
- : new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc));
-
- var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
-
- IUnmanagedTarget futTarget;
-
- try
- {
- futTarget = listenAction(futHnd, (int)futType);
- }
- catch (Exception)
- {
- _marsh.Ignite.HandleRegistry.Release(futHnd);
-
- throw;
- }
-
- fut.SetTarget(new Listenable(futTarget, _marsh));
-
- return fut;
- }
-
- /// <summary>
- /// Creates a future and starts listening.
- /// </summary>
- /// <typeparam name="T">Future result type</typeparam>
- /// <param name="listenAction">The listen action.</param>
- /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
- /// <param name="convertFunc">The function to read future result from stream.</param>
- /// <returns>Created future.</returns>
- protected Future<T> GetFuture<T>(Action<long, int> listenAction, bool keepBinary = false,
- Func<BinaryReader, T> convertFunc = null)
- {
- var futType = FutureType.Object;
-
- var type = typeof(T);
-
- if (type.IsPrimitive)
- IgniteFutureTypeMap.TryGetValue(type, out futType);
-
- var fut = convertFunc == null && futType != FutureType.Object
- ? new Future<T>()
- : new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc));
-
- var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
-
- try
- {
- listenAction(futHnd, (int)futType);
- }
- catch (Exception)
- {
- _marsh.Ignite.HandleRegistry.Release(futHnd);
-
- throw;
- }
-
- return fut;
- }
-
- #endregion
-
- #region IPlatformTarget
-
- /** <inheritdoc /> */
- public long InLongOutLong(int type, long val)
- {
- return DoOutInOp(type, val);
- }
-
- /** <inheritdoc /> */
- public long InStreamOutLong(int type, Action<IBinaryRawWriter> writeAction)
- {
- return DoOutOp(type, writer => writeAction(writer));
- }
-
- /** <inheritdoc /> */
- public T InStreamOutStream<T>(int type, Action<IBinaryRawWriter> writeAction,
- Func<IBinaryRawReader, T> readAction)
- {
- return DoOutInOp(type, writeAction, stream => readAction(Marshaller.StartUnmarshal(stream)));
- }
-
- /** <inheritdoc /> */
- public IPlatformTarget InStreamOutObject(int type, Action<IBinaryRawWriter> writeAction)
- {
- return GetPlatformTarget(DoOutOpObject(type, writeAction));
- }
-
- /** <inheritdoc /> */
- public unsafe T InObjectStreamOutObjectStream<T>(int type, IPlatformTarget arg, Action<IBinaryRawWriter> writeAction,
- Func<IBinaryRawReader, IPlatformTarget, T> readAction)
- {
- return DoOutInOp(type, writeAction, (stream, obj) => readAction(Marshaller.StartUnmarshal(stream),
- GetPlatformTarget(obj)), GetTargetPtr(arg));
- }
-
- /** <inheritdoc /> */
- public T OutStream<T>(int type, Func<IBinaryRawReader, T> readAction)
- {
- return DoInOp(type, stream => readAction(Marshaller.StartUnmarshal(stream)));
- }
-
- /** <inheritdoc /> */
- public IPlatformTarget OutObject(int type)
- {
- return GetPlatformTarget(DoOutOpObject(type));
- }
-
- /** <inheritdoc /> */
- public Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction = null,
- Func<IBinaryRawReader, T> readAction = null)
- {
- var convertFunc = readAction != null
- ? r => readAction(r)
- : (Func<BinaryReader, T>) null;
-
- return GetFuture((futId, futType) =>
- {
- using (var stream = IgniteManager.Memory.Allocate().GetStream())
- {
- stream.WriteLong(futId);
- stream.WriteInt(futType);
-
- if (writeAction != null)
- {
- var writer = _marsh.StartMarshal(stream);
-
- writeAction(writer);
-
- FinishMarshal(writer);
- }
-
- UU.TargetInStreamAsync(_target, type, stream.SynchronizeOutput());
- }
- }, false, convertFunc).Task;
- }
-
- /// <summary>
- /// Gets the platform target.
- /// </summary>
- private IPlatformTarget GetPlatformTarget(IUnmanagedTarget target)
- {
- return target == null ? null : new PlatformTarget(target, Marshaller);
- }
-
- /// <summary>
- /// Gets the target pointer.
- /// </summary>
- private static unsafe void* GetTargetPtr(IPlatformTarget target)
- {
- return target == null ? null : ((PlatformTarget) target).Target.Target;
- }
-
- #endregion
- }
-
- /// <summary>
- /// PlatformTarget with IDisposable pattern.
- /// </summary>
- internal abstract class PlatformDisposableTarget : PlatformTarget, IDisposable
- {
- /** Disposed flag. */
- private volatile bool _disposed;
-
- /// <summary>
- /// Constructor.
- /// </summary>
- /// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
- protected PlatformDisposableTarget(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
- {
- // No-op.
- }
-
- /** <inheritdoc /> */
- public void Dispose()
- {
- lock (this)
- {
- if (_disposed)
- return;
-
- Dispose(true);
-
- GC.SuppressFinalize(this);
-
- _disposed = true;
- }
- }
-
- /// <summary>
- /// Releases unmanaged and - optionally - managed resources.
- /// </summary>
- /// <param name="disposing">
- /// <c>true</c> when called from Dispose; <c>false</c> when called from finalizer.
- /// </param>
- protected virtual void Dispose(bool disposing)
- {
- Target.Dispose();
- }
-
- /// <summary>
- /// Throws <see cref="ObjectDisposedException"/> if this instance has been disposed.
- /// </summary>
- protected void ThrowIfDisposed()
- {
- if (_disposed)
- throw new ObjectDisposedException(GetType().Name, "Object has been disposed.");
- }
- }
-}
[2/8] ignite git commit: IGNITE-2190 ScanQuery without a filter
triggers object's deserialization on the server side
Posted by sb...@apache.org.
IGNITE-2190 ScanQuery without a filter triggers object's deserialization on the server side
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/29413922
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/29413922
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/29413922
Branch: refs/heads/ignite-5578
Commit: 2941392213d2d3d9632a30d1726502d31a12e938
Parents: 7a6af69
Author: Nikolay Izhikov <NI...@gmail.com>
Authored: Thu Jul 27 19:00:08 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jul 27 19:00:08 2017 +0300
----------------------------------------------------------------------
.../eventstorage/GridEventStorageManager.java | 14 +++++++++
.../cache/query/GridCacheQueryManager.java | 24 ++++++++++++----
.../IgniteCacheBinaryObjectsScanSelfTest.java | 9 +++++-
...acheBinaryObjectsScanWithEventsSelfTest.java | 30 ++++++++++++++++++++
.../IgniteBinaryCacheQueryTestSuite.java | 2 ++
5 files changed, 73 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/29413922/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 944420f..2b9a5ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -534,6 +534,20 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
}
/**
+ * Checks whether this event type has any listener.
+ *
+ * @param type Event type to check.
+ * @return Whether or not this event type has any listener.
+ */
+ public boolean hasListener(int type) {
+ assert type > 0 : "Invalid event type: " + type;
+
+ Listeners listeners = lsnrs.get(type);
+
+ return (listeners != null) && (!F.isEmpty(listeners.highPriorityLsnrs) || !F.isEmpty(listeners.lsnrs));
+ }
+
+ /**
* Checks whether all provided events are user-recordable.
* <p>
* Note that this method supports only predefined Ignite events.
http://git-wip-us.apache.org/repos/asf/ignite/blob/29413922/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 0f47b7f..f107038 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -103,6 +103,7 @@ import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
@@ -118,6 +119,7 @@ import org.apache.ignite.spi.indexing.IndexingSpi;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -1015,7 +1017,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
metrics.addGetTimeNanos(System.nanoTime() - start);
}
- if (readEvt) {
+ if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
cctx.gridEvents().record(new CacheQueryReadEvent<K, V>(
cctx.localNode(),
"SQL fields query result set row read.",
@@ -1135,6 +1137,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
boolean rmvIter = true;
+ GridCacheQueryAdapter<?> qry = qryInfo.query();
+
try {
// Preparing query closures.
IgniteClosure<Cache.Entry<K, V>, Object> trans =
@@ -1145,8 +1149,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
injectResources(trans);
injectResources(rdc);
- GridCacheQueryAdapter<?> qry = qryInfo.query();
-
int pageSize = qry.pageSize();
boolean incBackups = qry.includeBackups();
@@ -1245,7 +1247,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
K key0 = null;
V val0 = null;
- if (readEvt) {
+ if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
@@ -1372,6 +1374,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
}
catch (Throwable e) {
+ if (X.hasCause(e, ClassNotFoundException.class) && !qry.keepBinary() && cctx.binaryMarshaller() &&
+ !cctx.localNode().isClient() && !log.isQuiet()) {
+ LT.warn(log, "Suggestion for the cause of ClassNotFoundException");
+ LT.warn(log, "To disable, set -D" + IGNITE_QUIET + "=true");
+ LT.warn(log, " ^-- Ignite configured to use BinaryMarshaller but keepBinary is false for " +
+ "request");
+ LT.warn(log, " ^-- Server node need to load definition of data classes. " +
+ "It can be reason of ClassNotFoundException(consider IgniteCache.withKeepBinary to fix)");
+ LT.warn(log, "Refer this page for detailed information: " +
+ "https://apacheignite.readme.io/docs/binary-marshaller");
+ }
+
if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
U.error(log, "Failed to run query [qry=" + qryInfo + ", node=" + cctx.nodeId() + "]", e);
@@ -1471,7 +1485,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
metrics.addGetTimeNanos(System.nanoTime() - start);
}
- if (readEvt) {
+ if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
cctx.gridEvents().record(new CacheQueryReadEvent<>(
cctx.localNode(),
"Scan query entry read.",
http://git-wip-us.apache.org/repos/asf/ignite/blob/29413922/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java
index e0da1f6..f18aebe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java
@@ -75,7 +75,7 @@ public class IgniteCacheBinaryObjectsScanSelfTest extends GridCommonAbstractTest
discoSpi.setIpFinder(IP_FINDER);
cfg.setDiscoverySpi(discoSpi);
- cfg.setIncludeEventTypes(new int[0]);
+ cfg.setIncludeEventTypes(getIncludeEventTypes());
cfg.setMarshaller(null);
cfg.setPeerClassLoadingEnabled(false);
@@ -90,6 +90,13 @@ public class IgniteCacheBinaryObjectsScanSelfTest extends GridCommonAbstractTest
}
/**
+ * @return EventTypes to record.
+ */
+ protected int[] getIncludeEventTypes() {
+ return new int[0];
+ }
+
+ /**
* @param ldr Class loader.
* @throws Exception If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/29413922/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanWithEventsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanWithEventsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanWithEventsSelfTest.java
new file mode 100644
index 0000000..5746a34
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanWithEventsSelfTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.events.EventType;
+
+/**
+ *
+ */
+public class IgniteCacheBinaryObjectsScanWithEventsSelfTest extends IgniteCacheBinaryObjectsScanSelfTest {
+ /** {@inheritDoc} */
+ @Override protected int[] getIncludeEventTypes() {
+ return EventType.EVTS_ALL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/29413922/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 1cfb345..8164fe0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.BinarySerializationQuerySelfTest;
import org.apache.ignite.internal.processors.cache.BinarySerializationQueryWithReflectiveSerializerSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheBinaryObjectsScanSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheBinaryObjectsScanWithEventsSelfTest;
/**
* Cache query suite with binary marshaller.
@@ -37,6 +38,7 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite {
suite.addTestSuite(BinarySerializationQuerySelfTest.class);
suite.addTestSuite(BinarySerializationQueryWithReflectiveSerializerSelfTest.class);
suite.addTestSuite(IgniteCacheBinaryObjectsScanSelfTest.class);
+ suite.addTestSuite(IgniteCacheBinaryObjectsScanWithEventsSelfTest.class);
//Should be adjusted. Not ready to be used with BinaryMarshaller.
//suite.addTestSuite(GridCacheBinarySwapScanQuerySelfTest.class);
[7/8] ignite git commit: ignite-5858 Fixed affinity initialization on
new coordinator (broken in aeb9336b3b161ddfff73f17e41cd453409b84a16).
Posted by sb...@apache.org.
ignite-5858 Fixed affinity initialization on new coordinator (broken in aeb9336b3b161ddfff73f17e41cd453409b84a16).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b698bbfc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b698bbfc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b698bbfc
Branch: refs/heads/ignite-5578
Commit: b698bbfcaa2056b9792404aef38e427ff323bd57
Parents: 89bba2f
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 28 10:25:16 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 28 10:25:16 2017 +0300
----------------------------------------------------------------------
.../processors/cache/CacheAffinitySharedManager.java | 2 +-
.../cache/GridCachePartitionExchangeManager.java | 13 ++++++++++++-
.../dht/preloader/GridDhtPartitionsExchangeFuture.java | 10 +++++++++-
3 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b698bbfc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 5a7f634..51834c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1461,7 +1461,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
* @throws IgniteCheckedException If failed.
* @return Future completed when caches initialization is done.
*/
- private IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut)
+ public IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut)
throws IgniteCheckedException {
final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b698bbfc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6a7258f..f6fa833 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1745,6 +1745,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** Busy flag used as performance optimization to stop current preloading. */
private volatile boolean busy;
+ /** */
+ private boolean crd;
+
/**
* Constructor.
*/
@@ -1940,7 +1943,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
lastInitializedFut = exchFut;
- exchFut.init();
+ boolean newCrd = false;
+
+ if (!crd) {
+ List<ClusterNode> srvNodes = exchFut.discoCache().serverNodes();
+
+ crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal();
+ }
+
+ exchFut.init(newCrd);
int dumpCnt = 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b698bbfc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 71e41b0..52a74ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -414,9 +414,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/**
* Starts activity.
*
+ * @param newCrd {@code True} if node becomes coordinator on this exchange.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- public void init() throws IgniteInterruptedCheckedException {
+ public void init(boolean newCrd) throws IgniteInterruptedCheckedException {
if (isDone())
return;
@@ -489,6 +490,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
initCachesOnLocalJoin();
}
+ if (newCrd) {
+ IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(this);
+
+ if (fut != null)
+ fut.get();
+ }
+
exchange = CU.clientNode(discoEvt.eventNode()) ?
onClientNodeEvent(crdNode) :
onServerNodeEvent(crdNode);
[6/8] ignite git commit: IGNITE-5769 Abstract away .NET->Java calls
Posted by sb...@apache.org.
IGNITE-5769 Abstract away .NET->Java calls
This closes #2352
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89bba2fa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89bba2fa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89bba2fa
Branch: refs/heads/ignite-5578
Commit: 89bba2fa2c423d5713c8412ba0069b869005694c
Parents: 47fea40
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Jul 28 10:06:16 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Jul 28 10:06:16 2017 +0300
----------------------------------------------------------------------
.../platform/PlatformTargetProxy.java | 11 +
.../platform/PlatformTargetProxyImpl.java | 79 +-
.../plugin/PlatformTestPluginTarget.java | 7 +-
.../cpp/jni/include/ignite/jni/exports.h | 1 +
.../platforms/cpp/jni/include/ignite/jni/java.h | 5 +-
modules/platforms/cpp/jni/project/vs/module.def | 1 +
modules/platforms/cpp/jni/src/exports.cpp | 4 +
modules/platforms/cpp/jni/src/java.cpp | 19 +-
.../Plugin/PluginTest.cs | 13 +-
.../Apache.Ignite.Core.Tests/TestUtils.cs | 7 +-
.../Apache.Ignite.Core.csproj | 5 +-
.../dotnet/Apache.Ignite.Core/Ignition.cs | 9 +-
.../Impl/Binary/BinaryProcessor.cs | 6 +-
.../Impl/Binary/BinaryWriterExtensions.cs | 107 ++
.../Cache/Affinity/PlatformAffinityFunction.cs | 7 +-
.../Impl/Cache/CacheAffinityImpl.cs | 18 +-
.../Impl/Cache/CacheEnumerator.cs | 8 +-
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 113 +-
.../Impl/Cache/Query/AbstractQueryCursor.cs | 12 +-
.../Continuous/ContinuousQueryHandleImpl.cs | 12 +-
.../Impl/Cache/Query/FieldsQueryCursor.cs | 6 +-
.../Impl/Cache/Query/QueryCursor.cs | 5 +-
.../Impl/Cluster/ClusterGroupImpl.cs | 76 +-
.../Impl/Common/DelegateTypeDescriptor.cs | 9 +-
.../Impl/Common/Listenable.cs | 8 +-
.../Impl/Compute/ComputeImpl.cs | 12 +-
.../Impl/DataStructures/AtomicLong.cs | 9 +-
.../Impl/DataStructures/AtomicReference.cs | 8 +-
.../Impl/DataStructures/AtomicSequence.cs | 9 +-
.../Impl/Datastream/DataStreamerImpl.cs | 8 +-
.../Impl/Datastream/StreamReceiverHolder.cs | 13 +-
.../Apache.Ignite.Core/Impl/Events/Events.cs | 11 +-
.../Impl/IPlatformTargetInternal.cs | 102 ++
.../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 48 +-
.../Impl/Messaging/Messaging.cs | 10 +-
.../Impl/PlatformDisposableTargetAdapter.cs | 75 ++
.../Impl/PlatformJniTarget.cs | 536 +++++++++
.../Apache.Ignite.Core/Impl/PlatformTarget.cs | 1086 ------------------
.../Impl/PlatformTargetAdapter.cs | 534 +++++++++
.../Impl/Services/Services.cs | 19 +-
.../Impl/Transactions/TransactionsImpl.cs | 29 +-
.../Impl/Unmanaged/IgniteJniNativeMethods.cs | 3 +
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 10 +-
.../Impl/Unmanaged/UnmanagedUtils.cs | 7 +
.../Interop/IPlatformTarget.cs | 15 +
45 files changed, 1690 insertions(+), 1402 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
index 1ee57cb..29de311 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
@@ -105,6 +105,17 @@ public interface PlatformTargetProxy {
void inStreamAsync(int type, long memPtr) throws Exception;
/**
+ * Asynchronous operation accepting memory stream and returning PlatformListenableTarget.
+ * Supports cancellable async operations.
+ *
+ * @param type Operation type.
+ * @param memPtr Memory pointer.
+ * @return Result.
+ * @throws Exception In case of failure.
+ */
+ Object inStreamOutObjectAsync(int type, long memPtr) throws Exception;
+
+ /**
* Returns the underlying target.
*
* @return Underlying target.
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
index 44044b1..b472275 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
+import org.apache.ignite.internal.processors.platform.utils.PlatformListenableTarget;
import org.apache.ignite.lang.IgniteFuture;
/**
@@ -109,37 +111,16 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy {
/** {@inheritDoc} */
@Override public void inStreamAsync(int type, long memPtr) throws Exception {
- try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
- BinaryRawReaderEx reader = platformCtx.reader(mem);
-
- long futId = reader.readLong();
- int futTyp = reader.readInt();
-
- final PlatformAsyncResult res = target.processInStreamAsync(type, reader);
-
- if (res == null)
- throw new IgniteException("PlatformTarget.processInStreamAsync should not return null.");
-
- IgniteFuture fut = res.future();
+ inStreamOutListenableAsync(type, memPtr);
+ }
- if (fut == null)
- throw new IgniteException("PlatformAsyncResult.future() should not return null.");
+ /** {@inheritDoc} */
+ @Override public Object inStreamOutObjectAsync(int type, long memPtr) throws Exception {
+ PlatformListenable listenable = inStreamOutListenableAsync(type, memPtr);
- PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() {
- /** {@inheritDoc} */
- @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) {
- res.write(writer, obj);
- }
+ PlatformListenableTarget target = new PlatformListenableTarget(listenable, platformCtx);
- /** {@inheritDoc} */
- @Override public boolean canWrite(Object obj, Throwable err) {
- return err == null;
- }
- }, target);
- }
- catch (Exception e) {
- throw target.convertException(e);
- }
+ return wrapProxy(target);
}
/** {@inheritDoc} */
@@ -234,4 +215,46 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy {
private PlatformTarget unwrapProxy(Object obj) {
return obj == null ? null : ((PlatformTargetProxyImpl)obj).target;
}
+
+ /**
+ * Performs asynchronous operation.
+ *
+ * @param type Type.
+ * @param memPtr Stream pointer.
+ * @return Listenable.
+ * @throws Exception On error.
+ */
+ private PlatformListenable inStreamOutListenableAsync(int type, long memPtr) throws Exception {
+ try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
+ BinaryRawReaderEx reader = platformCtx.reader(mem);
+
+ long futId = reader.readLong();
+ int futTyp = reader.readInt();
+
+ final PlatformAsyncResult res = target.processInStreamAsync(type, reader);
+
+ if (res == null)
+ throw new IgniteException("PlatformTarget.processInStreamAsync should not return null.");
+
+ IgniteFuture fut = res.future();
+
+ if (fut == null)
+ throw new IgniteException("PlatformAsyncResult.future() should not return null.");
+
+ return PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() {
+ /** {@inheritDoc} */
+ @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) {
+ res.write(writer, obj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean canWrite(Object obj, Throwable err) {
+ return err == null;
+ }
+ }, target);
+ }
+ catch (Exception e) {
+ throw target.convertException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
index 7e69425..8c1cbe9 100644
--- a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
@@ -146,7 +146,12 @@ class PlatformTestPluginTarget implements PlatformTarget {
case 1: {
// Async upper case.
final String val = reader.readString();
- final GridFutureAdapter<String> fa = new GridFutureAdapter<>();
+
+ final GridFutureAdapter<String> fa = new GridFutureAdapter<String>() {
+ @Override public boolean cancel() throws IgniteCheckedException {
+ return onCancelled();
+ }
+ };
new Thread(new Runnable() {
@Override public void run() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/include/ignite/jni/exports.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
index ea0c32a..0580d19 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
@@ -38,6 +38,7 @@ extern "C" {
void IGNITE_CALL IgniteTargetOutStream(gcj::JniContext* ctx, void* obj, int opType, long long memPtr);
void* IGNITE_CALL IgniteTargetOutObject(gcj::JniContext* ctx, void* obj, int opType);
void IGNITE_CALL IgniteTargetInStreamAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr);
+ void* IGNITE_CALL IgniteTargetInStreamOutObjectAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr);
void* IGNITE_CALL IgniteAcquire(gcj::JniContext* ctx, void* obj);
void IGNITE_CALL IgniteRelease(void* obj);
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index c170a5b..c713e81 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -173,9 +173,6 @@ namespace ignite
jmethodID m_PlatformIgnition_stop;
jmethodID m_PlatformIgnition_stopAll;
- jclass c_PlatformProcessor;
- jmethodID m_PlatformProcessor_releaseStart;
-
jclass c_PlatformTarget;
jmethodID m_PlatformTarget_inLongOutLong;
jmethodID m_PlatformTarget_inStreamOutLong;
@@ -183,6 +180,7 @@ namespace ignite
jmethodID m_PlatformTarget_outStream;
jmethodID m_PlatformTarget_outObject;
jmethodID m_PlatformTarget_inStreamAsync;
+ jmethodID m_PlatformTarget_inStreamOutObjectAsync;
jmethodID m_PlatformTarget_inStreamOutStream;
jmethodID m_PlatformTarget_inObjectStreamOutObjectStream;
@@ -325,6 +323,7 @@ namespace ignite
void TargetOutStream(jobject obj, int opType, long long memPtr, JniErrorInfo* errInfo = NULL);
jobject TargetOutObject(jobject obj, int opType, JniErrorInfo* errInfo = NULL);
void TargetInStreamAsync(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
+ jobject TargetInStreamOutObjectAsync(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
jobject CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
jobject CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/project/vs/module.def
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def
index 53e7e42..1407f82 100644
--- a/modules/platforms/cpp/jni/project/vs/module.def
+++ b/modules/platforms/cpp/jni/project/vs/module.def
@@ -12,6 +12,7 @@ IgniteTargetInStreamOutStream @20
IgniteTargetInObjectStreamOutObjectStream @21
IgniteTargetInLongOutLong @24
IgniteTargetInStreamAsync @25
+IgniteTargetInStreamOutObjectAsync @26
IgniteAcquire @80
IgniteRelease @81
IgniteThrowToJava @82
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/src/exports.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/exports.cpp b/modules/platforms/cpp/jni/src/exports.cpp
index 9b7defd..aeb68ab 100644
--- a/modules/platforms/cpp/jni/src/exports.cpp
+++ b/modules/platforms/cpp/jni/src/exports.cpp
@@ -74,6 +74,10 @@ extern "C" {
ctx->TargetInStreamAsync(static_cast<jobject>(obj), opType, memPtr);
}
+ void* IGNITE_CALL IgniteTargetInStreamOutObjectAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr) {
+ return ctx->TargetInStreamOutObjectAsync(static_cast<jobject>(obj), opType, memPtr);
+ }
+
void* IGNITE_CALL IgniteAcquire(gcj::JniContext* ctx, void* obj) {
return ctx->Acquire(static_cast<jobject>(obj));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index 7eadec0..ac4ba63 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -221,9 +221,6 @@ namespace ignite
const char* C_PLATFORM_NO_CALLBACK_EXCEPTION = "org/apache/ignite/internal/processors/platform/PlatformNoCallbackException";
- const char* C_PLATFORM_PROCESSOR = "org/apache/ignite/internal/processors/platform/PlatformProcessor";
- JniMethod M_PLATFORM_PROCESSOR_RELEASE_START = JniMethod("releaseStart", "()V", false);
-
const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTargetProxy";
JniMethod M_PLATFORM_TARGET_IN_LONG_OUT_LONG = JniMethod("inLongOutLong", "(IJ)J", false);
JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_LONG = JniMethod("inStreamOutLong", "(IJ)J", false);
@@ -233,6 +230,7 @@ namespace ignite
JniMethod M_PLATFORM_TARGET_OUT_STREAM = JniMethod("outStream", "(IJ)V", false);
JniMethod M_PLATFORM_TARGET_OUT_OBJECT = JniMethod("outObject", "(I)Ljava/lang/Object;", false);
JniMethod M_PLATFORM_TARGET_IN_STREAM_ASYNC = JniMethod("inStreamAsync", "(IJ)V", false);
+ JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_OBJECT_ASYNC = JniMethod("inStreamOutObjectAsync", "(IJ)Ljava/lang/Object;", false);
const char* C_PLATFORM_CALLBACK_UTILS = "org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils";
@@ -449,9 +447,6 @@ namespace ignite
m_PlatformIgnition_stop = FindMethod(env, c_PlatformIgnition, M_PLATFORM_IGNITION_STOP);
m_PlatformIgnition_stopAll = FindMethod(env, c_PlatformIgnition, M_PLATFORM_IGNITION_STOP_ALL);
- c_PlatformProcessor = FindClass(env, C_PLATFORM_PROCESSOR);
- m_PlatformProcessor_releaseStart = FindMethod(env, c_PlatformProcessor, M_PLATFORM_PROCESSOR_RELEASE_START);
-
c_PlatformTarget = FindClass(env, C_PLATFORM_TARGET);
m_PlatformTarget_inLongOutLong = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_LONG_OUT_LONG);
m_PlatformTarget_inStreamOutLong = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_LONG);
@@ -461,6 +456,7 @@ namespace ignite
m_PlatformTarget_inStreamOutStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_STREAM);
m_PlatformTarget_inObjectStreamOutObjectStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_OBJECT_STREAM);
m_PlatformTarget_inStreamAsync = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_ASYNC);
+ m_PlatformTarget_inStreamOutObjectAsync = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_OBJECT_ASYNC);
c_PlatformUtils = FindClass(env, C_PLATFORM_UTILS);
m_PlatformUtils_reallocate = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_REALLOC);
@@ -473,7 +469,6 @@ namespace ignite
void JniMembers::Destroy(JNIEnv* env) {
DeleteClass(env, c_IgniteException);
DeleteClass(env, c_PlatformIgnition);
- DeleteClass(env, c_PlatformProcessor);
DeleteClass(env, c_PlatformTarget);
DeleteClass(env, c_PlatformUtils);
}
@@ -894,6 +889,16 @@ namespace ignite
ExceptionCheck(env, err);
}
+ jobject JniContext::TargetInStreamOutObjectAsync(jobject obj, int opType, long long memPtr, JniErrorInfo* err) {
+ JNIEnv* env = Attach();
+
+ jobject res = env->CallObjectMethod(obj, jvm->GetMembers().m_PlatformTarget_inStreamOutObjectAsync, opType, memPtr);
+
+ ExceptionCheck(env, err);
+
+ return LocalToGlobal(env, res);
+ }
+
jobject JniContext::CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* err) {
JNIEnv* env = Attach();
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
index 00b1cca..1cb2fae 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
@@ -21,6 +21,8 @@ namespace Apache.Ignite.Core.Tests.Plugin
using System.Collections.Generic;
using System.IO;
using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Interop;
@@ -142,13 +144,22 @@ namespace Apache.Ignite.Core.Tests.Plugin
Assert.IsTrue(task.IsCompleted);
Assert.AreEqual("FOO", asyncRes);
+ // Async operation with cancellation.
+ var cts = new CancellationTokenSource();
+ task = target.DoOutOpAsync(1, w => w.WriteString("foo"), r => r.ReadString(), cts.Token);
+ Assert.IsFalse(task.IsCompleted);
+ cts.Cancel();
+ Assert.IsTrue(task.IsCanceled);
+ var aex = Assert.Throws<AggregateException>(() => { asyncRes = task.Result; });
+ Assert.IsInstanceOf<TaskCanceledException>(aex.GetBaseException());
+
// Async operation with exception in entry point.
Assert.Throws<TestIgnitePluginException>(() => target.DoOutOpAsync<object>(2, null, null));
// Async operation with exception in future.
var errTask = target.DoOutOpAsync<object>(3, null, null);
Assert.IsFalse(errTask.IsCompleted);
- var aex = Assert.Throws<AggregateException>(() => errTask.Wait());
+ aex = Assert.Throws<AggregateException>(() => errTask.Wait());
Assert.IsInstanceOf<IgniteException>(aex.InnerExceptions.Single());
// Throws custom mapped exception.
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
index 6e0a497..4b171b0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs
@@ -365,12 +365,11 @@ namespace Apache.Ignite.Core.Tests
};
var proc = System.Diagnostics.Process.Start(procStart);
-
Assert.IsNotNull(proc);
- Console.WriteLine(proc.StandardOutput.ReadToEnd());
- Console.WriteLine(proc.StandardError.ReadToEnd());
- Assert.IsTrue(proc.WaitForExit(15000));
+ IgniteProcess.AttachProcessConsoleReader(proc);
+
+ Assert.IsTrue(proc.WaitForExit(19000));
Assert.AreEqual(0, proc.ExitCode);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 76132c3..c444ed0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -99,7 +99,10 @@
<Compile Include="Cache\IMemoryMetrics.cs" />
<Compile Include="Common\ExceptionFactory.cs" />
<Compile Include="Configuration\Package-Info.cs" />
+ <Compile Include="Impl\IPlatformTargetInternal.cs" />
<Compile Include="Impl\PersistentStore\PersistentStoreMetrics.cs" />
+ <Compile Include="Impl\PlatformDisposableTargetAdapter.cs" />
+ <Compile Include="Impl\PlatformJniTarget.cs" />
<Compile Include="PersistentStore\IPersistentStoreMetrics.cs" />
<Compile Include="PersistentStore\Package-Info.cs" />
<Compile Include="PersistentStore\PersistentStoreConfiguration.cs" />
@@ -385,7 +388,7 @@
<Compile Include="Impl\Ignite.cs" />
<Compile Include="Impl\IgniteManager.cs" />
<Compile Include="Impl\Log\JavaLogger.cs" />
- <Compile Include="Impl\PlatformTarget.cs" />
+ <Compile Include="Impl\PlatformTargetAdapter.cs" />
<Compile Include="Impl\IgniteUtils.cs" />
<Compile Include="Impl\Handle\Handle.cs" />
<Compile Include="Impl\Handle\HandleRegistry.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
index 44ebef3..568eea7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
@@ -239,7 +239,7 @@ namespace Apache.Ignite.Core
// 3. Create startup object which will guide us through the rest of the process.
_startup = new Startup(cfg, cbs);
- IUnmanagedTarget interopProc = null;
+ PlatformJniTarget interopProc = null;
try
{
@@ -249,7 +249,7 @@ namespace Apache.Ignite.Core
// 5. At this point start routine is finished. We expect STARTUP object to have all necessary data.
var node = _startup.Ignite;
- interopProc = node.InteropProcessor;
+ interopProc = (PlatformJniTarget)node.InteropProcessor;
var javaLogger = log as JavaLogger;
if (javaLogger != null)
@@ -279,7 +279,7 @@ namespace Apache.Ignite.Core
// 2. Stop Ignite node if it was started.
if (interopProc != null)
- UU.IgnitionStop(interopProc.Context, gridName, true);
+ UU.IgnitionStop(interopProc.Target.Context, gridName, true);
// 3. Throw error further (use startup error if exists because it is more precise).
if (_startup.Error != null)
@@ -466,7 +466,8 @@ namespace Apache.Ignite.Core
if (Nodes.ContainsKey(new NodeKey(name)))
throw new IgniteException("Ignite with the same name already started: " + name);
- _startup.Ignite = new Ignite(_startup.Configuration, _startup.Name, interopProc, _startup.Marshaller,
+ _startup.Ignite = new Ignite(_startup.Configuration, _startup.Name,
+ new PlatformJniTarget(interopProc, _startup.Marshaller), _startup.Marshaller,
_startup.LifecycleHandlers, _startup.Callbacks);
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs
index b8937c9..69056b3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs
@@ -21,12 +21,11 @@ namespace Apache.Ignite.Core.Impl.Binary
using System.Diagnostics;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Impl.Binary.Metadata;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Binary metadata processor, delegates to PlatformBinaryProcessor in Java.
/// </summary>
- internal class BinaryProcessor : PlatformTarget
+ internal class BinaryProcessor : PlatformTargetAdapter
{
/// <summary>
/// Op codes.
@@ -46,8 +45,7 @@ namespace Apache.Ignite.Core.Impl.Binary
/// Initializes a new instance of the <see cref="BinaryProcessor"/> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
- public BinaryProcessor(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+ public BinaryProcessor(IPlatformTargetInternal target) : base(target)
{
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs
index 64bfa35..3dc8a96 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs
@@ -18,6 +18,8 @@
namespace Apache.Ignite.Core.Impl.Binary
{
using System;
+ using System.Collections.Generic;
+ using System.IO;
using Apache.Ignite.Core.Binary;
/// <summary>
@@ -75,5 +77,110 @@ namespace Apache.Ignite.Core.Impl.Binary
writer.WriteBoolean(false);
}
+ /// <summary>
+ /// Write collection.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ /// <param name="vals">Values.</param>
+ /// <param name="selector">A transform function to apply to each element.</param>
+ /// <remarks>Writes the element count as an int, then each element (passed through the selector when it is not null).</remarks>
+ private static void WriteCollection<T1, T2>(this BinaryWriter writer, ICollection<T1> vals,
+ Func<T1, T2> selector)
+ {
+ writer.WriteInt(vals.Count);
+
+ if (selector == null)
+ {
+ foreach (var val in vals)
+ writer.Write(val);
+ }
+ else
+ {
+ foreach (var val in vals)
+ writer.Write(selector(val));
+ }
+ }
+
+ /// <summary>
+ /// Write enumerable.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ /// <param name="vals">Values.</param>
+ /// <remarks>Delegates to the selector overload with a null selector, so elements are written as-is.</remarks>
+ public static void WriteEnumerable<T>(this BinaryWriter writer, IEnumerable<T> vals)
+ {
+ WriteEnumerable<T, T>(writer, vals, null);
+ }
+
+ /// <summary>
+ /// Write enumerable.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ /// <param name="vals">Values.</param>
+ /// <param name="selector">A transform function to apply to each element.</param>
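+ /// <remarks>Writes the element count as an int followed by the elements; for non-collection sequences the count slot is reserved first and filled in after enumeration.</remarks>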
+ public static void WriteEnumerable<T1, T2>(this BinaryWriter writer, IEnumerable<T1> vals,
+ Func<T1, T2> selector)
+ {
+ var col = vals as ICollection<T1>;
+
+ if (col != null)
+ {
+ WriteCollection(writer, col, selector);
+ return;
+ }
+
+ var stream = writer.Stream;
+
+ var pos = stream.Position;
+
+ stream.Seek(4, SeekOrigin.Current);
+
+ var size = 0;
+
+ if (selector == null)
+ {
+ foreach (var val in vals)
+ {
+ writer.Write(val);
+
+ size++;
+ }
+ }
+ else
+ {
+ foreach (var val in vals)
+ {
+ writer.Write(selector(val));
+
+ size++;
+ }
+ }
+
+ stream.WriteInt(pos, size);
+ }
+
+ /// <summary>
+ /// Write dictionary.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ /// <param name="vals">Values.</param>
+ public static void WriteDictionary<T1, T2>(this BinaryWriter writer, IEnumerable<KeyValuePair<T1, T2>> vals)
+ {
+ var pos = writer.Stream.Position;
+ writer.WriteInt(0); // Reserve count.
+
+ int cnt = 0;
+
+ foreach (var pair in vals)
+ {
+ writer.Write(pair.Key);
+ writer.Write(pair.Value);
+
+ cnt++;
+ }
+
+ writer.Stream.WriteInt(pos, cnt);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
index d335804..08c31a6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
@@ -21,13 +21,11 @@ namespace Apache.Ignite.Core.Impl.Cache.Affinity
using System.Collections.Generic;
using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Cluster;
- using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Affinity function that delegates to Java.
/// </summary>
- internal class PlatformAffinityFunction : PlatformTarget, IAffinityFunction
+ internal class PlatformAffinityFunction : PlatformTargetAdapter, IAffinityFunction
{
/** Opcodes. */
private enum Op
@@ -41,8 +39,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Affinity
/// Initializes a new instance of the <see cref="PlatformAffinityFunction"/> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
- public PlatformAffinityFunction(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+ public PlatformAffinityFunction(IPlatformTargetInternal target) : base(target)
{
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs
index f09a119..a2bba29 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs
@@ -19,19 +19,16 @@ namespace Apache.Ignite.Core.Impl.Cache
{
using System;
using System.Collections.Generic;
- using System.Diagnostics;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Unmanaged;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Cache affinity implementation.
/// </summary>
- internal class CacheAffinityImpl : PlatformTarget, ICacheAffinity
+ internal class CacheAffinityImpl : PlatformTargetAdapter, ICacheAffinity
{
/** */
private const int OpAffinityKey = 1;
@@ -88,17 +85,12 @@ namespace Apache.Ignite.Core.Impl.Cache
/// Initializes a new instance of the <see cref="CacheAffinityImpl" /> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="keepBinary">Keep binary flag.</param>
- /// <param name="ignite">Grid.</param>
- public CacheAffinityImpl(IUnmanagedTarget target, Marshaller marsh, bool keepBinary,
- Ignite ignite) : base(target, marsh)
+ public CacheAffinityImpl(IPlatformTargetInternal target, bool keepBinary) : base(target)
{
_keepBinary = keepBinary;
- Debug.Assert(ignite != null);
-
- _ignite = ignite;
+ _ignite = target.Marshaller.Ignite;
}
/** <inheritDoc /> */
@@ -182,7 +174,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutInOp(OpMapKeysToNodes, w => WriteEnumerable(w, keys),
+ return DoOutInOp(OpMapKeysToNodes, w => w.WriteEnumerable(keys),
reader => ReadDictionary(reader, ReadNode, r => (IList<TK>) r.ReadCollectionAsList<TK>()));
}
@@ -214,7 +206,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(parts, "parts");
return DoOutInOp(OpMapPartitionsToNodes,
- w => WriteEnumerable(w, parts),
+ w => w.WriteEnumerable(parts),
reader => ReadDictionary(reader, r => r.ReadInt(), ReadNode));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs
index e2b8350..2860bb6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs
@@ -21,14 +21,12 @@ namespace Apache.Ignite.Core.Impl.Cache
using System.Collections;
using System.Collections.Generic;
using Apache.Ignite.Core.Cache;
- using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Real cache enumerator communicating with Java.
/// </summary>
- internal class CacheEnumerator<TK, TV> : PlatformDisposableTarget, IEnumerator<ICacheEntry<TK, TV>>
+ internal class CacheEnumerator<TK, TV> : PlatformDisposableTargetAdapter, IEnumerator<ICacheEntry<TK, TV>>
{
/** Operation: next value. */
private const int OpNext = 1;
@@ -43,10 +41,8 @@ namespace Apache.Ignite.Core.Impl.Cache
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="keepBinary">Keep binary flag.</param>
- public CacheEnumerator(IUnmanagedTarget target, Marshaller marsh, bool keepBinary) :
- base(target, marsh)
+ public CacheEnumerator(IPlatformTargetInternal target, bool keepBinary) : base(target)
{
_keepBinary = keepBinary;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index e6b2408..5789c8f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -20,7 +20,6 @@ namespace Apache.Ignite.Core.Impl.Cache
using System;
using System.Collections;
using System.Collections.Generic;
- using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
@@ -38,13 +37,12 @@ namespace Apache.Ignite.Core.Impl.Cache
using Apache.Ignite.Core.Impl.Cluster;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Transactions;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Native cache wrapper.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
- internal class CacheImpl<TK, TV> : PlatformTarget, ICache<TK, TV>, ICacheInternal, ICacheLockInternal
+ internal class CacheImpl<TK, TV> : PlatformTargetAdapter, ICache<TK, TV>, ICacheInternal, ICacheLockInternal
{
/** Ignite instance. */
private readonly Ignite _ignite;
@@ -64,31 +62,32 @@ namespace Apache.Ignite.Core.Impl.Cache
/** Transaction manager. */
private readonly CacheTransactionManager _txManager;
+ /** Pre-allocated delegate. */
+ private readonly Func<IBinaryStream, Exception> _readException;
+
/// <summary>
/// Constructor.
/// </summary>
- /// <param name="grid">Grid.</param>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="flagSkipStore">Skip store flag.</param>
/// <param name="flagKeepBinary">Keep binary flag.</param>
/// <param name="flagNoRetries">No-retries mode flag.</param>
/// <param name="flagPartitionRecover">Partition recover mode flag.</param>
- public CacheImpl(Ignite grid, IUnmanagedTarget target, Marshaller marsh,
+ public CacheImpl(IPlatformTargetInternal target,
bool flagSkipStore, bool flagKeepBinary, bool flagNoRetries, bool flagPartitionRecover)
- : base(target, marsh)
+ : base(target)
{
- Debug.Assert(grid != null);
-
- _ignite = grid;
+ _ignite = target.Marshaller.Ignite;
_flagSkipStore = flagSkipStore;
_flagKeepBinary = flagKeepBinary;
_flagNoRetries = flagNoRetries;
_flagPartitionRecover = flagPartitionRecover;
_txManager = GetConfiguration().AtomicityMode == CacheAtomicityMode.Transactional
- ? new CacheTransactionManager(grid.GetTransactions())
+ ? new CacheTransactionManager(_ignite.GetTransactions())
: null;
+
+ _readException = stream => ReadException(Marshaller.StartUnmarshal(stream));
}
/** <inheritDoc /> */
@@ -172,7 +171,7 @@ namespace Apache.Ignite.Core.Impl.Cache
if (_flagSkipStore)
return this;
- return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithSkipStore), Marshaller,
+ return new CacheImpl<TK, TV>(DoOutOpObject((int) CacheOp.WithSkipStore),
true, _flagKeepBinary, true, _flagPartitionRecover);
}
@@ -196,7 +195,7 @@ namespace Apache.Ignite.Core.Impl.Cache
return result;
}
- return new CacheImpl<TK1, TV1>(_ignite, DoOutOpObject((int) CacheOp.WithKeepBinary), Marshaller,
+ return new CacheImpl<TK1, TV1>(DoOutOpObject((int) CacheOp.WithKeepBinary),
_flagSkipStore, true, _flagNoRetries, _flagPartitionRecover);
}
@@ -207,7 +206,7 @@ namespace Apache.Ignite.Core.Impl.Cache
var cache0 = DoOutOpObject((int)CacheOp.WithExpiryPolicy, w => ExpiryPolicySerializer.WritePolicy(w, plc));
- return new CacheImpl<TK, TV>(_ignite, cache0, Marshaller, _flagSkipStore, _flagKeepBinary,
+ return new CacheImpl<TK, TV>(cache0, _flagSkipStore, _flagKeepBinary,
_flagNoRetries, _flagPartitionRecover);
}
@@ -220,7 +219,7 @@ namespace Apache.Ignite.Core.Impl.Cache
/** <inheritDoc /> */
public void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args)
{
- DoOutInOpX((int) CacheOp.LoadCache, writer => WriteLoadCacheData(writer, p, args), ReadException);
+ DoOutInOpX((int) CacheOp.LoadCache, writer => WriteLoadCacheData(writer, p, args), _readException);
}
/** <inheritDoc /> */
@@ -232,7 +231,7 @@ namespace Apache.Ignite.Core.Impl.Cache
/** <inheritDoc /> */
public void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args)
{
- DoOutInOpX((int) CacheOp.LocLoadCache, writer => WriteLoadCacheData(writer, p, args), ReadException);
+ DoOutInOpX((int) CacheOp.LocLoadCache, writer => WriteLoadCacheData(writer, p, args), _readException);
}
/** <inheritDoc /> */
@@ -281,7 +280,7 @@ namespace Apache.Ignite.Core.Impl.Cache
return DoOutOpAsync(CacheOp.LoadAll, writer =>
{
writer.WriteBoolean(replaceExistingValues);
- WriteEnumerable(writer, keys);
+ writer.WriteEnumerable(keys);
});
}
@@ -306,7 +305,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutOp(CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys));
+ return DoOutOp(CacheOp.ContainsKeys, writer => writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -314,7 +313,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutOpAsync<bool>(CacheOp.ContainsKeysAsync, writer => WriteEnumerable(writer, keys));
+ return DoOutOpAsync<bool>(CacheOp.ContainsKeysAsync, writer => writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -342,7 +341,7 @@ namespace Apache.Ignite.Core.Impl.Cache
w.WriteInt(EncodePeekModes(modes));
},
(s, r) => r == True ? new CacheResult<TV>(Unmarshal<TV>(s)) : new CacheResult<TV>(),
- ReadException);
+ _readException);
value = res.Success ? res.Value : default(TV);
@@ -375,7 +374,7 @@ namespace Apache.Ignite.Core.Impl.Cache
throw GetKeyNotFoundException();
return Unmarshal<TV>(stream);
- }, ReadException);
+ }, _readException);
}
/** <inheritDoc /> */
@@ -418,9 +417,9 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(keys, "keys");
return DoOutInOpX((int) CacheOp.GetAll,
- writer => WriteEnumerable(writer, keys),
+ writer => writer.WriteEnumerable(keys),
(s, r) => r == True ? ReadGetAllDictionary(Marshaller.StartUnmarshal(s, _flagKeepBinary)) : null,
- ReadException);
+ _readException);
}
/** <inheritDoc /> */
@@ -428,7 +427,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutOpAsync(CacheOp.GetAllAsync, w => WriteEnumerable(w, keys), r => ReadGetAllDictionary(r));
+ return DoOutOpAsync(CacheOp.GetAllAsync, w => w.WriteEnumerable(keys), r => ReadGetAllDictionary(r));
}
/** <inheritdoc /> */
@@ -631,7 +630,7 @@ namespace Apache.Ignite.Core.Impl.Cache
StartTx();
- DoOutOp(CacheOp.PutAll, writer => WriteDictionary(writer, vals));
+ DoOutOp(CacheOp.PutAll, writer => writer.WriteDictionary(vals));
}
/** <inheritDoc /> */
@@ -641,7 +640,7 @@ namespace Apache.Ignite.Core.Impl.Cache
StartTx();
- return DoOutOpAsync(CacheOp.PutAllAsync, writer => WriteDictionary(writer, vals));
+ return DoOutOpAsync(CacheOp.PutAllAsync, writer => writer.WriteDictionary(vals));
}
/** <inheritdoc /> */
@@ -649,7 +648,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp(CacheOp.LocEvict, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.LocEvict, writer => writer.WriteEnumerable(keys));
}
/** <inheritdoc /> */
@@ -685,7 +684,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp(CacheOp.ClearAll, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.ClearAll, writer => writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -693,7 +692,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutOpAsync(CacheOp.ClearAllAsync, writer => WriteEnumerable(writer, keys));
+ return DoOutOpAsync(CacheOp.ClearAllAsync, writer => writer.WriteEnumerable(keys));
}
/** <inheritdoc /> */
@@ -709,7 +708,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- DoOutOp(CacheOp.LocalClearAll, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.LocalClearAll, writer => writer.WriteEnumerable(keys));
}
/** <inheritdoc /> */
@@ -761,7 +760,7 @@ namespace Apache.Ignite.Core.Impl.Cache
StartTx();
- DoOutOp(CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys));
+ DoOutOp(CacheOp.RemoveAll, writer => writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -771,7 +770,7 @@ namespace Apache.Ignite.Core.Impl.Cache
StartTx();
- return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => WriteEnumerable(writer, keys));
+ return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => writer.WriteEnumerable(keys));
}
/** <inheritDoc /> */
@@ -843,7 +842,7 @@ namespace Apache.Ignite.Core.Impl.Cache
writer.Write(holder);
},
(input, res) => res == True ? Unmarshal<TRes>(input) : default(TRes),
- ReadException);
+ _readException);
}
/** <inheritDoc /> */
@@ -891,10 +890,12 @@ namespace Apache.Ignite.Core.Impl.Cache
return DoOutInOpX((int) CacheOp.InvokeAll,
writer =>
{
- WriteEnumerable(writer, keys);
+ writer.WriteEnumerable(keys);
writer.Write(holder);
},
- (input, res) => res == True ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary)): null, ReadException);
+ (input, res) => res == True
+ ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary))
+ : null, _readException);
}
/** <inheritDoc /> */
@@ -912,7 +913,7 @@ namespace Apache.Ignite.Core.Impl.Cache
return DoOutOpAsync(CacheOp.InvokeAllAsync,
writer =>
{
- WriteEnumerable(writer, keys);
+ writer.WriteEnumerable(keys);
writer.Write(holder);
},
input => ReadInvokeAllResults<TRes>(input));
@@ -931,7 +932,7 @@ namespace Apache.Ignite.Core.Impl.Cache
},
(input, res) => res == True
? readFunc(Marshaller.StartUnmarshal(input))
- : default(T), ReadException);
+ : default(T), _readException);
}
/** <inheritdoc /> */
@@ -940,7 +941,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
return DoOutInOpX((int) CacheOp.Lock, w => w.Write(key),
- (stream, res) => new CacheLock(stream.ReadInt(), this), ReadException);
+ (stream, res) => new CacheLock(stream.ReadInt(), this), _readException);
}
/** <inheritdoc /> */
@@ -948,8 +949,8 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- return DoOutInOpX((int) CacheOp.LockAll, w => WriteEnumerable(w, keys),
- (stream, res) => new CacheLock(stream.ReadInt(), this), ReadException);
+ return DoOutInOpX((int) CacheOp.LockAll, w => w.WriteEnumerable(keys),
+ (stream, res) => new CacheLock(stream.ReadInt(), this), _readException);
}
/** <inheritdoc /> */
@@ -1011,7 +1012,7 @@ namespace Apache.Ignite.Core.Impl.Cache
if (_flagNoRetries)
return this;
- return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithNoRetries), Marshaller,
+ return new CacheImpl<TK, TV>(DoOutOpObject((int) CacheOp.WithNoRetries),
_flagSkipStore, _flagKeepBinary, true, _flagPartitionRecover);
}
@@ -1021,7 +1022,7 @@ namespace Apache.Ignite.Core.Impl.Cache
if (_flagPartitionRecover)
return this;
- return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithPartitionRecover), Marshaller,
+ return new CacheImpl<TK, TV>(DoOutOpObject((int) CacheOp.WithPartitionRecover),
_flagSkipStore, _flagKeepBinary, _flagNoRetries, true);
}
@@ -1092,7 +1093,7 @@ namespace Apache.Ignite.Core.Impl.Cache
writer.WriteString(qry.Schema); // Schema
});
- return new FieldsQueryCursor<T>(cursor, Marshaller, _flagKeepBinary, readerFunc);
+ return new FieldsQueryCursor<T>(cursor, _flagKeepBinary, readerFunc);
}
/** <inheritDoc /> */
@@ -1102,7 +1103,7 @@ namespace Apache.Ignite.Core.Impl.Cache
var cursor = DoOutOpObject((int) qry.OpId, writer => qry.Write(writer, IsKeepBinary));
- return new QueryCursor<TK, TV>(cursor, Marshaller, _flagKeepBinary);
+ return new QueryCursor<TK, TV>(cursor, _flagKeepBinary);
}
/** <inheritdoc /> */
@@ -1168,10 +1169,10 @@ namespace Apache.Ignite.Core.Impl.Cache
{
var target = DoOutOpObject((int) CacheOp.LocIterator, (IBinaryStream s) => s.WriteInt(peekModes));
- return new CacheEnumerator<TK, TV>(target, Marshaller, _flagKeepBinary);
+ return new CacheEnumerator<TK, TV>(target, _flagKeepBinary);
}
- return new CacheEnumerator<TK, TV>(DoOutOpObject((int) CacheOp.Iterator), Marshaller, _flagKeepBinary);
+ return new CacheEnumerator<TK, TV>(DoOutOpObject((int) CacheOp.Iterator), _flagKeepBinary);
}
#endregion
@@ -1228,14 +1229,6 @@ namespace Apache.Ignite.Core.Impl.Cache
}
/// <summary>
- /// Reads the exception.
- /// </summary>
- private Exception ReadException(IBinaryStream stream)
- {
- return ReadException(Marshaller.StartUnmarshal(stream));
- }
-
- /// <summary>
/// Reads the exception, either in binary wrapper form, or as a pair of strings.
/// </summary>
/// <param name="reader">The stream.</param>
@@ -1315,7 +1308,7 @@ namespace Apache.Ignite.Core.Impl.Cache
return DoOutInOpX((int) op, w =>
{
w.Write(x);
- }, ReadException);
+ }, _readException);
}
/// <summary>
@@ -1327,7 +1320,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
w.Write(x);
w.Write(y);
- }, ReadException);
+ }, _readException);
}
/// <summary>
@@ -1340,7 +1333,7 @@ namespace Apache.Ignite.Core.Impl.Cache
w.Write(x);
w.Write(y);
w.Write(z);
- }, ReadException);
+ }, _readException);
}
/// <summary>
@@ -1348,7 +1341,7 @@ namespace Apache.Ignite.Core.Impl.Cache
/// </summary>
private bool DoOutOp(CacheOp op, Action<BinaryWriter> write)
{
- return DoOutInOpX((int) op, write, ReadException);
+ return DoOutInOpX((int) op, write, _readException);
}
/// <summary>
@@ -1359,7 +1352,7 @@ namespace Apache.Ignite.Core.Impl.Cache
return DoOutInOpX((int)cacheOp,
w => w.Write(x),
(stream, res) => res == True ? new CacheResult<TV>(Unmarshal<TV>(stream)) : new CacheResult<TV>(),
- ReadException);
+ _readException);
}
/// <summary>
@@ -1374,7 +1367,7 @@ namespace Apache.Ignite.Core.Impl.Cache
w.Write(y);
},
(stream, res) => res == True ? new CacheResult<TV>(Unmarshal<TV>(stream)) : new CacheResult<TV>(),
- ReadException);
+ _readException);
}
/** <inheritdoc /> */
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs
index 95c6a36..8e4985e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs
@@ -23,13 +23,11 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
- using Apache.Ignite.Core.Impl.Unmanaged;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Abstract query cursor implementation.
/// </summary>
- internal abstract class AbstractQueryCursor<T> : PlatformDisposableTarget, IQueryCursor<T>, IEnumerator<T>
+ internal abstract class AbstractQueryCursor<T> : PlatformDisposableTargetAdapter, IQueryCursor<T>, IEnumerator<T>
{
/** */
private const int OpGetAll = 1;
@@ -65,10 +63,8 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="keepBinary">Keep binary flag.</param>
- protected AbstractQueryCursor(IUnmanagedTarget target, Marshaller marsh, bool keepBinary) :
- base(target, marsh)
+ protected AbstractQueryCursor(IPlatformTargetInternal target, bool keepBinary) : base(target)
{
_keepBinary = keepBinary;
}
@@ -88,7 +84,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
throw new InvalidOperationException("Failed to get all entries because GetAll() " +
"method has already been called.");
- var res = DoInOp<IList<T>>(OpGetAll, ConvertGetAll);
+ var res = DoInOp(OpGetAll, ConvertGetAll);
_getAllCalled = true;
@@ -216,7 +212,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
/// </summary>
private void RequestBatch()
{
- _batch = DoInOp<T[]>(OpGetBatch, ConvertGetBatch);
+ _batch = DoInOp(OpGetBatch, ConvertGetBatch);
_batchPos = 0;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs
index 6139d8b..ff5c434 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs
@@ -28,8 +28,6 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Resource;
- using Apache.Ignite.Core.Impl.Unmanaged;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
using CQU = ContinuousQueryUtils;
/// <summary>
@@ -67,7 +65,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
private readonly long _hnd;
/** Native query. */
- private readonly IUnmanagedTarget _nativeQry;
+ private readonly IPlatformTargetInternal _nativeQry;
/** Initial query cursor. */
private volatile IQueryCursor<ICacheEntry<TK, TV>> _initialQueryCursor;
@@ -84,7 +82,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
/// <param name="createTargetCb">The initialization callback.</param>
/// <param name="initialQry">The initial query.</param>
public ContinuousQueryHandleImpl(ContinuousQuery<TK, TV> qry, Marshaller marsh, bool keepBinary,
- Func<Action<BinaryWriter>, IUnmanagedTarget> createTargetCb, QueryBase initialQry)
+ Func<Action<BinaryWriter>, IPlatformTargetInternal> createTargetCb, QueryBase initialQry)
{
_marsh = marsh;
_keepBinary = keepBinary;
@@ -138,10 +136,10 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
});
// 4. Initial query.
- var nativeInitialQryCur = UU.TargetOutObject(_nativeQry, 0);
+ var nativeInitialQryCur = _nativeQry.OutObjectInternal(0);
_initialQueryCursor = nativeInitialQryCur == null
? null
- : new QueryCursor<TK, TV>(nativeInitialQryCur, _marsh, _keepBinary);
+ : new QueryCursor<TK, TV>(nativeInitialQryCur, _keepBinary);
}
catch (Exception)
{
@@ -225,7 +223,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous
try
{
- UU.TargetInLongOutLong(_nativeQry, 0, 0);
+ _nativeQry.InLongOutLong(0, 0);
}
finally
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
index d928418..9d021dc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
@@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
using System.Diagnostics.CodeAnalysis;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Cursor for entry-based queries.
@@ -36,12 +35,11 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaler.</param>
/// <param name="keepBinary">Keep binary flag.</param>
/// <param name="readerFunc">The reader function.</param>
- public FieldsQueryCursor(IUnmanagedTarget target, Marshaller marsh, bool keepBinary,
+ public FieldsQueryCursor(IPlatformTargetInternal target, bool keepBinary,
Func<IBinaryRawReader, int, T> readerFunc)
- : base(target, marsh, keepBinary)
+ : base(target, keepBinary)
{
Debug.Assert(readerFunc != null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs
index 5a46915..bc3cdb6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs
@@ -20,7 +20,6 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
using System.Diagnostics.CodeAnalysis;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Cursor for entry-based queries.
@@ -31,10 +30,8 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaler.</param>
/// <param name="keepBinary">Keep binary flag.</param>
- public QueryCursor(IUnmanagedTarget target, Marshaller marsh,
- bool keepBinary) : base(target, marsh, keepBinary)
+ public QueryCursor(IPlatformTargetInternal target, bool keepBinary) : base(target, keepBinary)
{
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
index 30afe57..678fb03 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
@@ -37,16 +37,14 @@ namespace Apache.Ignite.Core.Impl.Cluster
using Apache.Ignite.Core.Impl.Messaging;
using Apache.Ignite.Core.Impl.PersistentStore;
using Apache.Ignite.Core.Impl.Services;
- using Apache.Ignite.Core.Impl.Unmanaged;
using Apache.Ignite.Core.Messaging;
using Apache.Ignite.Core.PersistentStore;
using Apache.Ignite.Core.Services;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Ignite projection implementation.
/// </summary>
- internal class ClusterGroupImpl : PlatformTarget, IClusterGroup
+ internal class ClusterGroupImpl : PlatformTargetAdapter, IClusterGroup
{
/** Attribute: platform. */
private const string AttrPlatform = "org.apache.ignite.platform";
@@ -175,13 +173,12 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="ignite">Grid.</param>
/// <param name="pred">Predicate.</param>
[SuppressMessage("Microsoft.Performance", "CA1805:DoNotInitializeUnnecessarily")]
- public ClusterGroupImpl(IUnmanagedTarget target, Ignite ignite, Func<IClusterNode, bool> pred)
- : base(target, ignite.Marshaller)
+ public ClusterGroupImpl(IPlatformTargetInternal target, Func<IClusterNode, bool> pred)
+ : base(target)
{
- _ignite = ignite;
+ _ignite = target.Marshaller.Ignite;
_pred = pred;
_comp = new Lazy<ICompute>(() => CreateCompute());
@@ -207,7 +204,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </summary>
private ICompute CreateCompute()
{
- return new Compute(new ComputeImpl(DoOutOpObject(OpGetCompute), Marshaller, this, false));
+ return new Compute(new ComputeImpl(DoOutOpObject(OpGetCompute), this, false));
}
/** <inheritDoc /> */
@@ -252,10 +249,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
{
Debug.Assert(items != null);
- IUnmanagedTarget prj = DoOutOpObject(OpForNodeIds, writer =>
- {
- WriteEnumerable(writer, items, func);
- });
+ var prj = DoOutOpObject(OpForNodeIds, writer => writer.WriteEnumerable(items, func));
return GetClusterGroup(prj);
}
@@ -265,7 +259,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
{
var newPred = _pred == null ? p : node => _pred(node) && p(node);
- return new ClusterGroupImpl(Target, _ignite, newPred);
+ return new ClusterGroupImpl(Target, newPred);
}
/** <inheritDoc /> */
@@ -278,7 +272,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
writer.WriteString(name);
writer.WriteString(val);
};
- IUnmanagedTarget prj = DoOutOpObject(OpForAttribute, action);
+ var prj = DoOutOpObject(OpForAttribute, action);
return GetClusterGroup(prj);
}
@@ -293,7 +287,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </returns>
private IClusterGroup ForCacheNodes(string name, int op)
{
- IUnmanagedTarget prj = DoOutOpObject(op, writer =>
+ var prj = DoOutOpObject(op, writer =>
{
writer.WriteString(name);
});
@@ -336,7 +330,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
{
IgniteArgumentCheck.NotNull(node, "node");
- IUnmanagedTarget prj = DoOutOpObject(OpForHost, writer =>
+ var prj = DoOutOpObject(OpForHost, writer =>
{
writer.WriteGuid(node.Id);
});
@@ -404,15 +398,14 @@ namespace Apache.Ignite.Core.Impl.Cluster
return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
});
}
- return DoOutInOp(OpMetricsFiltered, writer =>
- {
- WriteEnumerable(writer, GetNodes().Select(node => node.Id));
- }, stream =>
- {
- IBinaryRawReader reader = Marshaller.StartUnmarshal(stream, false);
+ return DoOutInOp(OpMetricsFiltered,
+ writer => writer.WriteEnumerable(GetNodes().Select(node => node.Id)),
+ stream =>
+ {
+ IBinaryRawReader reader = Marshaller.StartUnmarshal(stream, false);
- return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
- });
+ return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
+ });
}
/** <inheritDoc /> */
@@ -426,7 +419,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </summary>
private IMessaging CreateMessaging()
{
- return new Messaging(DoOutOpObject(OpGetMessaging), Marshaller, this);
+ return new Messaging(DoOutOpObject(OpGetMessaging), this);
}
/** <inheritDoc /> */
@@ -440,7 +433,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </summary>
private IEvents CreateEvents()
{
- return new Events(DoOutOpObject(OpGetEvents), Marshaller, this);
+ return new Events(DoOutOpObject(OpGetEvents), this);
}
/** <inheritDoc /> */
@@ -454,7 +447,7 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </summary>
private IServices CreateServices()
{
- return new Services(DoOutOpObject(OpGetServices), Marshaller, this, false, false);
+ return new Services(DoOutOpObject(OpGetServices), this, false, false);
}
/// <summary>
@@ -665,9 +658,9 @@ namespace Apache.Ignite.Core.Impl.Cluster
/// </summary>
/// <param name="prj">Native projection.</param>
/// <returns>New cluster group.</returns>
- private IClusterGroup GetClusterGroup(IUnmanagedTarget prj)
+ private IClusterGroup GetClusterGroup(IPlatformTargetInternal prj)
{
- return new ClusterGroupImpl(prj, _ignite, _pred);
+ return new ClusterGroupImpl(prj, _pred);
}
/// <summary>
@@ -678,29 +671,30 @@ namespace Apache.Ignite.Core.Impl.Cluster
{
long oldTopVer = Interlocked.Read(ref _topVer);
- List<IClusterNode> newNodes = null;
-
- DoOutInOp(OpNodes, writer =>
+ var res = Target.InStreamOutStream(OpNodes, writer =>
{
writer.WriteLong(oldTopVer);
- }, input =>
+ }, reader =>
{
- BinaryReader reader = Marshaller.StartUnmarshal(input);
-
if (reader.ReadBoolean())
{
// Topology has been updated.
long newTopVer = reader.ReadLong();
+ var newNodes = IgniteUtils.ReadNodes((BinaryReader) reader, _pred);
- newNodes = IgniteUtils.ReadNodes(reader, _pred);
-
- UpdateTopology(newTopVer, newNodes);
+ return Tuple.Create(newTopVer, newNodes);
}
+
+ return null;
});
- if (newNodes != null)
- return newNodes;
-
+ if (res != null)
+ {
+ UpdateTopology(res.Item1, res.Item2);
+
+ return res.Item2;
+ }
+
// No topology changes.
Debug.Assert(_nodes != null, "At least one topology update should have occurred.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
index 4cd0678..cc12caa 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
@@ -28,7 +28,6 @@ namespace Apache.Ignite.Core.Impl.Common
using Apache.Ignite.Core.Impl.Cache;
using Apache.Ignite.Core.Impl.Cache.Query.Continuous;
using Apache.Ignite.Core.Impl.Datastream;
- using Apache.Ignite.Core.Impl.Unmanaged;
using Apache.Ignite.Core.Messaging;
/// <summary>
@@ -66,7 +65,7 @@ namespace Apache.Ignite.Core.Impl.Common
private readonly Action<object> _computeJobCancel;
/** */
- private readonly Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool> _streamReceiver;
+ private readonly Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> _streamReceiver;
/** */
private readonly Func<object, object> _streamTransformerCtor;
@@ -163,7 +162,7 @@ namespace Apache.Ignite.Core.Impl.Common
/// </summary>
/// <param name="type">Type.</param>
/// <returns>Precompiled invocator delegate.</returns>
- public static Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool> GetStreamReceiver(Type type)
+ public static Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> GetStreamReceiver(Type type)
{
return Get(type)._streamReceiver;
}
@@ -313,12 +312,12 @@ namespace Apache.Ignite.Core.Impl.Common
.MakeGenericMethod(iface.GetGenericArguments());
_streamReceiver = DelegateConverter
- .CompileFunc<Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool>>(
+ .CompileFunc<Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool>>(
typeof (StreamReceiverHolder),
method,
new[]
{
- iface, typeof (Ignite), typeof (IUnmanagedTarget), typeof (IBinaryStream),
+ iface, typeof (Ignite), typeof (IPlatformTargetInternal), typeof (IBinaryStream),
typeof (bool)
},
new[] {true, false, false, false, false, false});
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
index 6da98ab..8566d0b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs
@@ -17,13 +17,10 @@
namespace Apache.Ignite.Core.Impl.Common
{
- using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
-
/// <summary>
/// Platform listenable.
/// </summary>
- internal class Listenable : PlatformTarget
+ internal class Listenable : PlatformTargetAdapter
{
/** */
private const int OpCancel = 1;
@@ -32,8 +29,7 @@ namespace Apache.Ignite.Core.Impl.Common
/// Initializes a new instance of the <see cref="Listenable"/> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
- public Listenable(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+ public Listenable(IPlatformTargetInternal target) : base(target)
{
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
index cace7b2..06f9ad4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -34,13 +34,12 @@ namespace Apache.Ignite.Core.Impl.Compute
using Apache.Ignite.Core.Impl.Cluster;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Compute.Closure;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Compute implementation.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
- internal class ComputeImpl : PlatformTarget
+ internal class ComputeImpl : PlatformTargetAdapter
{
/** */
private const int OpAffinity = 1;
@@ -76,11 +75,10 @@ namespace Apache.Ignite.Core.Impl.Compute
/// Constructor.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="prj">Projection.</param>
/// <param name="keepBinary">Binary flag.</param>
- public ComputeImpl(IUnmanagedTarget target, Marshaller marsh, ClusterGroupImpl prj, bool keepBinary)
- : base(target, marsh)
+ public ComputeImpl(IPlatformTargetInternal target, ClusterGroupImpl prj, bool keepBinary)
+ : base(target)
{
_prj = prj;
@@ -194,7 +192,7 @@ namespace Apache.Ignite.Core.Impl.Compute
var future = holder.Future;
- future.SetTarget(new Listenable(futTarget, Marshaller));
+ future.SetTarget(new Listenable(futTarget));
return future;
}
@@ -551,7 +549,7 @@ namespace Apache.Ignite.Core.Impl.Compute
writeAction(writer);
});
- holder.Future.SetTarget(new Listenable(futTarget, Marshaller));
+ holder.Future.SetTarget(new Listenable(futTarget));
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs
index 0c4bf84..f797408 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs
@@ -19,16 +19,12 @@ namespace Apache.Ignite.Core.Impl.DataStructures
{
using System.Diagnostics;
using Apache.Ignite.Core.DataStructures;
- using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
- using Apache.Ignite.Core.Impl.Unmanaged;
-
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Atomic long wrapper.
/// </summary>
- internal sealed class AtomicLong : PlatformTarget, IAtomicLong
+ internal sealed class AtomicLong : PlatformTargetAdapter, IAtomicLong
{
/** */
private readonly string _name;
@@ -50,9 +46,8 @@ namespace Apache.Ignite.Core.Impl.DataStructures
/// Initializes a new instance of the <see cref="AtomicLong"/> class.
/// </summary>
/// <param name="target">The target.</param>
- /// <param name="marsh">The marshaller.</param>
/// <param name="name">The name.</param>
- public AtomicLong(IUnmanagedTarget target, Marshaller marsh, string name) : base(target, marsh)
+ public AtomicLong(IPlatformTargetInternal target, string name) : base(target)
{
Debug.Assert(!string.IsNullOrEmpty(name));
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs
index 4ca4b24..76515a2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs
@@ -19,13 +19,11 @@ namespace Apache.Ignite.Core.Impl.DataStructures
{
using System.Diagnostics;
using Apache.Ignite.Core.DataStructures;
- using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Atomic reference.
/// </summary>
- internal class AtomicReference<T> : PlatformTarget, IAtomicReference<T>
+ internal class AtomicReference<T> : PlatformTargetAdapter, IAtomicReference<T>
{
/** Opcodes. */
private enum Op
@@ -41,8 +39,8 @@ namespace Apache.Ignite.Core.Impl.DataStructures
private readonly string _name;
/** <inheritDoc /> */
- public AtomicReference(IUnmanagedTarget target, Marshaller marsh, string name)
- : base(target, marsh)
+ public AtomicReference(IPlatformTargetInternal target, string name)
+ : base(target)
{
Debug.Assert(!string.IsNullOrEmpty(name));
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs
index f7fc6b7..dd079ef 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs
@@ -19,13 +19,11 @@ namespace Apache.Ignite.Core.Impl.DataStructures
{
using System.Diagnostics;
using Apache.Ignite.Core.DataStructures;
- using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
/// Atomic long wrapper.
/// </summary>
- internal sealed class AtomicSequence: PlatformTarget, IAtomicSequence
+ internal sealed class AtomicSequence: PlatformTargetAdapter, IAtomicSequence
{
/** */
private readonly string _name;
@@ -46,10 +44,9 @@ namespace Apache.Ignite.Core.Impl.DataStructures
/// Initializes a new instance of the <see cref="Apache.Ignite.Core.Impl.DataStructures.AtomicLong"/> class.
/// </summary>
/// <param name="target">The target.</param>
- /// <param name="marsh">The marshaller.</param>
/// <param name="name">The name.</param>
- public AtomicSequence(IUnmanagedTarget target, Marshaller marsh, string name)
- : base(target, marsh)
+ public AtomicSequence(IPlatformTargetInternal target, string name)
+ : base(target)
{
Debug.Assert(!string.IsNullOrEmpty(name));
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
index 96e58d4..fb2df01 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs
@@ -26,8 +26,6 @@ namespace Apache.Ignite.Core.Impl.Datastream
using Apache.Ignite.Core.Datastream;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Unmanaged;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Data streamer internal interface to get rid of generics.
@@ -45,7 +43,7 @@ namespace Apache.Ignite.Core.Impl.Datastream
/// <summary>
/// Data streamer implementation.
/// </summary>
- internal class DataStreamerImpl<TK, TV> : PlatformDisposableTarget, IDataStreamer, IDataStreamer<TK, TV>
+ internal class DataStreamerImpl<TK, TV> : PlatformDisposableTargetAdapter, IDataStreamer, IDataStreamer<TK, TV>
{
#pragma warning disable 0420
@@ -141,8 +139,8 @@ namespace Apache.Ignite.Core.Impl.Datastream
/// <param name="marsh">Marshaller.</param>
/// <param name="cacheName">Cache name.</param>
/// <param name="keepBinary">Binary flag.</param>
- public DataStreamerImpl(IUnmanagedTarget target, Marshaller marsh, string cacheName, bool keepBinary)
- : base(target, marsh)
+ public DataStreamerImpl(IPlatformTargetInternal target, Marshaller marsh, string cacheName, bool keepBinary)
+ : base(target)
{
_cacheName = cacheName;
_keepBinary = keepBinary;
[4/8] ignite git commit: IGNITE-5769 Abstract away .NET->Java calls
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTargetAdapter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTargetAdapter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTargetAdapter.cs
new file mode 100644
index 0000000..64b5f29
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTargetAdapter.cs
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Threading.Tasks;
+ using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Binary.IO;
+ using Apache.Ignite.Core.Impl.Common;
+ using BinaryReader = Apache.Ignite.Core.Impl.Binary.BinaryReader;
+ using BinaryWriter = Apache.Ignite.Core.Impl.Binary.BinaryWriter;
+
+ /// <summary>
+ /// Base class for interop targets, provides additional functionality over <see cref="IPlatformTargetInternal"/>.
+ /// </summary>
+ [SuppressMessage("ReSharper", "LocalVariableHidesMember")]
+ internal class PlatformTargetAdapter
+ {
+ /** */
+ internal const int False = 0;
+
+ /** */
+ internal const int True = 1;
+
+ /** */
+ internal const int Error = -1;
+
+ /** */
+ private static readonly Dictionary<Type, FutureType> IgniteFutureTypeMap
+ = new Dictionary<Type, FutureType>
+ {
+ {typeof(bool), FutureType.Bool},
+ {typeof(byte), FutureType.Byte},
+ {typeof(char), FutureType.Char},
+ {typeof(double), FutureType.Double},
+ {typeof(float), FutureType.Float},
+ {typeof(int), FutureType.Int},
+ {typeof(long), FutureType.Long},
+ {typeof(short), FutureType.Short}
+ };
+
+ /** Platform target. */
+ private readonly IPlatformTargetInternal _target;
+
+ /** Marshaller. */
+ private readonly Marshaller _marsh;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ protected PlatformTargetAdapter(IPlatformTargetInternal target)
+ {
+ Debug.Assert(target != null);
+
+ _target = target;
+ _marsh = target.Marshaller;
+ }
+
+ /// <summary>
+ /// Platform target.
+ /// </summary>
+ internal IPlatformTargetInternal Target
+ {
+ get { return _target; }
+ }
+
+ /// <summary>
+ /// Marshaller.
+ /// </summary>
+ internal Marshaller Marshaller
+ {
+ get { return _marsh; }
+ }
+
+ #region OUT operations
+
+ /// <summary>
+ /// Perform out operation.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="action">Action to be performed on the stream.</param>
+ /// <returns>Result.</returns>
+ protected long DoOutOp(int type, Action<IBinaryStream> action)
+ {
+ return _target.InStreamOutLong(type, action);
+ }
+
+ /// <summary>
+ /// Perform out operation.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="action">Action to be performed on the stream.</param>
+ /// <returns>Result.</returns>
+ protected long DoOutOp(int type, Action<BinaryWriter> action)
+ {
+ return DoOutOp(type, stream => WriteToStream(action, stream, _marsh));
+ }
+
+ /// <summary>
+ /// Perform out operation.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="action">Action to be performed on the stream.</param>
+ /// <returns>Resulting object.</returns>
+ protected IPlatformTargetInternal DoOutOpObject(int type, Action<BinaryWriter> action)
+ {
+ return _target.InStreamOutObject(type, stream => WriteToStream(action, stream, _marsh));
+ }
+
+ /// <summary>
+ /// Perform out operation.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="action">Action to be performed on the stream.</param>
+ /// <returns>Resulting object.</returns>
+ protected IPlatformTargetInternal DoOutOpObject(int type, Action<IBinaryStream> action)
+ {
+ return _target.InStreamOutObject(type, action);
+ }
+
+ /// <summary>
+ /// Perform out operation.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <returns>Resulting object.</returns>
+ protected IPlatformTargetInternal DoOutOpObject(int type)
+ {
+ return _target.OutObjectInternal(type);
+ }
+
+ /// <summary>
+ /// Perform simple output operation accepting single argument.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="val1">Value.</param>
+ /// <returns>Result.</returns>
+ protected long DoOutOp<T1>(int type, T1 val1)
+ {
+ return DoOutOp(type, writer =>
+ {
+ writer.Write(val1);
+ });
+ }
+
+ /// <summary>
+ /// Perform simple output operation accepting two arguments.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="val1">Value 1.</param>
+ /// <param name="val2">Value 2.</param>
+ /// <returns>Result.</returns>
+ protected long DoOutOp<T1, T2>(int type, T1 val1, T2 val2)
+ {
+ return DoOutOp(type, writer =>
+ {
+ writer.Write(val1);
+ writer.Write(val2);
+ });
+ }
+
+ #endregion
+
+ #region IN operations
+
+ /// <summary>
+ /// Perform in operation.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <param name="action">Action.</param>
+ /// <returns>Result.</returns>
+ protected T DoInOp<T>(int type, Func<IBinaryStream, T> action)
+ {
+ return _target.OutStream(type, action);
+ }
+
+ /// <summary>
+ /// Perform simple in operation returning immediate result.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Result.</returns>
+ protected T DoInOp<T>(int type)
+ {
+ return _target.OutStream(type, s => Unmarshal<T>(s));
+ }
+
+ #endregion
+
+ #region OUT-IN operations
+
+ /// <summary>
+ /// Perform out-in operation.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="outAction">Out action.</param>
+ /// <param name="inAction">In action.</param>
+ /// <returns>Result.</returns>
+ protected TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, TR> inAction)
+ {
+ return _target.InStreamOutStream(type, stream => WriteToStream(outAction, stream, _marsh), inAction);
+ }
+
+ /// <summary>
+ /// Perform out-in operation with a single stream.
+ /// </summary>
+ /// <typeparam name="TR">Type of the result.</typeparam>
+ /// <param name="type">Operation type.</param>
+ /// <param name="outAction">Out action.</param>
+ /// <param name="inAction">In action.</param>
+ /// <param name="inErrorAction">The action to read an error.</param>
+ /// <returns>
+ /// Result.
+ /// </returns>
+ protected TR DoOutInOpX<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, long, TR> inAction,
+ Func<IBinaryStream, Exception> inErrorAction)
+ {
+ return _target.InStreamOutLong(type, stream => WriteToStream(outAction, stream, _marsh),
+ inAction, inErrorAction);
+ }
+
+ /// <summary>
+ /// Perform out-in operation with a single stream.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="outAction">Out action.</param>
+ /// <param name="inErrorAction">The action to read an error.</param>
+ /// <returns>
+ /// Result.
+ /// </returns>
+ protected bool DoOutInOpX(int type, Action<BinaryWriter> outAction,
+ Func<IBinaryStream, Exception> inErrorAction)
+ {
+ return _target.InStreamOutLong(type, stream => WriteToStream(outAction, stream, _marsh),
+ (stream, res) => res == True, inErrorAction);
+ }
+
+ /// <summary>
+ /// Perform out-in operation.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="outAction">Out action.</param>
+ /// <param name="inAction">In action.</param>
+ /// <param name="arg">Argument.</param>
+ /// <returns>Result.</returns>
+ protected TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction,
+ Func<IBinaryStream, IPlatformTargetInternal, TR> inAction, IPlatformTargetInternal arg)
+ {
+ return _target.InObjectStreamOutObjectStream(type, stream => WriteToStream(outAction, stream, _marsh),
+ inAction, arg);
+ }
+
+ /// <summary>
+ /// Perform out-in operation.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="outAction">Out action.</param>
+ /// <returns>Result.</returns>
+ protected TR DoOutInOp<TR>(int type, Action<BinaryWriter> outAction)
+ {
+ return _target.InStreamOutStream(type, stream => WriteToStream(outAction, stream, _marsh),
+ stream => Unmarshal<TR>(stream));
+ }
+
+ /// <summary>
+ /// Perform simple out-in operation accepting single argument.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="val">Value.</param>
+ /// <returns>Result.</returns>
+ protected TR DoOutInOp<T1, TR>(int type, T1 val)
+ {
+ return _target.InStreamOutStream(type, stream => WriteToStream(val, stream, _marsh),
+ stream => Unmarshal<TR>(stream));
+ }
+
+ /// <summary>
+ /// Perform simple out-in operation accepting a single long argument.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="val">Value.</param>
+ /// <returns>Result.</returns>
+ protected long DoOutInOp(int type, long val = 0)
+ {
+ return _target.InLongOutLong(type, val);
+ }
+
+ #endregion
+
+ #region Async operations
+
+ /// <summary>
+ /// Performs async operation.
+ /// </summary>
+ /// <param name="type">The type code.</param>
+ /// <param name="writeAction">The write action.</param>
+ /// <returns>Task for async operation</returns>
+ protected Task DoOutOpAsync(int type, Action<BinaryWriter> writeAction = null)
+ {
+ return DoOutOpAsync<object>(type, writeAction);
+ }
+
+ /// <summary>
+ /// Performs async operation.
+ /// </summary>
+ /// <typeparam name="T">Type of the result.</typeparam>
+ /// <param name="type">The type code.</param>
+ /// <param name="writeAction">The write action.</param>
+ /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
+ /// <param name="convertFunc">The function to read future result from stream.</param>
+ /// <returns>Task for async operation</returns>
+ protected Task<T> DoOutOpAsync<T>(int type, Action<BinaryWriter> writeAction = null, bool keepBinary = false,
+ Func<BinaryReader, T> convertFunc = null)
+ {
+ return GetFuture((futId, futType) => DoOutOp(type, w =>
+ {
+ if (writeAction != null)
+ {
+ writeAction(w);
+ }
+ w.WriteLong(futId);
+ w.WriteInt(futType);
+ }), keepBinary, convertFunc).Task;
+ }
+
+ /// <summary>
+ /// Performs async operation.
+ /// </summary>
+ /// <typeparam name="T">Type of the result.</typeparam>
+ /// <param name="type">The type code.</param>
+ /// <param name="writeAction">The write action.</param>
+ /// <returns>Future for async operation</returns>
+ protected Future<T> DoOutOpObjectAsync<T>(int type, Action<IBinaryRawWriter> writeAction)
+ {
+ return GetFuture<T>((futId, futType) => DoOutOpObject(type, w =>
+ {
+ writeAction(w);
+ w.WriteLong(futId);
+ w.WriteInt(futType);
+ }));
+ }
+
+ /// <summary>
+ /// Performs async operation.
+ /// </summary>
+ /// <typeparam name="TR">Type of the result.</typeparam>
+ /// <typeparam name="T1">The type of the first arg.</typeparam>
+ /// <param name="type">The type code.</param>
+ /// <param name="val1">First arg.</param>
+ /// <returns>
+ /// Task for async operation
+ /// </returns>
+ protected Task<TR> DoOutOpAsync<T1, TR>(int type, T1 val1)
+ {
+ return GetFuture<TR>((futId, futType) => DoOutOp(type, w =>
+ {
+ w.WriteObject(val1);
+ w.WriteLong(futId);
+ w.WriteInt(futType);
+ })).Task;
+ }
+
+ /// <summary>
+ /// Performs async operation.
+ /// </summary>
+ /// <typeparam name="TR">Type of the result.</typeparam>
+ /// <typeparam name="T1">The type of the first arg.</typeparam>
+ /// <typeparam name="T2">The type of the second arg.</typeparam>
+ /// <param name="type">The type code.</param>
+ /// <param name="val1">First arg.</param>
+ /// <param name="val2">Second arg.</param>
+ /// <returns>
+ /// Task for async operation
+ /// </returns>
+ protected Task<TR> DoOutOpAsync<T1, T2, TR>(int type, T1 val1, T2 val2)
+ {
+ return GetFuture<TR>((futId, futType) => DoOutOp(type, w =>
+ {
+ w.WriteObject(val1);
+ w.WriteObject(val2);
+ w.WriteLong(futId);
+ w.WriteInt(futType);
+ })).Task;
+ }
+
+ #endregion
+
+ #region Miscellaneous
+
+ /// <summary>
+ /// Finish marshaling.
+ /// </summary>
+ /// <param name="writer">Writer.</param>
+ internal void FinishMarshal(BinaryWriter writer)
+ {
+ _marsh.FinishMarshal(writer);
+ }
+
+ /// <summary>
+ /// Unmarshal object using the given stream.
+ /// </summary>
+ /// <param name="stream">Stream.</param>
+ /// <returns>Unmarshalled object.</returns>
+ protected virtual T Unmarshal<T>(IBinaryStream stream)
+ {
+ return _marsh.Unmarshal<T>(stream);
+ }
+
+ /// <summary>
+ /// Creates a future and starts listening.
+ /// </summary>
+ /// <typeparam name="T">Future result type</typeparam>
+ /// <param name="listenAction">The listen action.</param>
+ /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
+ /// <param name="convertFunc">The function to read future result from stream.</param>
+ /// <returns>Created future.</returns>
+ private Future<T> GetFuture<T>(Func<long, int, IPlatformTargetInternal> listenAction, bool keepBinary = false,
+ Func<BinaryReader, T> convertFunc = null)
+ {
+ var futType = FutureType.Object;
+
+ var type = typeof(T);
+
+ if (type.IsPrimitive)
+ IgniteFutureTypeMap.TryGetValue(type, out futType);
+
+ var fut = convertFunc == null && futType != FutureType.Object
+ ? new Future<T>()
+ : new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc));
+
+ var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
+
+ IPlatformTargetInternal futTarget;
+
+ try
+ {
+ futTarget = listenAction(futHnd, (int)futType);
+ }
+ catch (Exception)
+ {
+ _marsh.Ignite.HandleRegistry.Release(futHnd);
+
+ throw;
+ }
+
+ fut.SetTarget(new Listenable(futTarget));
+
+ return fut;
+ }
+
+ /// <summary>
+ /// Creates a future and starts listening.
+ /// </summary>
+ /// <typeparam name="T">Future result type</typeparam>
+ /// <param name="listenAction">The listen action.</param>
+ /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
+ /// <param name="convertFunc">The function to read future result from stream.</param>
+ /// <returns>Created future.</returns>
+ private Future<T> GetFuture<T>(Action<long, int> listenAction, bool keepBinary = false,
+ Func<BinaryReader, T> convertFunc = null)
+ {
+ var futType = FutureType.Object;
+
+ var type = typeof(T);
+
+ if (type.IsPrimitive)
+ IgniteFutureTypeMap.TryGetValue(type, out futType);
+
+ var fut = convertFunc == null && futType != FutureType.Object
+ ? new Future<T>()
+ : new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc));
+
+ var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
+
+ try
+ {
+ listenAction(futHnd, (int)futType);
+ }
+ catch (Exception)
+ {
+ _marsh.Ignite.HandleRegistry.Release(futHnd);
+
+ throw;
+ }
+
+ return fut;
+ }
+
+ /// <summary>
+ /// Writes to stream.
+ /// </summary>
+ private static void WriteToStream(Action<BinaryWriter> action, IBinaryStream stream, Marshaller marsh)
+ {
+ var writer = marsh.StartMarshal(stream);
+
+ action(writer);
+
+ marsh.FinishMarshal(writer);
+ }
+
+ /// <summary>
+ /// Writes to stream.
+ /// </summary>
+ private static void WriteToStream<T>(T obj, IBinaryStream stream, Marshaller marsh)
+ {
+ var writer = marsh.StartMarshal(stream);
+
+ writer.WriteObject(obj);
+
+ marsh.FinishMarshal(writer);
+ }
+
+
+ #endregion
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
index 88d2a76..93611f7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
@@ -24,16 +24,13 @@ namespace Apache.Ignite.Core.Impl.Services
using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cluster;
- using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Common;
- using Apache.Ignite.Core.Impl.Unmanaged;
using Apache.Ignite.Core.Services;
- using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
/// <summary>
/// Services implementation.
/// </summary>
- internal sealed class Services : PlatformTarget, IServices
+ internal sealed class Services : PlatformTargetAdapter, IServices
{
/** */
private const int OpDeploy = 1;
@@ -87,13 +84,12 @@ namespace Apache.Ignite.Core.Impl.Services
/// Initializes a new instance of the <see cref="Services" /> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="clusterGroup">Cluster group.</param>
/// <param name="keepBinary">Invoker binary flag.</param>
/// <param name="srvKeepBinary">Server binary flag.</param>
- public Services(IUnmanagedTarget target, Marshaller marsh, IClusterGroup clusterGroup,
+ public Services(IPlatformTargetInternal target, IClusterGroup clusterGroup,
bool keepBinary, bool srvKeepBinary)
- : base(target, marsh)
+ : base(target)
{
Debug.Assert(clusterGroup != null);
@@ -108,7 +104,7 @@ namespace Apache.Ignite.Core.Impl.Services
if (_keepBinary)
return this;
- return new Services(Target, Marshaller, _clusterGroup, true, _srvKeepBinary);
+ return new Services(Target, _clusterGroup, true, _srvKeepBinary);
}
/** <inheritDoc /> */
@@ -117,7 +113,7 @@ namespace Apache.Ignite.Core.Impl.Services
if (_srvKeepBinary)
return this;
- return new Services(DoOutOpObject(OpWithServerKeepBinary), Marshaller, _clusterGroup, _keepBinary, true);
+ return new Services(DoOutOpObject(OpWithServerKeepBinary), _clusterGroup, _keepBinary, true);
}
/** <inheritDoc /> */
@@ -372,12 +368,13 @@ namespace Apache.Ignite.Core.Impl.Services
/// <returns>
/// Invocation result.
/// </returns>
- private unsafe object InvokeProxyMethod(IUnmanagedTarget proxy, MethodBase method, object[] args,
+ private object InvokeProxyMethod(IPlatformTargetInternal proxy, MethodBase method, object[] args,
Platform platform)
{
return DoOutInOp(OpInvokeMethod,
writer => ServiceProxySerializer.WriteProxyMethod(writer, method, args, platform),
- (stream, res) => ServiceProxySerializer.ReadInvocationResult(stream, Marshaller, _keepBinary), proxy.Target);
+ (stream, res) => ServiceProxySerializer.ReadInvocationResult(stream, Marshaller, _keepBinary),
+ proxy);
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
index 4ddbc6d..4dd7f9f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
@@ -22,13 +22,12 @@ namespace Apache.Ignite.Core.Impl.Transactions
using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Impl.Binary;
- using Apache.Ignite.Core.Impl.Unmanaged;
using Apache.Ignite.Core.Transactions;
/// <summary>
/// Transactions facade.
/// </summary>
- internal class TransactionsImpl : PlatformTarget, ITransactions
+ internal class TransactionsImpl : PlatformTargetAdapter, ITransactions
{
/** */
private const int OpCacheConfigParameters = 1;
@@ -82,29 +81,19 @@ namespace Apache.Ignite.Core.Impl.Transactions
/// Initializes a new instance of the <see cref="TransactionsImpl" /> class.
/// </summary>
/// <param name="target">Target.</param>
- /// <param name="marsh">Marshaller.</param>
/// <param name="localNodeId">Local node id.</param>
- public TransactionsImpl(IUnmanagedTarget target, Marshaller marsh,
- Guid localNodeId) : base(target, marsh)
+ public TransactionsImpl(IPlatformTargetInternal target, Guid localNodeId) : base(target)
{
_localNodeId = localNodeId;
- TransactionConcurrency concurrency = default(TransactionConcurrency);
- TransactionIsolation isolation = default(TransactionIsolation);
- TimeSpan timeout = default(TimeSpan);
+ var res = target.OutStream(OpCacheConfigParameters, reader => Tuple.Create(
+ (TransactionConcurrency) reader.ReadInt(),
+ (TransactionIsolation) reader.ReadInt(),
+ reader.ReadLongAsTimespan()));
- DoInOp(OpCacheConfigParameters, stream =>
- {
- var reader = marsh.StartUnmarshal(stream).GetRawReader();
-
- concurrency = (TransactionConcurrency) reader.ReadInt();
- isolation = (TransactionIsolation) reader.ReadInt();
- timeout = reader.ReadLongAsTimespan();
- });
-
- _dfltConcurrency = concurrency;
- _dfltIsolation = isolation;
- _dfltTimeout = timeout;
+ _dfltConcurrency = res.Item1;
+ _dfltIsolation = res.Item2;
+ _dfltTimeout = res.Item3;
}
/** <inheritDoc /> */
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
index 1720a79..f96157c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
@@ -63,6 +63,9 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetInStreamAsync")]
public static extern void TargetInStreamAsync(void* ctx, void* target, int opType, long memPtr);
+ // Binds the native "IgniteTargetInStreamOutObjectAsync" export; unlike TargetInStreamAsync it returns a pointer.
+ [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetInStreamOutObjectAsync")]
+ public static extern void* TargetInStreamOutObjectAsync(void* ctx, void* target, int opType, long memPtr);
+
[DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAcquire")]
public static extern void* Acquire(void* ctx, void* target);
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index 2400390..819eda2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -678,7 +678,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
binaryReceiver.Deserialize<StreamReceiverHolder>();
if (receiver != null)
- receiver.Receive(_ignite, new UnmanagedNonReleaseableTarget(_ctx, cache), stream, keepBinary);
+ {
+ var target = new PlatformJniTarget(new UnmanagedNonReleaseableTarget(_ctx, cache),
+ _ignite.Marshaller);
+ receiver.Receive(_ignite, target, stream, keepBinary);
+ }
return 0;
}
@@ -1171,9 +1175,9 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
if (affBase != null)
{
- var baseFunc0 = UU.Acquire(_ctx, baseFunc);
+ var baseFunc0 = new PlatformJniTarget(UU.Acquire(_ctx, baseFunc), _ignite.Marshaller);
- affBase.SetBaseFunction(new PlatformAffinityFunction(baseFunc0, _ignite.Marshaller));
+ affBase.SetBaseFunction(new PlatformAffinityFunction(baseFunc0));
}
return _handleRegistry.Allocate(func);
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index a38cf2f..b6e6582 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -175,6 +175,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
JNI.TargetInStreamAsync(target.Context, target.Target, opType, memPtr);
}
+ /// <summary>
+ /// Invokes the native <c>TargetInStreamOutObjectAsync</c> call for the given target and
+ /// returns the result of <c>ChangeTarget</c> applied to the returned pointer.
+ /// </summary>
+ /// <param name="target">Target whose context and pointer are passed to the native call.</param>
+ /// <param name="opType">Operation type code.</param>
+ /// <param name="memPtr">Memory pointer passed through to the native call.</param>
+ internal static IUnmanagedTarget TargetInStreamOutObjectAsync(IUnmanagedTarget target, int opType, long memPtr)
+ {
+ return target.ChangeTarget(
+ JNI.TargetInStreamOutObjectAsync(target.Context, target.Target, opType, memPtr));
+ }
+
#endregion
#region NATIVE METHODS: MISCELANNEOUS
http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
index 621e604..6f5596a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Core.Interop
{
using System;
+ using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
@@ -99,5 +100,19 @@ namespace Apache.Ignite.Core.Interop
/// <returns>Task.</returns>
Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction,
Func<IBinaryRawReader, T> readAction);
+
+ /// <summary>
+ /// Performs asynchronous operation. Overload that additionally accepts a cancellation token.
+ /// </summary>
+ /// <typeparam name="T">Result type.</typeparam>
+ /// <param name="type">Operation type code.</param>
+ /// <param name="writeAction">Write action (can be null).</param>
+ /// <param name="readAction">Read function (can be null).</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
+ /// <returns>
+ /// Task.
+ /// </returns>
+ Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction,
+ Func<IBinaryRawReader, T> readAction, CancellationToken cancellationToken);
}
}