You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/05/23 13:58:58 UTC
[ignite] branch master updated: IGNITE-16946 .NET: Thin client: Add AtomicLong (#10030)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9fee83a0d16 IGNITE-16946 .NET: Thin client: Add AtomicLong (#10030)
9fee83a0d16 is described below
commit 9fee83a0d1669843dc697e10d1615f1fdc79ac19
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon May 23 16:58:52 2022 +0300
IGNITE-16946 .NET: Thin client: Add AtomicLong (#10030)
* Add `IIgniteClient.GetAtomicLong` and `IAtomicLongClient` APIs.
* Use partition-aware requests.
---
.../internal/client/thin/ClientOperation.java | 6 +-
.../internal/client/thin/TcpIgniteClient.java | 1 -
.../platform/client/ClientMessageParser.java | 7 +
.../datastructures/ClientAtomicLongRequest.java | 6 +-
...AtomicLongValueCompareAndSetAndGetRequest.java} | 14 +-
.../ClientAtomicLongValueCompareAndSetRequest.java | 2 +-
.../platform/PlatformGetInternalCachesTask.java | 87 +++++++++++
.../Client/Cache/PartitionAwarenessTest.cs | 27 ++++
.../Client/DataStructures/AtomicLongClientTests.cs | 174 +++++++++++++++++++++
.../DataStructures/AtomicClientConfiguration.cs | 70 +++++++++
.../Client/DataStructures/IAtomicLongClient.cs | 86 ++++++++++
.../Apache.Ignite.Core/Client/IIgniteClient.cs | 32 ++++
.../platforms/dotnet/Apache.Ignite.Core/IIgnite.cs | 11 +-
.../Apache.Ignite.Core/Impl/Client/ClientOp.cs | 12 +-
.../Impl/Client/DataStructures/AtomicLongClient.cs | 168 ++++++++++++++++++++
.../Apache.Ignite.Core/Impl/Client/IgniteClient.cs | 52 ++++++
.../Apache.Ignite.DotNetCore.sln.DotSettings | 1 +
17 files changed, 736 insertions(+), 20 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index 6eb2b222ed6..2d4af32a18b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -214,7 +214,10 @@ public enum ClientOperation {
ATOMIC_LONG_VALUE_GET_AND_SET(9005),
/** AtomicLong.compareAndSet. */
- ATOMIC_LONG_VALUE_COMPARE_AND_SET(9006);
+ ATOMIC_LONG_VALUE_COMPARE_AND_SET(9006),
+
+ /** AtomicLong.compareAndSetAndGet. */
+ ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET(9007);
/** Code. */
private final int code;
@@ -391,6 +394,7 @@ public enum ClientOperation {
return ClientOperationType.ATOMIC_LONG_VALUE_GET_AND_SET;
case ATOMIC_LONG_VALUE_COMPARE_AND_SET:
+ case ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET:
return ClientOperationType.ATOMIC_LONG_VALUE_COMPARE_AND_SET;
default:
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 824242dd47a..3a3a2664fcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -357,7 +357,6 @@ public class TcpIgniteClient implements IgniteClient {
else
w.writeBoolean(false);
}
-
}, null);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 876ec9632ba..9ae7f21e260 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.processors.platform.client.datastructures.Clie
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongExistsRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongRemoveRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueAddAndGetRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueCompareAndSetAndGetRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueCompareAndSetRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueGetAndSetRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueGetRequest;
@@ -318,6 +319,9 @@ public class ClientMessageParser implements ClientListenerMessageParser {
/** AtomicLong.compareAndSet. */
private static final short OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET = 9006;
+ /** AtomicLong.compareAndSetAndGet. */
+ private static final short OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET = 9007;
+
/** Marshaller. */
private final GridBinaryMarshaller marsh;
@@ -566,6 +570,9 @@ public class ClientMessageParser implements ClientListenerMessageParser {
case OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET:
return new ClientAtomicLongValueCompareAndSetRequest(reader);
+
+ case OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET:
+ return new ClientAtomicLongValueCompareAndSetAndGetRequest(reader);
}
return new ClientRawRequest(reader.readLong(), ClientStatus.INVALID_OP_CODE,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongRequest.java
index 706329337b2..5a910330c6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongRequest.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.processors.platform.client.datastructures;
-import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.configuration.AtomicConfiguration;
+import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongImpl;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientRequest;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -54,11 +54,11 @@ public class ClientAtomicLongRequest extends ClientRequest {
* @param ctx Context.
* @return Atomic long or null.
*/
- protected IgniteAtomicLong atomicLong(ClientConnectionContext ctx) {
+ protected GridCacheAtomicLongImpl atomicLong(ClientConnectionContext ctx) {
AtomicConfiguration cfg = groupName == null ? null : new AtomicConfiguration().setGroupName(groupName);
try {
- return ctx.kernalContext().dataStructures().atomicLong(name, cfg, 0, false);
+ return (GridCacheAtomicLongImpl)ctx.kernalContext().dataStructures().atomicLong(name, cfg, 0, false);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e.getMessage(), e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetAndGetRequest.java
similarity index 76%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetRequest.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetAndGetRequest.java
index ee550c6a8bd..ad690adc48e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetAndGetRequest.java
@@ -17,16 +17,16 @@
package org.apache.ignite.internal.processors.platform.client.datastructures;
-import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.binary.BinaryRawReader;
-import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
+import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongImpl;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientLongResponse;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
/**
- * Atomic long get and set request.
+ * Atomic long compare and set and get request.
*/
-public class ClientAtomicLongValueCompareAndSetRequest extends ClientAtomicLongRequest {
+public class ClientAtomicLongValueCompareAndSetAndGetRequest extends ClientAtomicLongRequest {
/** */
private final long expected;
@@ -38,7 +38,7 @@ public class ClientAtomicLongValueCompareAndSetRequest extends ClientAtomicLongR
*
* @param reader Reader.
*/
- public ClientAtomicLongValueCompareAndSetRequest(BinaryRawReader reader) {
+ public ClientAtomicLongValueCompareAndSetAndGetRequest(BinaryRawReader reader) {
super(reader);
expected = reader.readLong();
@@ -47,11 +47,11 @@ public class ClientAtomicLongValueCompareAndSetRequest extends ClientAtomicLongR
/** {@inheritDoc} */
@Override public ClientResponse process(ClientConnectionContext ctx) {
- IgniteAtomicLong atomicLong = atomicLong(ctx);
+ GridCacheAtomicLongImpl atomicLong = atomicLong(ctx);
if (atomicLong == null)
return notFoundResponse();
- return new ClientBooleanResponse(requestId(), atomicLong.compareAndSet(expected, val));
+ return new ClientLongResponse(requestId(), atomicLong.compareAndSetAndGet(expected, val));
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetRequest.java
index ee550c6a8bd..ce8ccee86dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/datastructures/ClientAtomicLongValueCompareAndSetRequest.java
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.platform.client.ClientConnectionCon
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
/**
- * Atomic long get and set request.
+ * Atomic long compare and set request.
*/
public class ClientAtomicLongValueCompareAndSetRequest extends ClientAtomicLongRequest {
/** */
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformGetInternalCachesTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformGetInternalCachesTask.java
new file mode 100644
index 00000000000..6cdf5c4fa4d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformGetInternalCachesTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Compute task that serializes the configuration of every cache returned by {@code IgniteEx.cachesx()}.
+ */
+public class PlatformGetInternalCachesTask extends ComputeTaskAdapter<Object, byte[]> {
+ /** {@inheritDoc} */
+ @NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) {
+ return Collections.singletonMap(new InternalCachesJob(), F.first(subgrid));
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] reduce(List<ComputeJobResult> results) {
+ return results.get(0).getData(); // map() schedules exactly one job, so only the first result is read.
+ }
+
+ /**
+ * Job that writes the cache count followed by each cache configuration into a byte array.
+ */
+ @SuppressWarnings("rawtypes")
+ private static class InternalCachesJob extends ComputeJobAdapter {
+ /** Ignite instance, injected through {@link IgniteInstanceResource}. */
+ @SuppressWarnings("unused")
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public byte[] execute() {
+ IgniteEx ign = (IgniteEx)ignite;
+
+ BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), null);
+
+ try (BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx, new BinaryHeapOutputStream(512), null, null)) {
+ Collection<IgniteInternalCache<?, ?>> caches = ign.cachesx();
+
+ writer.writeInt(caches.size()); // Count prefix, so a reader knows how many configurations follow.
+
+ for (IgniteInternalCache c : caches) {
+ PlatformConfigurationUtils.writeCacheConfiguration(writer, c.configuration());
+ }
+
+ return writer.out().arrayCopy();
+ }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/PartitionAwarenessTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/PartitionAwarenessTest.cs
index d7865b602cf..bf06b57c5f1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/PartitionAwarenessTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/PartitionAwarenessTest.cs
@@ -27,6 +27,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;
+ using Apache.Ignite.Core.Client.DataStructures;
using Apache.Ignite.Core.Common;
using NUnit.Framework;
@@ -462,6 +463,32 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
Assert.AreEqual(gridIdx, GetClientRequestGridIndex("Start", RequestNamePrefixStreamer));
}
+ [Test]
+ [TestCase("default-grp-partitioned", null, CacheMode.Partitioned, 0)]
+ [TestCase("default-grp-replicated", null, CacheMode.Replicated, 2)]
+ [TestCase("custom-grp-partitioned", "testAtomicLong", CacheMode.Partitioned, 1)]
+ [TestCase("custom-grp-replicated", "testAtomicLong", CacheMode.Replicated, 0)]
+ public void AtomicLong_RequestIsRoutedToPrimaryNode(
+ string name, string groupName, CacheMode cacheMode, int gridIdx)
+ {
+ var cfg = new AtomicClientConfiguration
+ {
+ GroupName = groupName,
+ CacheMode = cacheMode
+ };
+
+ var atomicLong = Client.GetAtomicLong(name, cfg, 1, true);
+
+ // Warm up.
+ atomicLong.Read();
+ ClearLoggers();
+
+ // Test.
+ atomicLong.Read();
+
+ Assert.AreEqual(gridIdx, GetClientRequestGridIndex("ValueGet", "datastructures.ClientAtomicLong"));
+ }
+
protected override IgniteClientConfiguration GetClientConfiguration()
{
var cfg = base.GetClientConfiguration();
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/DataStructures/AtomicLongClientTests.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/DataStructures/AtomicLongClientTests.cs
new file mode 100644
index 00000000000..5f1fcc7bd1e
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/DataStructures/AtomicLongClientTests.cs
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.DataStructures
+{
+ using System;
+ using System.Linq;
+ using Apache.Ignite.Core.Cache.Configuration;
+ using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Client.DataStructures;
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Binary.IO;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests for <see cref="IAtomicLongClient"/>.
+ /// </summary>
+ public class AtomicLongClientTests : ClientTestBase
+ {
+ [Test]
+ public void TestCreateSetsInitialValue()
+ {
+ var atomicLongClient = Client.GetAtomicLong(TestUtils.TestName, 42, true);
+ var atomicLongServer = GetIgnite().GetAtomicLong(atomicLongClient.Name, 1, false);
+
+ Assert.AreEqual(42, atomicLongClient.Read());
+ Assert.AreEqual(42, atomicLongServer.Read());
+ }
+
+ [Test]
+ public void TestCreateIgnoresInitialValueWhenAlreadyExists()
+ {
+ var atomicLong = Client.GetAtomicLong(TestUtils.TestName, 42, true);
+ var atomicLong2 = Client.GetAtomicLong(TestUtils.TestName, 43, false);
+
+ Assert.AreEqual(42, atomicLong.Read());
+ Assert.AreEqual(42, atomicLong2.Read());
+ }
+
+ [Test]
+ public void TestOperationsThrowExceptionWhenAtomicLongDoesNotExist()
+ {
+ var name = TestUtils.TestName;
+ var atomicLong = Client.GetAtomicLong(name, 42, true);
+ atomicLong.Close();
+
+ Action<Action> assertDoesNotExistError = act =>
+ {
+ var ex = Assert.Throws<IgniteClientException>(() => act());
+
+ StringAssert.Contains($"AtomicLong with name '{name}' does not exist.", ex.Message);
+ };
+
+ Assert.IsTrue(atomicLong.IsClosed());
+
+ assertDoesNotExistError(() => atomicLong.Read());
+ assertDoesNotExistError(() => atomicLong.Add(1));
+ assertDoesNotExistError(() => atomicLong.Increment());
+ assertDoesNotExistError(() => atomicLong.Decrement());
+ assertDoesNotExistError(() => atomicLong.Exchange(22));
+ assertDoesNotExistError(() => atomicLong.CompareExchange(22, 33));
+ }
+
+ [Test]
+ public void TestIsClosed()
+ {
+ var atomicLong = Client.GetAtomicLong(TestUtils.TestName, 0, false); // create = false, nothing exists yet.
+ Assert.IsNull(atomicLong);
+
+ atomicLong = Client.GetAtomicLong(TestUtils.TestName, 1, true);
+ Assert.IsFalse(atomicLong.IsClosed());
+ Assert.AreEqual(1, atomicLong.Read());
+
+ atomicLong.Close();
+ Assert.IsTrue(atomicLong.IsClosed()); // The same handle reports closed after Close().
+ }
+
+ [Test]
+ public void TestIncrementDecrementAdd()
+ {
+ var atomicLong = Client.GetAtomicLong(TestUtils.TestName, 1, true);
+
+ Assert.AreEqual(2, atomicLong.Increment());
+ Assert.AreEqual(2, atomicLong.Read());
+
+ Assert.AreEqual(1, atomicLong.Decrement());
+ Assert.AreEqual(1, atomicLong.Read());
+
+ Assert.AreEqual(101, atomicLong.Add(100));
+ Assert.AreEqual(101, atomicLong.Read());
+ }
+
+ [Test]
+ public void TestExchange()
+ {
+ var atomicLong = Client.GetAtomicLong(TestUtils.TestName, 1, true);
+
+ Assert.AreEqual(1, atomicLong.Exchange(100));
+ Assert.AreEqual(100, atomicLong.Read());
+ }
+
+ [Test]
+ public void TestCompareExchange()
+ {
+ var atomicLong = Client.GetAtomicLong(TestUtils.TestName, 1, true);
+
+ Assert.AreEqual(1, atomicLong.CompareExchange(3, 2)); // Comparand 2 != current 1: nothing is replaced.
+ Assert.AreEqual(1, atomicLong.Read());
+
+ Assert.AreEqual(1, atomicLong.CompareExchange(4, 1)); // Comparand 1 == current 1: replaced, old value returned.
+ Assert.AreEqual(4, atomicLong.Read());
+ }
+
+ [Test]
+ public void TestCustomConfigurationPropagatesToServer()
+ {
+ var cfg1 = new AtomicClientConfiguration
+ {
+ AtomicSequenceReserveSize = 32,
+ Backups = 2,
+ CacheMode = CacheMode.Partitioned,
+ GroupName = "atomics-partitioned"
+ };
+
+ var cfg2 = new AtomicClientConfiguration
+ {
+ AtomicSequenceReserveSize = 33,
+ Backups = 3,
+ CacheMode = CacheMode.Replicated,
+ GroupName = "atomics-replicated"
+ };
+
+ var name = TestUtils.TestName;
+
+ var atomicLong1 = Client.GetAtomicLong(name, cfg1, 1, true); // Same name, different groups: the reads below expect independent values.
+ var atomicLong2 = Client.GetAtomicLong(name, cfg2, 2, true);
+ var atomicLong3 = Client.GetAtomicLong(name, 3, true); // No configuration supplied.
+
+ var cacheConfigBytes = GetIgnite().GetCompute().ExecuteJavaTask<byte[]>(
+ "org.apache.ignite.platform.PlatformGetInternalCachesTask", null);
+
+ Assert.IsNotNull(cacheConfigBytes);
+
+ var stream = new BinaryHeapStream(cacheConfigBytes);
+ var reader = new BinaryReader(BinaryUtils.Marshaller, stream, BinaryMode.Deserialize, null);
+
+ var caches = Enumerable.Range(0, reader.ReadInt()) // Payload: a count, then that many cache configurations.
+ .Select(_ => new CacheConfiguration(reader))
+ .ToDictionary(c => c.Name);
+
+ Assert.AreEqual(2, caches["ignite-sys-atomic-cache@atomics-partitioned"].Backups);
+ Assert.AreEqual(int.MaxValue, caches["ignite-sys-atomic-cache@atomics-replicated"].Backups); // cfg2 asked for 3; int.MaxValue is expected for the replicated cache.
+ Assert.AreEqual(1, caches["ignite-sys-atomic-cache@default-ds-group"].Backups);
+
+ Assert.AreEqual(1, atomicLong1.Read());
+ Assert.AreEqual(2, atomicLong2.Read());
+ Assert.AreEqual(3, atomicLong3.Read());
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/DataStructures/AtomicClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/DataStructures/AtomicClientConfiguration.cs
new file mode 100644
index 00000000000..e6b7bc57d24
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/DataStructures/AtomicClientConfiguration.cs
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Client.DataStructures
+{
+ using System.ComponentModel;
+ using Apache.Ignite.Core.Cache.Configuration;
+
+ /// <summary>
+ /// Configuration for atomic data structures. See <see cref="IAtomicLongClient"/>.
+ /// </summary>
+ public class AtomicClientConfiguration
+ {
+ /// <summary>
+ /// Default value for <see cref="CacheMode"/>.
+ /// </summary>
+ public const CacheMode DefaultCacheMode = CacheMode.Partitioned;
+
+ /// <summary>
+ /// Default value for <see cref="AtomicSequenceReserveSize"/>.
+ /// </summary>
+ public const int DefaultAtomicSequenceReserveSize = 1000;
+
+ /// <summary>
+ /// Default value for <see cref="Backups"/>.
+ /// </summary>
+ public const int DefaultBackups = 1;
+
+ /// <summary>
+ /// Gets or sets the number of backup nodes for the underlying cache.
+ /// </summary>
+ [DefaultValue(DefaultBackups)]
+ public int Backups { get; set; } = DefaultBackups;
+
+ /// <summary>
+ /// Gets or sets the cache mode for the underlying cache.
+ /// </summary>
+ [DefaultValue(DefaultCacheMode)]
+ public CacheMode CacheMode { get; set; } = DefaultCacheMode;
+
+ /// <summary>
+ /// Gets or sets the default number of sequence values reserved for atomic sequence instances. After
+ /// a certain number has been reserved, subsequent increments of the sequence will happen locally,
+ /// without communication with other nodes, until the next reservation has to be made.
+ /// <para />
+ /// Default is <see cref="DefaultAtomicSequenceReserveSize"/>.
+ /// </summary>
+ [DefaultValue(DefaultAtomicSequenceReserveSize)]
+ public int AtomicSequenceReserveSize { get; set; } = DefaultAtomicSequenceReserveSize;
+
+ /// <summary>
+ /// Gets or sets the group name for the underlying cache.
+ /// </summary>
+ public string GroupName { get; set; }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/DataStructures/IAtomicLongClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/DataStructures/IAtomicLongClient.cs
new file mode 100644
index 00000000000..b97a24e2516
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/DataStructures/IAtomicLongClient.cs
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Client.DataStructures
+{
+ /// <summary>
+ /// Represents a distributed atomic long value.
+ /// <para />
+ /// Use <see cref="IIgniteClient.GetAtomicLong(string,long,bool)"/> to get or create an instance.
+ /// </summary>
+ public interface IAtomicLongClient
+ {
+ /// <summary>
+ /// Gets the name of this atomic long.
+ /// </summary>
+ /// <value>
+ /// Name of this atomic long.
+ /// </value>
+ string Name { get; }
+
+ /// <summary>
+ /// Returns current value.
+ /// </summary>
+ /// <returns>Current value of the atomic long.</returns>
+ long Read();
+
+ /// <summary>
+ /// Increments current value and returns result.
+ /// </summary>
+ /// <returns>Value of the atomic long after the increment.</returns>
+ long Increment();
+
+ /// <summary>
+ /// Adds specified value to the current value and returns result.
+ /// </summary>
+ /// <param name="value">The value to add.</param>
+ /// <returns>Value of the atomic long after the addition.</returns>
+ long Add(long value);
+
+ /// <summary>
+ /// Decrements current value and returns result.
+ /// </summary>
+ /// <returns>Value of the atomic long after the decrement.</returns>
+ long Decrement();
+
+ /// <summary>
+ /// Sets current value to a specified value and returns the original value.
+ /// </summary>
+ /// <param name="value">The value to set.</param>
+ /// <returns>Original value of the atomic long.</returns>
+ long Exchange(long value);
+
+ /// <summary>
+ /// Compares current value with <paramref name="comparand"/> and, if they are equal, replaces it with <paramref name="value"/>.
+ /// </summary>
+ /// <param name="value">The value to set.</param>
+ /// <param name="comparand">The value that is compared to the current value.</param>
+ /// <returns>Original value of the atomic long, whether or not it was replaced.</returns>
+ long CompareExchange(long value, long comparand);
+
+ /// <summary>
+ /// Determines whether this instance was removed from cache.
+ /// </summary>
+ /// <returns>True if this atomic was removed from cache; otherwise, false.</returns>
+ bool IsClosed();
+
+ /// <summary>
+ /// Closes this instance.
+ /// </summary>
+ void Close();
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
index f61ef57e646..cdb18dbbf31 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
@@ -25,6 +25,7 @@ namespace Apache.Ignite.Core.Client
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Client.Compute;
using Apache.Ignite.Core.Client.Datastream;
+ using Apache.Ignite.Core.Client.DataStructures;
using Apache.Ignite.Core.Client.Services;
using Apache.Ignite.Core.Client.Transactions;
@@ -189,5 +190,36 @@ namespace Apache.Ignite.Core.Client
/// <param name="options">Data streamer options.</param>
/// <returns>Data streamer.</returns>
IDataStreamerClient<TK, TV> GetDataStreamer<TK, TV>(string cacheName, DataStreamerClientOptions<TK, TV> options);
+
+ /// <summary>
+ /// Gets an atomic long with the specified name.
+ /// Creates a new atomic long if it does not exist and <paramref name="create"/> is true.
+ /// </summary>
+ /// <param name="name">Name of the atomic long.</param>
+ /// <param name="initialValue">
+ /// Initial value for the atomic long. Ignored if <paramref name="create"/> is false.
+ /// </param>
+ /// <param name="create">Flag indicating whether atomic long should be created if it does not exist.</param>
+ /// <returns>Atomic long instance with the specified name,
+ /// or null if it does not exist and <paramref name="create"/> is <c>false</c>.</returns>
+ IAtomicLongClient GetAtomicLong(string name, long initialValue, bool create);
+
+ /// <summary>
+ /// Gets an atomic long with the specified name.
+ /// Creates a new atomic long if it does not exist and <paramref name="create"/> is true.
+ /// </summary>
+ /// <param name="name">Name of the atomic long.</param>
+ /// <param name="configuration">Configuration.</param>
+ /// <param name="initialValue">
+ /// Initial value for the atomic long. Ignored if <paramref name="create"/> is false.
+ /// </param>
+ /// <param name="create">Flag indicating whether atomic long should be created if it does not exist.</param>
+ /// <returns>Atomic long instance with the specified name,
+ /// or null if it does not exist and <paramref name="create"/> is <c>false</c>.</returns>
+ IAtomicLongClient GetAtomicLong(
+ string name,
+ AtomicClientConfiguration configuration,
+ long initialValue,
+ bool create);
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs
index a220160b928..478bc7bcac5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs
@@ -240,17 +240,16 @@ namespace Apache.Ignite.Core
IServices GetServices();
/// <summary>
- /// Gets an atomic long with specified name from cache.
- /// Creates new atomic long in cache if it does not exist and <c>create</c> is true.
+ /// Gets an atomic long with the specified name.
+ /// Creates a new atomic long if it does not exist and <paramref name="create"/> is true.
/// </summary>
/// <param name="name">Name of the atomic long.</param>
/// <param name="initialValue">
- /// Initial value for the atomic long. Ignored if <c>create</c> is false.
+ /// Initial value for the atomic long. Ignored if <paramref name="create"/> is false.
/// </param>
/// <param name="create">Flag indicating whether atomic long should be created if it does not exist.</param>
- /// <returns>Atomic long instance with specified name,
- /// or null if it does not exist and <c>create</c> flag is not set.</returns>
- /// <exception cref="IgniteException">If atomic long could not be fetched or created.</exception>
+ /// <returns>Atomic long instance with the specified name,
+ /// or null if it does not exist and <paramref name="create"/> is <c>false</c>.</returns>
IAtomicLong GetAtomicLong(string name, long initialValue, bool create);
/// <summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
index 2fdbc3d7fe9..f19237c4638 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
@@ -99,6 +99,16 @@ namespace Apache.Ignite.Core.Impl.Client
// Data Streamer.
DataStreamerStart = 8000,
- DataStreamerAddData = 8001
+ DataStreamerAddData = 8001,
+
+ // Data Structures.
+ AtomicLongCreate = 9000,
+ AtomicLongRemove = 9001,
+ AtomicLongExists = 9002,
+ AtomicLongValueGet = 9003,
+ AtomicLongValueAddAndGet = 9004,
+ AtomicLongValueGetAndSet = 9005,
+ AtomicLongValueCompareAndSet = 9006,
+ AtomicLongValueCompareAndSetAndGet = 9007
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/DataStructures/AtomicLongClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/DataStructures/AtomicLongClient.cs
new file mode 100644
index 00000000000..4a49022e435
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/DataStructures/AtomicLongClient.cs
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client.DataStructures
+{
+ using Apache.Ignite.Core.Client.DataStructures;
+ using Apache.Ignite.Core.Impl.Binary;
+
+ /// <summary>
+ /// Thin client atomic long.
+ /// Every operation is sent through <see cref="ClientFailoverSocket"/> as an affinity request,
+ /// with <see cref="Name"/> as the affinity key and a cache id computed from the group name.
+ /// </summary>
+ internal sealed class AtomicLongClient : IAtomicLongClient
+ {
+     /** Cache group name used when the caller does not provide one. */
+     private const string DefaultDataStructuresCacheGroupName = "default-ds-group";
+
+     /** First part of the cache name from which the cache id is computed. */
+     private const string AtomicsCacheName = "ignite-sys-atomic-cache";
+
+     /** Socket that performs all requests. */
+     private readonly ClientFailoverSocket _socket;
+
+     /** Cache id passed to every affinity request. */
+     private readonly int _cacheId;
+
+     /** Group name as passed to the constructor; can be null. */
+     private readonly string _groupName;
+
+     /// <summary>
+     /// Initializes a new instance of <see cref="AtomicLongClient"/> class.
+     /// </summary>
+     /// <param name="socket">Socket used to send requests.</param>
+     /// <param name="name">Name of the atomic long.</param>
+     /// <param name="groupName">Group name; when null, the default group is used for the cache id.</param>
+     public AtomicLongClient(ClientFailoverSocket socket, string name, string groupName)
+     {
+         _socket = socket;
+         _groupName = groupName;
+         Name = name;
+
+         // The cache id is derived from "<atomics cache name>@<group name>".
+         var effectiveGroup = groupName ?? DefaultDataStructuresCacheGroupName;
+         _cacheId = BinaryUtils.GetCacheId(AtomicsCacheName + "@" + effectiveGroup);
+     }
+
+     /** <inheritDoc /> */
+     public string Name { get; }
+
+     /** <inheritDoc /> */
+     public long Read() => _socket.DoOutInOpAffinity(
+         ClientOp.AtomicLongValueGet,
+         req => WriteName(req),
+         resp => resp.Reader.ReadLong(),
+         _cacheId,
+         AffinityKey);
+
+     /** <inheritDoc /> */
+     public long Increment() => Add(1);
+
+     /** <inheritDoc /> */
+     public long Add(long value) => _socket.DoOutInOpAffinity(
+         ClientOp.AtomicLongValueAddAndGet,
+         req => WriteNameAndValue(req, value),
+         resp => resp.Reader.ReadLong(),
+         _cacheId,
+         AffinityKey);
+
+     /** <inheritDoc /> */
+     public long Decrement() => Add(-1);
+
+     /** <inheritDoc /> */
+     public long Exchange(long value) => _socket.DoOutInOpAffinity(
+         ClientOp.AtomicLongValueGetAndSet,
+         req => WriteNameAndValue(req, value),
+         resp => resp.Reader.ReadLong(),
+         _cacheId,
+         AffinityKey);
+
+     /** <inheritDoc /> */
+     public long CompareExchange(long value, long comparand)
+     {
+         return _socket.DoOutInOpAffinity(
+             ClientOp.AtomicLongValueCompareAndSetAndGet,
+             req =>
+             {
+                 // Wire order: name, group name, comparand, new value.
+                 WriteNameAndValue(req, comparand);
+                 req.Writer.WriteLong(value);
+             },
+             resp => resp.Reader.ReadLong(),
+             _cacheId,
+             AffinityKey);
+     }
+
+     /** <inheritDoc /> */
+     public bool IsClosed()
+     {
+         // Closed is the negation of the "exists" flag read from the response.
+         return _socket.DoOutInOpAffinity(
+             ClientOp.AtomicLongExists,
+             req => WriteName(req),
+             resp => !resp.Reader.ReadBoolean(),
+             _cacheId,
+             AffinityKey);
+     }
+
+     /** <inheritDoc /> */
+     public void Close()
+     {
+         // No response reader is supplied (null) for the remove operation.
+         _socket.DoOutInOpAffinity<object, string>(
+             ClientOp.AtomicLongRemove,
+             req => WriteName(req),
+             null,
+             _cacheId,
+             AffinityKey);
+     }
+
+     /// <summary>
+     /// Gets the affinity key.
+     /// GridCacheInternalKeyImpl uses name as AffinityKeyMapped.
+     /// </summary>
+     private string AffinityKey
+     {
+         get { return Name; }
+     }
+
+     /// <summary>
+     /// Writes the name of this data structure, followed by the group name.
+     /// </summary>
+     /// <param name="ctx">Request context.</param>
+     private void WriteName(ClientRequestContext ctx)
+     {
+         var writer = ctx.Writer;
+
+         writer.WriteString(Name);
+         writer.WriteString(_groupName);
+     }
+
+     /// <summary>
+     /// Writes the name of this data structure (see <see cref="WriteName"/>), then a long value.
+     /// Shared by the requests whose payload starts with the name followed by one long.
+     /// </summary>
+     /// <param name="ctx">Request context.</param>
+     /// <param name="value">Value written after the name and the group name.</param>
+     private void WriteNameAndValue(ClientRequestContext ctx, long value)
+     {
+         WriteName(ctx);
+         ctx.Writer.WriteLong(value);
+     }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
index 47d5b1c172d..728458c7d40 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
@@ -30,6 +30,7 @@ namespace Apache.Ignite.Core.Impl.Client
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Client.Compute;
using Apache.Ignite.Core.Client.Datastream;
+ using Apache.Ignite.Core.Client.DataStructures;
using Apache.Ignite.Core.Client.Services;
using Apache.Ignite.Core.Client.Transactions;
using Apache.Ignite.Core.Datastream;
@@ -40,6 +41,7 @@ namespace Apache.Ignite.Core.Impl.Client
using Apache.Ignite.Core.Impl.Client.Cluster;
using Apache.Ignite.Core.Impl.Client.Compute;
using Apache.Ignite.Core.Impl.Client.Datastream;
+ using Apache.Ignite.Core.Impl.Client.DataStructures;
using Apache.Ignite.Core.Impl.Client.Services;
using Apache.Ignite.Core.Impl.Client.Transactions;
using Apache.Ignite.Core.Impl.Cluster;
@@ -307,6 +309,56 @@ namespace Apache.Ignite.Core.Impl.Client
return new DataStreamerClient<TK, TV>(_socket, cacheName, options);
}
+ /** <inheritDoc /> */
+ public IAtomicLongClient GetAtomicLong(string name, long initialValue, bool create)
+ {
+     return GetAtomicLong(name, configuration: null, initialValue: initialValue, create: create);
+ }
+
+ /** <inheritDoc /> */
+ public IAtomicLongClient GetAtomicLong(
+     string name,
+     AtomicClientConfiguration configuration,
+     long initialValue,
+     bool create)
+ {
+     IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+
+     // Create request payload: name, initial value, configuration presence flag, configuration fields.
+     if (create)
+     {
+         _socket.DoOutInOp<object>(ClientOp.AtomicLongCreate, req =>
+         {
+             var writer = req.Writer;
+             var hasConfiguration = configuration != null;
+
+             writer.WriteString(name);
+             writer.WriteLong(initialValue);
+             writer.WriteBoolean(hasConfiguration);
+
+             if (hasConfiguration)
+             {
+                 // Configuration fields follow the presence flag in this order.
+                 writer.WriteInt(configuration.AtomicSequenceReserveSize);
+                 writer.WriteByte((byte)configuration.CacheMode);
+                 writer.WriteInt(configuration.Backups);
+                 writer.WriteString(configuration.GroupName);
+             }
+         }, null);
+     }
+
+     var atomicLong = new AtomicLongClient(_socket, name, configuration?.GroupName);
+
+     // Return null when specified atomic long does not exist to match thick API behavior.
+     // Existence is checked only when creation was not requested.
+     if (create || !atomicLong.IsClosed())
+     {
+         return atomicLong;
+     }
+
+     return null;
+ }
+
/** <inheritDoc /> */
public IBinaryProcessor BinaryProcessor
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.DotNetCore.sln.DotSettings b/modules/platforms/dotnet/Apache.Ignite.DotNetCore.sln.DotSettings
index 0f851bdad9c..322d03bca9f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.DotNetCore.sln.DotSettings
+++ b/modules/platforms/dotnet/Apache.Ignite.DotNetCore.sln.DotSettings
@@ -10,6 +10,7 @@
<s:Boolean x:Key="/Default/Environment/UnitTesting/ShadowCopy/@EntryValue">False</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=binarizable/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=cgroup/@EntryIndexedValue">True</s:Boolean>
+ <s:Boolean x:Key="/Default/UserDictionary/Words/=comparand/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Datastream/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=failover/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Multithreaded/@EntryIndexedValue">True</s:Boolean>