You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/06/28 13:07:17 UTC
[30/50] [abbrv] ignite git commit: IGNITE-3328 .NET: Support user-defined AffinityFunction
IGNITE-3328 .NET: Support user-defined AffinityFunction
This closes #826
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e1c755c7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e1c755c7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e1c755c7
Branch: refs/heads/ignite-1232
Commit: e1c755c7ec1e8e7d6fc60b722c32d25bf188cd6b
Parents: 6cfd991
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Fri Jun 24 18:57:07 2016 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Fri Jun 24 18:57:07 2016 +0300
----------------------------------------------------------------------
.../GridAffinityFunctionContextImpl.java | 9 +
.../affinity/PlatformAffinityFunction.java | 242 ++++++++++++++++
.../callback/PlatformCallbackGateway.java | 87 ++++++
.../callback/PlatformCallbackUtils.java | 46 +++
.../utils/PlatformConfigurationUtils.java | 13 +-
.../platforms/cpp/jni/include/ignite/jni/java.h | 19 +-
modules/platforms/cpp/jni/src/java.cpp | 35 ++-
.../Apache.Ignite.Core.Tests.csproj | 5 +-
.../Cache/Affinity/AffinityFieldTest.cs | 199 +++++++++++++
.../Cache/Affinity/AffinityFunctionTest.cs | 282 +++++++++++++++++++
.../Cache/Affinity/AffinityTest.cs | 138 +++++++++
.../Cache/CacheAffinityFieldTest.cs | 199 -------------
.../Cache/CacheAffinityTest.cs | 139 ---------
.../Cache/CacheConfigurationTest.cs | 6 +-
.../native-client-test-cache-affinity.xml | 2 +-
.../IgniteConfigurationSerializerTest.cs | 14 +-
.../Apache.Ignite.Core.Tests/TestRunner.cs | 6 +-
.../Apache.Ignite.Core.csproj | 2 +
.../Cache/Affinity/AffinityFunctionBase.cs | 102 ++++++-
.../Cache/Affinity/AffinityFunctionContext.cs | 116 ++++++++
.../Cache/Affinity/AffinityTopologyVersion.cs | 138 +++++++++
.../Cache/Affinity/IAffinityFunction.cs | 55 +++-
.../Cache/Configuration/CacheConfiguration.cs | 9 +-
.../Apache.Ignite.Core/Events/EventBase.cs | 2 +-
.../Apache.Ignite.Core/Events/EventReader.cs | 8 +-
.../Apache.Ignite.Core/IgniteConfiguration.cs | 1 +
.../IgniteConfigurationSection.xsd | 4 +-
.../dotnet/Apache.Ignite.Core/Ignition.cs | 4 +-
.../Impl/Unmanaged/UnmanagedCallbackHandlers.cs | 6 +
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 120 +++++++-
.../dotnet/Apache.Ignite.sln.DotSettings | 7 +-
31 files changed, 1629 insertions(+), 386 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
index 6c97efd..e2bb99d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
@@ -80,4 +80,13 @@ public class GridAffinityFunctionContextImpl implements AffinityFunctionContext
@Override public int backups() {
return backups;
}
+
+ /**
+ * Gets the previous assignment.
+ *
+ * @return Previous assignment
+ */
+ public List<List<ClusterNode>> prevAssignment() {
+ return prevAssignment;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
new file mode 100644
index 0000000..4da5e24
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.affinity;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Platform AffinityFunction.
+ */
+public class PlatformAffinityFunction implements AffinityFunction, Externalizable, LifecycleAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private Object userFunc;
+
+ /** */
+ private int partitions;
+
+ /** */
+ private transient Ignite ignite;
+
+ /** */
+ private transient PlatformContext ctx;
+
+ /** */
+ private transient long ptr;
+
+ /**
+ * Ctor for serialization.
+ *
+ */
+ public PlatformAffinityFunction() {
+ partitions = -1;
+ }
+
+ /**
+ * Ctor.
+ *
+ * @param func User function object.
+ * @param partitions Initial number of partitions.
+ */
+ public PlatformAffinityFunction(Object func, int partitions) {
+ userFunc = func;
+ this.partitions = partitions;
+ }
+
+ /** Gets the user function object. */
+ public Object getUserFunc() {
+ return userFunc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op: userFunc is always in initial state (it is serialized only once on start).
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ // Affinity function can not return different number of partitions,
+ // so we pass this value once from the platform.
+ assert partitions > 0;
+
+ return partitions;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ assert ctx != null;
+ assert ptr != 0;
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+ BinaryRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(key);
+
+ out.synchronize();
+
+ return ctx.gateway().affinityFunctionPartition(ptr, mem.pointer());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+ assert ctx != null;
+ assert ptr != 0;
+ assert affCtx != null;
+
+ try (PlatformMemory outMem = ctx.memory().allocate()) {
+ try (PlatformMemory inMem = ctx.memory().allocate()) {
+ PlatformOutputStream out = outMem.output();
+ BinaryRawWriterEx writer = ctx.writer(out);
+
+ // Write previous assignment
+ List<List<ClusterNode>> prevAssignment = ((GridAffinityFunctionContextImpl)affCtx).prevAssignment();
+
+ if (prevAssignment == null)
+ writer.writeInt(-1);
+ else {
+ writer.writeInt(prevAssignment.size());
+
+ for (List<ClusterNode> part : prevAssignment)
+ ctx.writeNodes(writer, part);
+ }
+
+ // Write other props
+ writer.writeInt(affCtx.backups());
+ ctx.writeNodes(writer, affCtx.currentTopologySnapshot());
+ writer.writeLong(affCtx.currentTopologyVersion().topologyVersion());
+ writer.writeInt(affCtx.currentTopologyVersion().minorTopologyVersion());
+ ctx.writeEvent(writer, affCtx.discoveryEvent());
+
+ // Call platform
+ out.synchronize();
+ ctx.gateway().affinityFunctionAssignPartitions(ptr, outMem.pointer(), inMem.pointer());
+
+ PlatformInputStream in = inMem.input();
+ BinaryRawReaderEx reader = ctx.reader(in);
+
+ // Read result
+ int partCnt = in.readInt();
+ List<List<ClusterNode>> res = new ArrayList<>(partCnt);
+ IgniteClusterEx cluster = ctx.kernalContext().grid().cluster();
+
+ for (int i = 0; i < partCnt; i++) {
+ int partSize = in.readInt();
+ List<ClusterNode> part = new ArrayList<>(partSize);
+
+ for (int j = 0; j < partSize; j++)
+ part.add(cluster.node(reader.readUuid()));
+
+ res.add(part);
+ }
+
+ return res;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ assert ctx != null;
+ assert ptr != 0;
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+ BinaryRawWriterEx writer = ctx.writer(out);
+
+ writer.writeUuid(nodeId);
+
+ out.synchronize();
+
+ ctx.gateway().affinityFunctionRemoveNode(ptr, mem.pointer());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(userFunc);
+ out.writeInt(partitions);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ userFunc = in.readObject();
+ partitions = in.readInt();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ assert ignite != null;
+ ctx = PlatformUtils.platformContext(ignite);
+ assert ctx != null;
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+ BinaryRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(userFunc);
+
+ out.synchronize();
+
+ ptr = ctx.gateway().affinityFunctionInit(mem.pointer());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ assert ctx != null;
+
+ ctx.gateway().affinityFunctionDestroy(ptr);
+ }
+
+ /**
+ * Injects the Ignite.
+ *
+ * @param ignite Ignite.
+ */
+ @IgniteInstanceResource
+ private void setIgnite(Ignite ignite) {
+ this.ignite = ignite;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index 3439f38..3708e8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -951,6 +951,93 @@ public class PlatformCallbackGateway {
}
/**
+ * Initializes affinity function.
+ *
+ * @param memPtr Pointer to a stream with serialized affinity function.
+ * @return Affinity function pointer.
+ */
+ public long affinityFunctionInit(long memPtr) {
+ enter();
+
+ try {
+ return PlatformCallbackUtils.affinityFunctionInit(envPtr, memPtr);
+ }
+ finally {
+ leave();
+ }
+ }
+
+ /**
+ * Gets the partition from affinity function.
+ *
+ * @param ptr Affinity function pointer.
+ * @param memPtr Pointer to a stream with key object.
+ * @return Partition number for a given key.
+ */
+ public int affinityFunctionPartition(long ptr, long memPtr) {
+ enter();
+
+ try {
+ return PlatformCallbackUtils.affinityFunctionPartition(envPtr, ptr, memPtr);
+ }
+ finally {
+ leave();
+ }
+ }
+
+ /**
+ * Assigns the affinity partitions.
+ *
+ * @param ptr Affinity function pointer.
+ * @param outMemPtr Pointer to a stream with affinity context.
+ * @param inMemPtr Pointer to a stream with result.
+ */
+ public void affinityFunctionAssignPartitions(long ptr, long outMemPtr, long inMemPtr){
+ enter();
+
+ try {
+ PlatformCallbackUtils.affinityFunctionAssignPartitions(envPtr, ptr, outMemPtr, inMemPtr);
+ }
+ finally {
+ leave();
+ }
+ }
+
+ /**
+ * Removes the node from affinity function.
+ *
+ * @param ptr Affinity function pointer.
+ * @param memPtr Pointer to a stream with node id.
+ */
+ public void affinityFunctionRemoveNode(long ptr, long memPtr) {
+ enter();
+
+ try {
+ PlatformCallbackUtils.affinityFunctionRemoveNode(envPtr, ptr, memPtr);
+ }
+ finally {
+ leave();
+ }
+ }
+
+ /**
+ * Destroys the affinity function.
+ *
+ * @param ptr Affinity function pointer.
+ */
+ public void affinityFunctionDestroy(long ptr) {
+ if (!lock.enterBusy())
+ return; // skip: destroy is not necessary during shutdown.
+
+ try {
+ PlatformCallbackUtils.affinityFunctionDestroy(envPtr, ptr);
+ }
+ finally {
+ leave();
+ }
+ }
+
+ /**
* Enter gateway.
*/
protected void enter() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
index f7d6586..d19782d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -496,6 +496,52 @@ public class PlatformCallbackUtils {
static native void onClientReconnected(long envPtr, boolean clusterRestarted);
/**
+ * Initializes affinity function.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Pointer to a stream with serialized affinity function.
+ * @return Affinity function pointer.
+ */
+ static native long affinityFunctionInit(long envPtr, long memPtr);
+
+ /**
+ * Gets the partition from affinity function.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Affinity function pointer.
+ * @param memPtr Pointer to a stream with key object.
+ * @return Partition number for a given key.
+ */
+ static native int affinityFunctionPartition(long envPtr, long ptr, long memPtr);
+
+ /**
+ * Assigns the affinity partitions.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Affinity function pointer.
+ * @param outMemPtr Pointer to a stream with affinity context.
+ * @param inMemPtr Pointer to a stream with result.
+ */
+ static native void affinityFunctionAssignPartitions(long envPtr, long ptr, long outMemPtr, long inMemPtr);
+
+ /**
+ * Removes the node from affinity function.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Affinity function pointer.
+ * @param memPtr Pointer to a stream with node id.
+ */
+ static native void affinityFunctionRemoveNode(long envPtr, long ptr, long memPtr);
+
+ /**
+ * Destroys the affinity function.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Affinity function pointer.
+ */
+ static native void affinityFunctionDestroy(long envPtr, long ptr);
+
+ /**
* Private constructor.
*/
private PlatformCallbackUtils() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 29b6a70..7353f08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -41,6 +41,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.binary.*;
+import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction;
import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryConfiguration;
import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration;
import org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactoryNative;
@@ -237,7 +238,7 @@ public class PlatformConfigurationUtils {
* @param in Stream.
* @return Affinity function.
*/
- private static AffinityFunction readAffinityFunction(BinaryRawReader in) {
+ private static AffinityFunction readAffinityFunction(BinaryRawReaderEx in) {
byte plcTyp = in.readByte();
switch (plcTyp) {
@@ -255,6 +256,9 @@ public class PlatformConfigurationUtils {
f.setExcludeNeighbors(in.readBoolean());
return f;
}
+ case 3: {
+ return new PlatformAffinityFunction(in.readObjectDetached(), in.readInt());
+ }
default:
assert false;
}
@@ -296,6 +300,13 @@ public class PlatformConfigurationUtils {
out.writeInt(f0.getPartitions());
out.writeBoolean(f0.isExcludeNeighbors());
}
+ else if (f instanceof PlatformAffinityFunction) {
+ out.writeByte((byte)3);
+
+ PlatformAffinityFunction f0 = (PlatformAffinityFunction)f;
+ out.writeObject(f0.getUserFunc());
+ out.writeInt(f.partitions());
+ }
else {
out.writeByte((byte)0);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index 3d45ec0..98779c4 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -21,7 +21,6 @@
#include <jni.h>
#include "ignite/common/common.h"
-#include "ignite/ignite_error.h"
namespace ignite
{
@@ -101,6 +100,12 @@ namespace ignite
typedef void(JNICALL *OnClientDisconnectedHandler)(void* target);
typedef void(JNICALL *OnClientReconnectedHandler)(void* target, unsigned char clusterRestarted);
+ typedef long long(JNICALL *AffinityFunctionInitHandler)(void* target, long long memPtr);
+ typedef int(JNICALL *AffinityFunctionPartitionHandler)(void* target, long long ptr, long long memPtr);
+ typedef void(JNICALL *AffinityFunctionAssignPartitionsHandler)(void* target, long long ptr, long long inMemPtr, long long outMemPtr);
+ typedef void(JNICALL *AffinityFunctionRemoveNodeHandler)(void* target, long long ptr, long long memPtr);
+ typedef void(JNICALL *AffinityFunctionDestroyHandler)(void* target, long long ptr);
+
/**
* JNI handlers holder.
*/
@@ -178,6 +183,12 @@ namespace ignite
OnClientDisconnectedHandler onClientDisconnected;
OnClientReconnectedHandler onClientReconnected;
+
+ AffinityFunctionInitHandler affinityFunctionInit;
+ AffinityFunctionPartitionHandler affinityFunctionPartition;
+ AffinityFunctionAssignPartitionsHandler affinityFunctionAssignPartitions;
+ AffinityFunctionRemoveNodeHandler affinityFunctionRemoveNode;
+ AffinityFunctionDestroyHandler affinityFunctionDestroy;
};
/**
@@ -740,6 +751,12 @@ namespace ignite
JNIEXPORT void JNICALL JniOnClientDisconnected(JNIEnv *env, jclass cls, jlong envPtr);
JNIEXPORT void JNICALL JniOnClientReconnected(JNIEnv *env, jclass cls, jlong envPtr, jboolean clusterRestarted);
+
+ JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr);
+ JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr);
+ JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr);
+ JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr);
+ JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index 66be0ca..577ee26 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -22,6 +22,7 @@
#include "ignite/jni/utils.h"
#include "ignite/common/concurrent.h"
#include "ignite/jni/java.h"
+#include <ignite/ignite_error.h>
#define IGNITE_SAFE_PROC_NO_ARG(jniEnv, envPtr, type, field) { \
JniHandlers* hnds = reinterpret_cast<JniHandlers*>(envPtr); \
@@ -362,6 +363,12 @@ namespace ignite
JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_DISCONNECTED = JniMethod("onClientDisconnected", "(J)V", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_RECONNECTED = JniMethod("onClientReconnected", "(JZ)V", true);
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJ)J", true);
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION = JniMethod("affinityFunctionPartition", "(JJJ)I", true);
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS = JniMethod("affinityFunctionAssignPartitions", "(JJJJ)V", true);
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE = JniMethod("affinityFunctionRemoveNode", "(JJJ)V", true);
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY = JniMethod("affinityFunctionDestroy", "(JJ)V", true);
+
const char* C_PLATFORM_UTILS = "org/apache/ignite/internal/processors/platform/utils/PlatformUtils";
JniMethod M_PLATFORM_UTILS_REALLOC = JniMethod("reallocate", "(JI)V", true);
JniMethod M_PLATFORM_UTILS_ERR_DATA = JniMethod("errorData", "(Ljava/lang/Throwable;)[B", true);
@@ -821,7 +828,7 @@ namespace ignite
void RegisterNatives(JNIEnv* env) {
{
- JNINativeMethod methods[54];
+ JNINativeMethod methods[59];
int idx = 0;
@@ -898,6 +905,12 @@ namespace ignite
AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_DISCONNECTED, reinterpret_cast<void*>(JniOnClientDisconnected));
AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_RECONNECTED, reinterpret_cast<void*>(JniOnClientReconnected));
+ AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT, reinterpret_cast<void*>(JniAffinityFunctionInit));
+ AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION, reinterpret_cast<void*>(JniAffinityFunctionPartition));
+ AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS, reinterpret_cast<void*>(JniAffinityFunctionAssignPartitions));
+ AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE, reinterpret_cast<void*>(JniAffinityFunctionRemoveNode));
+ AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY, reinterpret_cast<void*>(JniAffinityFunctionDestroy));
+
jint res = env->RegisterNatives(FindClass(env, C_PLATFORM_CALLBACK_UTILS), methods, idx);
if (res != JNI_OK)
@@ -2833,6 +2846,26 @@ namespace ignite
JNIEXPORT void JNICALL JniOnClientReconnected(JNIEnv *env, jclass cls, jlong envPtr, jboolean clusterRestarted) {
IGNITE_SAFE_PROC(env, envPtr, OnClientReconnectedHandler, onClientReconnected, clusterRestarted);
}
+
+ JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr) {
+ IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionInitHandler, affinityFunctionInit, memPtr);
+ }
+
+ JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) {
+ IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionPartitionHandler, affinityFunctionPartition, ptr, memPtr);
+ }
+
+ JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr) {
+ IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionAssignPartitionsHandler, affinityFunctionAssignPartitions, ptr, inMemPtr, outMemPtr);
+ }
+
+ JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) {
+ IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionRemoveNodeHandler, affinityFunctionRemoveNode, ptr, memPtr);
+ }
+
+ JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr) {
+ IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionDestroyHandler, affinityFunctionDestroy, ptr);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 1a367d4..15e46ae 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -59,13 +59,14 @@
<Compile Include="Binary\BinaryCompactFooterInteropTest.cs" />
<Compile Include="Binary\BinarySelfTestFullFooter.cs" />
<Compile Include="Binary\BinaryStringTest.cs" />
- <Compile Include="Cache\CacheAffinityFieldTest.cs" />
+ <Compile Include="Cache\Affinity\AffinityFieldTest.cs" />
+ <Compile Include="Cache\Affinity\AffinityFunctionTest.cs" />
<Compile Include="Cache\CacheConfigurationTest.cs" />
<Compile Include="Cache\CacheDynamicStartTest.cs" />
<Compile Include="Cache\CacheNearTest.cs" />
<Compile Include="Cache\CacheTestAsyncWrapper.cs" />
<Compile Include="Cache\CacheAbstractTest.cs" />
- <Compile Include="Cache\CacheAffinityTest.cs" />
+ <Compile Include="Cache\Affinity\AffinityTest.cs" />
<Compile Include="Cache\CacheEntryTest.cs" />
<Compile Include="Cache\CacheForkedTest.cs" />
<Compile Include="Cache\CacheLocalAtomicTest.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs
new file mode 100644
index 0000000..ceb04cd
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// ReSharper disable UnusedAutoPropertyAccessor.Local
+namespace Apache.Ignite.Core.Tests.Cache.Affinity
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Cache.Affinity;
+ using Apache.Ignite.Core.Cache.Configuration;
+ using Apache.Ignite.Core.Tests.Compute;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests custom affinity mapping.
+ /// </summary>
+ public class AffinityFieldTest
+ {
+ /** */
+ private ICache<object, string> _cache1;
+
+ /** */
+ private ICache<object, string> _cache2;
+
+ /// <summary>
+ /// Fixture set up.
+ /// </summary>
+ [TestFixtureSetUp]
+ public void FixtureSetUp()
+ {
+ var grid1 = Ignition.Start(GetConfig());
+ var grid2 = Ignition.Start(GetConfig("grid2"));
+
+ _cache1 = grid1.CreateCache<object, string>(new CacheConfiguration
+ {
+ CacheMode = CacheMode.Partitioned
+ });
+ _cache2 = grid2.GetCache<object, string>(null);
+ }
+
+ /// <summary>
+ /// Fixture tear down.
+ /// </summary>
+ [TestFixtureTearDown]
+ public void FixtureTearDown()
+ {
+ Ignition.StopAll(true);
+ }
+
+ /// <summary>
+ /// Tests the metadata.
+ /// </summary>
+ [Test]
+ public void TestMetadata()
+ {
+ // Put keys to update meta
+ _cache1.Put(new CacheKey(), string.Empty);
+ _cache1.Put(new CacheKeyAttr(), string.Empty);
+ _cache1.Put(new CacheKeyAttrOverride(), string.Empty);
+
+ // Verify
+ foreach (var type in new[] { typeof(CacheKey) , typeof(CacheKeyAttr), typeof(CacheKeyAttrOverride)})
+ {
+ Assert.AreEqual("AffinityKey", _cache1.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName);
+ Assert.AreEqual("AffinityKey", _cache2.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName);
+ }
+ }
+
+ /// <summary>
+ /// Tests that keys are located properly in cache partitions.
+ /// </summary>
+ [Test]
+ public void TestKeyLocation()
+ {
+ TestKeyLocation0((key, affKey) => new CacheKey {Key = key, AffinityKey = affKey});
+ TestKeyLocation0((key, affKey) => new CacheKeyAttr {Key = key, AffinityKey = affKey});
+ TestKeyLocation0((key, affKey) => new CacheKeyAttrOverride {Key = key, AffinityKey = affKey});
+ }
+
+ /// <summary>
+ /// Tests the <see cref="AffinityKey"/> class.
+ /// </summary>
+ [Test]
+ public void TestAffinityKeyClass()
+ {
+ // Check location
+ TestKeyLocation0((key, affKey) => new AffinityKey(key, affKey));
+
+ // Check meta
+ Assert.AreEqual("affKey",
+ _cache1.Ignite.GetBinary().GetBinaryType(typeof (AffinityKey)).AffinityKeyFieldName);
+ }
+
+ /// <summary>
+ /// Tests <see cref="AffinityKey"/> class interoperability.
+ /// </summary>
+ [Test]
+ public void TestInterop()
+ {
+ var affKey = _cache1.Ignite.GetCompute()
+ .ExecuteJavaTask<AffinityKey>(ComputeApiTest.EchoTask, ComputeApiTest.EchoTypeAffinityKey);
+
+ Assert.AreEqual("interopAffinityKey", affKey.Key);
+ }
+
+ /// <summary>
+ /// Tests the key location.
+ /// </summary>
+ private void TestKeyLocation0<T>(Func<int, int, T> ctor)
+ {
+ var aff = _cache1.Ignite.GetAffinity(_cache1.Name);
+
+ foreach (var cache in new[] { _cache1, _cache2 })
+ {
+ cache.RemoveAll();
+
+ var localNode = cache.Ignite.GetCluster().GetLocalNode();
+
+ var localKeys = Enumerable.Range(1, int.MaxValue)
+ .Where(x => aff.MapKeyToNode(x).Id == localNode.Id).Take(100).ToArray();
+
+ for (int index = 0; index < localKeys.Length; index++)
+ {
+ var cacheKey = ctor(index, localKeys[index]);
+
+ cache.Put(cacheKey, index.ToString());
+
+ // Verify that key is stored locally according to AffinityKeyFieldName
+ Assert.AreEqual(index.ToString(), cache.LocalPeek(cacheKey, CachePeekMode.Primary));
+
+ // Other cache does not have this key locally
+ var otherCache = cache == _cache1 ? _cache2 : _cache1;
+ Assert.Throws<KeyNotFoundException>(() => otherCache.LocalPeek(cacheKey, CachePeekMode.All));
+ }
+ }
+ }
+
+ /// <summary>
+ /// Gets the configuration.
+ /// </summary>
+ private static IgniteConfiguration GetConfig(string gridName = null)
+ {
+ return new IgniteConfiguration(TestUtils.GetTestConfiguration())
+ {
+ GridName = gridName,
+ BinaryConfiguration = new BinaryConfiguration
+ {
+ TypeConfigurations = new[]
+ {
+ new BinaryTypeConfiguration(typeof (CacheKey))
+ {
+ AffinityKeyFieldName = "AffinityKey"
+ },
+ new BinaryTypeConfiguration(typeof(CacheKeyAttr)),
+ new BinaryTypeConfiguration(typeof (CacheKeyAttrOverride))
+ {
+ AffinityKeyFieldName = "AffinityKey"
+ }
+ }
+ },
+ };
+ }
+
+ private class CacheKey
+ {
+ public int Key { get; set; }
+ public int AffinityKey { get; set; }
+ }
+
+ private class CacheKeyAttr
+ {
+ public int Key { get; set; }
+ [AffinityKeyMapped] public int AffinityKey { get; set; }
+ }
+
+ private class CacheKeyAttrOverride
+ {
+ [AffinityKeyMapped] public int Key { get; set; }
+ public int AffinityKey { get; set; }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs
new file mode 100644
index 0000000..70e0d78
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs
@@ -0,0 +1,282 @@
+\ufeff/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Affinity
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Linq;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Cache.Affinity;
+ using Apache.Ignite.Core.Cache.Configuration;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Resource;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests user-defined <see cref="IAffinityFunction"/>
+ /// </summary>
+ public class AffinityFunctionTest
+ {
+ /** First node; started with the static cache configuration. */
+ private IIgnite _ignite;
+
+ /** Second node ("grid2"); started without cache configuration. */
+ private IIgnite _ignite2;
+
+ /** Name of the statically configured cache. */
+ private const string CacheName = "cache";
+
+ /** Partition count reported by SimpleAffinityFunction. */
+ private const int PartitionCount = 10;
+
+ /** Node ids passed to SimpleAffinityFunction.RemoveNode. */
+ private static readonly ConcurrentBag<Guid> RemovedNodes = new ConcurrentBag<Guid>();
+
+ /** Contexts passed to SimpleAffinityFunction.AssignPartitions. */
+ private static readonly ConcurrentBag<AffinityFunctionContext> Contexts =
+ new ConcurrentBag<AffinityFunctionContext>();
+
+ /// <summary>
+ /// Fixture set up.
+ /// </summary>
+ [TestFixtureSetUp]
+ public void FixtureSetUp()
+ {
+ var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
+ {
+ CacheConfiguration = new[]
+ {
+ new CacheConfiguration(CacheName)
+ {
+ AffinityFunction = new SimpleAffinityFunction(),
+ Backups = 7
+ }
+ }
+ };
+
+ _ignite = Ignition.Start(cfg);
+
+ _ignite2 = Ignition.Start(new IgniteConfiguration(TestUtils.GetTestConfiguration()) {GridName = "grid2"});
+ }
+
+ /// <summary>
+ /// Fixture tear down.
+ /// </summary>
+ [TestFixtureTearDown]
+ public void FixtureTearDown()
+ {
+ // Check that affinity handles are present
+ TestUtils.AssertHandleRegistryHasItems(_ignite, _ignite.GetCacheNames().Count, 0);
+ TestUtils.AssertHandleRegistryHasItems(_ignite2, _ignite.GetCacheNames().Count, 0);
+
+ // Destroy all caches
+ _ignite.GetCacheNames().ToList().ForEach(_ignite.DestroyCache);
+ Assert.AreEqual(0, _ignite.GetCacheNames().Count);
+
+ // Check that all affinity functions got released
+ TestUtils.AssertHandleRegistryIsEmpty(1000, _ignite, _ignite2);
+
+ Ignition.StopAll(true);
+ }
+
+ /// <summary>
+ /// Tests the static cache.
+ /// </summary>
+ [Test]
+ public void TestStaticCache()
+ {
+ VerifyCacheAffinity(_ignite.GetCache<int, int>(CacheName));
+ VerifyCacheAffinity(_ignite2.GetCache<int, int>(CacheName));
+ }
+
+ /// <summary>
+ /// Tests a dynamically created cache with a user-defined affinity function and the recorded contexts.
+ /// </summary>
+ [Test]
+ public void TestDynamicCache()
+ {
+ const string cacheName = "dynCache";
+
+ VerifyCacheAffinity(_ignite.CreateCache<int, int>(new CacheConfiguration(cacheName)
+ {
+ AffinityFunction = new SimpleAffinityFunction(),
+ Backups = 5
+ }));
+
+ VerifyCacheAffinity(_ignite2.GetCache<int, int>(cacheName));
+
+ // New cache: latest context without a previous assignment; expected to be dynCache (Backups = 5)
+ var lastCtx = Contexts.Where(x => x.GetPreviousAssignment(1) == null)
+ .OrderBy(x => x.DiscoveryEvent.Timestamp).Last();
+
+ Assert.AreEqual(new AffinityTopologyVersion(2, 1), lastCtx.CurrentTopologyVersion);
+ Assert.AreEqual(5, lastCtx.Backups);
+
+ // Old cache: latest context with a previous assignment; expected to be the static cache (Backups = 7)
+ var ctx = Contexts.Where(x => x.GetPreviousAssignment(1) != null)
+ .OrderBy(x => x.DiscoveryEvent.Timestamp).Last();
+
+ Assert.AreEqual(new AffinityTopologyVersion(2, 0), ctx.CurrentTopologyVersion);
+ Assert.AreEqual(7, ctx.Backups);
+ CollectionAssert.AreEquivalent(_ignite.GetCluster().GetNodes(), ctx.CurrentTopologySnapshot);
+
+ var evt = ctx.DiscoveryEvent;
+ CollectionAssert.AreEquivalent(_ignite.GetCluster().GetNodes(), evt.TopologyNodes);
+ CollectionAssert.Contains(_ignite.GetCluster().GetNodes(), evt.EventNode);
+ Assert.AreEqual(_ignite.GetCluster().TopologyVersion, evt.TopologyVersion);
+
+ var firstTop = _ignite.GetCluster().GetTopology(1);
+ var parts = Enumerable.Range(0, PartitionCount).ToArray();
+ CollectionAssert.AreEqual(parts.Select(x => firstTop), parts.Select(x => ctx.GetPreviousAssignment(x)));
+ }
+
+ /// <summary>
+ /// Checks that the cache uses <see cref="SimpleAffinityFunction"/> and maps keys 0..99 to key % PartitionCount.
+ /// </summary>
+ private static void VerifyCacheAffinity(ICache<int, int> cache)
+ {
+ var configuredFunc = cache.GetConfiguration().AffinityFunction;
+ Assert.IsInstanceOf<SimpleAffinityFunction>(configuredFunc);
+ var affinity = cache.Ignite.GetAffinity(cache.Name);
+ Assert.AreEqual(PartitionCount, affinity.Partitions);
+
+ foreach (var key in Enumerable.Range(0, 100))
+ Assert.AreEqual(key % PartitionCount, affinity.GetPartition(key));
+ }
+
+ /// <summary>
+ /// Tests that RemoveNode receives the id of a node that has left the topology.
+ /// </summary>
+ [Test]
+ public void TestRemoveNode()
+ {
+ Assert.AreEqual(0, RemovedNodes.Count);
+
+ Guid expectedNodeId;
+
+ using (var ignite = Ignition.Start(new IgniteConfiguration(TestUtils.GetTestConfiguration())
+ {
+ GridName = "grid3",
+ }))
+ {
+ expectedNodeId = ignite.GetCluster().GetLocalNode().Id;
+ Assert.AreEqual(0, RemovedNodes.Count);
+ VerifyCacheAffinity(ignite.GetCache<int, int>(CacheName));
+ }
+
+ // Called on both remaining nodes, so the bag may hold duplicates: wait for the first call, compare distinct ids
+ TestUtils.WaitForCondition(() => RemovedNodes.Count > 0, 3000);
+ Assert.AreEqual(expectedNodeId, RemovedNodes.Distinct().Single());
+ }
+
+ /// <summary>
+ /// Tests that creating a cache with a non-serializable affinity function fails with a descriptive error.
+ /// </summary>
+ [Test]
+ public void TestNonSerializableFunction()
+ {
+ var ex = Assert.Throws<IgniteException>(() =>
+ _ignite.CreateCache<int, int>(new CacheConfiguration("failCache")
+ {
+ AffinityFunction = new NonSerializableAffinityFunction()
+ }));
+ // NUnit takes (expected, actual); the reversed order would swap the values in a failure report.
+ Assert.AreEqual("AffinityFunction should be serializable.", ex.Message);
+ }
+
+ /// <summary>
+ /// Tests the exception propagation.
+ /// </summary>
+ [Test]
+ public void TestExceptionInFunction()
+ {
+ var cache = _ignite.CreateCache<int, int>(new CacheConfiguration("failCache2")
+ {
+ AffinityFunction = new FailInGetPartitionAffinityFunction()
+ });
+
+ var ex = Assert.Throws<CacheException>(() => cache.Put(1, 2));
+ Assert.AreEqual("User error", ex.InnerException.Message);
+ }
+
+ [Serializable]
+ private class SimpleAffinityFunction : IAffinityFunction
+ {
+ #pragma warning disable 649 // field is never assigned
+ [InstanceResource] private readonly IIgnite _ignite;
+
+ public int Partitions
+ {
+ get { return PartitionCount; }
+ }
+
+ public int GetPartition(object key)
+ {
+ Assert.IsNotNull(_ignite);
+
+ return (int) key % Partitions;
+ }
+
+ public void RemoveNode(Guid nodeId)
+ {
+ RemovedNodes.Add(nodeId);
+ }
+
+ public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+ {
+ Assert.IsNotNull(_ignite);
+
+ Contexts.Add(context);
+
+ // All partitions are the same
+ return Enumerable.Range(0, Partitions).Select(x => context.CurrentTopologySnapshot);
+ }
+ }
+
+ private class NonSerializableAffinityFunction : SimpleAffinityFunction
+ {
+ // No-op.
+ }
+
+ [Serializable]
+ private class FailInGetPartitionAffinityFunction : IAffinityFunction
+ {
+ public int Partitions
+ {
+ get { return 5; }
+ }
+
+ public int GetPartition(object key)
+ {
+ throw new ArithmeticException("User error");
+ }
+
+ public void RemoveNode(Guid nodeId)
+ {
+ // No-op.
+ }
+
+ public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+ {
+ return Enumerable.Range(0, Partitions).Select(x => context.CurrentTopologySnapshot);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs
new file mode 100644
index 0000000..e38668b
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Affinity
+{
+ using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Cluster;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Affinity key tests.
+ /// </summary>
+ public sealed class AffinityTest
+ {
+ /// <summary>
+ /// Kills leftover processes and starts three grids ("grid-0".."grid-2") from the affinity Spring config.
+ /// </summary>
+ [TestFixtureSetUp]
+ public void StartGrids()
+ {
+ TestUtils.KillProcesses();
+
+ for (int i = 0; i < 3; i++)
+ {
+ var cfg = new IgniteConfiguration
+ {
+ JvmClasspath = TestUtils.CreateTestClasspath(),
+ JvmOptions = TestUtils.TestJavaOptions(),
+ SpringConfigUrl = "config\\native-client-test-cache-affinity.xml",
+ GridName = "grid-" + i
+ };
+
+ Ignition.Start(cfg);
+ }
+ }
+
+ /// <summary>
+ /// Tear-down routine.
+ /// </summary>
+ [TestFixtureTearDown]
+ public void StopGrids()
+ {
+ Ignition.StopAll(true);
+ }
+
+ /// <summary>
+ /// Test affinity key.
+ /// </summary>
+ [Test]
+ public void TestAffinity()
+ {
+ IIgnite g = Ignition.GetIgnite("grid-0");
+
+ ICacheAffinity aff = g.GetAffinity(null);
+
+ IClusterNode node = aff.MapKeyToNode(new AffinityTestKey(0, 1));
+
+ for (int i = 0; i < 10; i++)
+ Assert.AreEqual(node.Id, aff.MapKeyToNode(new AffinityTestKey(i, 1)).Id);
+ }
+
+ /// <summary>
+ /// Test affinity with binary flag.
+ /// </summary>
+ [Test]
+ public void TestAffinityBinary()
+ {
+ IIgnite g = Ignition.GetIgnite("grid-0");
+
+ ICacheAffinity aff = g.GetAffinity(null);
+
+ IBinaryObject affKey = g.GetBinary().ToBinary<IBinaryObject>(new AffinityTestKey(0, 1));
+
+ IClusterNode node = aff.MapKeyToNode(affKey);
+
+ for (int i = 0; i < 10; i++)
+ {
+ IBinaryObject otherAffKey =
+ g.GetBinary().ToBinary<IBinaryObject>(new AffinityTestKey(i, 1));
+
+ Assert.AreEqual(node.Id, aff.MapKeyToNode(otherAffKey).Id);
+ }
+ }
+
+ /// <summary>
+ /// Affinity key.
+ /// </summary>
+ public class AffinityTestKey
+ {
+ /** ID. */
+ private readonly int _id;
+
+ /** Affinity key. */
+ // ReSharper disable once NotAccessedField.Local
+ private readonly int _affKey;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="id">ID.</param>
+ /// <param name="affKey">Affinity key.</param>
+ public AffinityTestKey(int id, int affKey)
+ {
+ _id = id;
+ _affKey = affKey;
+ }
+
+ /** <inheritdoc /> */
+ public override bool Equals(object obj)
+ {
+ var other = obj as AffinityTestKey;
+
+ return other != null && _id == other._id;
+ }
+
+ /** <inheritdoc /> */
+ public override int GetHashCode()
+ {
+ return _id;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs
deleted file mode 100644
index 4fb7738..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs
+++ /dev/null
@@ -1,199 +0,0 @@
-\ufeff/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// ReSharper disable UnusedAutoPropertyAccessor.Local
-namespace Apache.Ignite.Core.Tests.Cache
-{
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using Apache.Ignite.Core.Binary;
- using Apache.Ignite.Core.Cache;
- using Apache.Ignite.Core.Cache.Affinity;
- using Apache.Ignite.Core.Cache.Configuration;
- using Apache.Ignite.Core.Tests.Compute;
- using NUnit.Framework;
-
- /// <summary>
- /// Tests custom affinity mapping.
- /// </summary>
- public class CacheAffinityFieldTest
- {
- /** */
- private ICache<object, string> _cache1;
-
- /** */
- private ICache<object, string> _cache2;
-
- /// <summary>
- /// Fixture set up.
- /// </summary>
- [TestFixtureSetUp]
- public void FixtureSetUp()
- {
- var grid1 = Ignition.Start(GetConfig());
- var grid2 = Ignition.Start(GetConfig("grid2"));
-
- _cache1 = grid1.CreateCache<object, string>(new CacheConfiguration
- {
- CacheMode = CacheMode.Partitioned
- });
- _cache2 = grid2.GetCache<object, string>(null);
- }
-
- /// <summary>
- /// Fixture tear down.
- /// </summary>
- [TestFixtureTearDown]
- public void FixtureTearDown()
- {
- Ignition.StopAll(true);
- }
-
- /// <summary>
- /// Tests the metadata.
- /// </summary>
- [Test]
- public void TestMetadata()
- {
- // Put keys to update meta
- _cache1.Put(new CacheKey(), string.Empty);
- _cache1.Put(new CacheKeyAttr(), string.Empty);
- _cache1.Put(new CacheKeyAttrOverride(), string.Empty);
-
- // Verify
- foreach (var type in new[] { typeof(CacheKey) , typeof(CacheKeyAttr), typeof(CacheKeyAttrOverride)})
- {
- Assert.AreEqual("AffinityKey", _cache1.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName);
- Assert.AreEqual("AffinityKey", _cache2.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName);
- }
- }
-
- /// <summary>
- /// Tests that keys are located properly in cache partitions.
- /// </summary>
- [Test]
- public void TestKeyLocation()
- {
- TestKeyLocation0((key, affKey) => new CacheKey {Key = key, AffinityKey = affKey});
- TestKeyLocation0((key, affKey) => new CacheKeyAttr {Key = key, AffinityKey = affKey});
- TestKeyLocation0((key, affKey) => new CacheKeyAttrOverride {Key = key, AffinityKey = affKey});
- }
-
- /// <summary>
- /// Tests the <see cref="AffinityKey"/> class.
- /// </summary>
- [Test]
- public void TestAffinityKeyClass()
- {
- // Check location
- TestKeyLocation0((key, affKey) => new AffinityKey(key, affKey));
-
- // Check meta
- Assert.AreEqual("affKey",
- _cache1.Ignite.GetBinary().GetBinaryType(typeof (AffinityKey)).AffinityKeyFieldName);
- }
-
- /// <summary>
- /// Tests <see cref="AffinityKey"/> class interoperability.
- /// </summary>
- [Test]
- public void TestInterop()
- {
- var affKey = _cache1.Ignite.GetCompute()
- .ExecuteJavaTask<AffinityKey>(ComputeApiTest.EchoTask, ComputeApiTest.EchoTypeAffinityKey);
-
- Assert.AreEqual("interopAffinityKey", affKey.Key);
- }
-
- /// <summary>
- /// Tests the key location.
- /// </summary>
- private void TestKeyLocation0<T>(Func<int, int, T> ctor)
- {
- var aff = _cache1.Ignite.GetAffinity(_cache1.Name);
-
- foreach (var cache in new[] { _cache1, _cache2 })
- {
- cache.RemoveAll();
-
- var localNode = cache.Ignite.GetCluster().GetLocalNode();
-
- var localKeys = Enumerable.Range(1, int.MaxValue)
- .Where(x => aff.MapKeyToNode(x).Id == localNode.Id).Take(100).ToArray();
-
- for (int index = 0; index < localKeys.Length; index++)
- {
- var cacheKey = ctor(index, localKeys[index]);
-
- cache.Put(cacheKey, index.ToString());
-
- // Verify that key is stored locally accroding to AffinityKeyFieldName
- Assert.AreEqual(index.ToString(), cache.LocalPeek(cacheKey, CachePeekMode.Primary));
-
- // Other cache does not have this key locally
- var otherCache = cache == _cache1 ? _cache2 : _cache1;
- Assert.Throws<KeyNotFoundException>(() => otherCache.LocalPeek(cacheKey, CachePeekMode.All));
- }
- }
- }
-
- /// <summary>
- /// Gets the configuration.
- /// </summary>
- private static IgniteConfiguration GetConfig(string gridName = null)
- {
- return new IgniteConfiguration(TestUtils.GetTestConfiguration())
- {
- GridName = gridName,
- BinaryConfiguration = new BinaryConfiguration
- {
- TypeConfigurations = new[]
- {
- new BinaryTypeConfiguration(typeof (CacheKey))
- {
- AffinityKeyFieldName = "AffinityKey"
- },
- new BinaryTypeConfiguration(typeof(CacheKeyAttr)),
- new BinaryTypeConfiguration(typeof (CacheKeyAttrOverride))
- {
- AffinityKeyFieldName = "AffinityKey"
- }
- }
- },
- };
- }
-
- private class CacheKey
- {
- public int Key { get; set; }
- public int AffinityKey { get; set; }
- }
-
- private class CacheKeyAttr
- {
- public int Key { get; set; }
- [AffinityKeyMapped] public int AffinityKey { get; set; }
- }
-
- private class CacheKeyAttrOverride
- {
- [AffinityKeyMapped] public int Key { get; set; }
- public int AffinityKey { get; set; }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs
deleted file mode 100644
index 689804c..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Tests.Cache
-{
- using Apache.Ignite.Core.Binary;
- using Apache.Ignite.Core.Cache;
- using Apache.Ignite.Core.Cluster;
- using Apache.Ignite.Core.Impl;
- using NUnit.Framework;
-
- /// <summary>
- /// Affinity key tests.
- /// </summary>
- public class CacheAffinityTest
- {
- /// <summary>
- ///
- /// </summary>
- [TestFixtureSetUp]
- public virtual void StartGrids()
- {
- TestUtils.KillProcesses();
-
- IgniteConfiguration cfg = new IgniteConfiguration();
-
- cfg.JvmClasspath = TestUtils.CreateTestClasspath();
- cfg.JvmOptions = TestUtils.TestJavaOptions();
- cfg.SpringConfigUrl = "config\\native-client-test-cache-affinity.xml";
-
- for (int i = 0; i < 3; i++)
- {
- cfg.GridName = "grid-" + i;
-
- Ignition.Start(cfg);
- }
- }
-
- /// <summary>
- /// Tear-down routine.
- /// </summary>
- [TestFixtureTearDown]
- public virtual void StopGrids()
- {
- for (int i = 0; i < 3; i++)
- Ignition.Stop("grid-" + i, true);
- }
-
- /// <summary>
- /// Test affinity key.
- /// </summary>
- [Test]
- public void TestAffinity()
- {
- IIgnite g = Ignition.GetIgnite("grid-0");
-
- ICacheAffinity aff = g.GetAffinity(null);
-
- IClusterNode node = aff.MapKeyToNode(new AffinityTestKey(0, 1));
-
- for (int i = 0; i < 10; i++)
- Assert.AreEqual(node.Id, aff.MapKeyToNode(new AffinityTestKey(i, 1)).Id);
- }
-
- /// <summary>
- /// Test affinity with binary flag.
- /// </summary>
- [Test]
- public void TestAffinityBinary()
- {
- IIgnite g = Ignition.GetIgnite("grid-0");
-
- ICacheAffinity aff = g.GetAffinity(null);
-
- IBinaryObject affKey = g.GetBinary().ToBinary<IBinaryObject>(new AffinityTestKey(0, 1));
-
- IClusterNode node = aff.MapKeyToNode(affKey);
-
- for (int i = 0; i < 10; i++)
- {
- IBinaryObject otherAffKey =
- g.GetBinary().ToBinary<IBinaryObject>(new AffinityTestKey(i, 1));
-
- Assert.AreEqual(node.Id, aff.MapKeyToNode(otherAffKey).Id);
- }
- }
-
- /// <summary>
- /// Affinity key.
- /// </summary>
- public class AffinityTestKey
- {
- /** ID. */
- private int _id;
-
- /** Affinity key. */
- private int _affKey;
-
- /// <summary>
- /// Constructor.
- /// </summary>
- /// <param name="id">ID.</param>
- /// <param name="affKey">Affinity key.</param>
- public AffinityTestKey(int id, int affKey)
- {
- _id = id;
- _affKey = affKey;
- }
-
- /** <inheritdoc /> */
- public override bool Equals(object obj)
- {
- AffinityTestKey other = obj as AffinityTestKey;
-
- return other != null && _id == other._id;
- }
-
- /** <inheritdoc /> */
- public override int GetHashCode()
- {
- return _id;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
index eb73abe..da68ca2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
@@ -271,7 +271,7 @@ namespace Apache.Ignite.Core.Tests.Cache
var py = (AffinityFunctionBase) y;
Assert.AreEqual(px.GetType(), py.GetType());
- Assert.AreEqual(px.PartitionCount, py.PartitionCount);
+ Assert.AreEqual(px.Partitions, py.Partitions);
Assert.AreEqual(px.ExcludeNeighbors, py.ExcludeNeighbors);
}
@@ -552,7 +552,7 @@ namespace Apache.Ignite.Core.Tests.Cache
},
AffinityFunction = new RendezvousAffinityFunction
{
- PartitionCount = 513,
+ Partitions = 513,
ExcludeNeighbors = true
}
};
@@ -645,7 +645,7 @@ namespace Apache.Ignite.Core.Tests.Cache
},
AffinityFunction = new FairAffinityFunction
{
- PartitionCount = 113,
+ Partitions = 113,
ExcludeNeighbors = false
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml
index 6fe3e70..9c7bfb0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml
@@ -34,7 +34,7 @@
<list>
<bean class="org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration">
<property name="typeName"
- value="Apache.Ignite.Core.Tests.Cache.CacheAffinityTest+AffinityTestKey"/>
+ value="Apache.Ignite.Core.Tests.Cache.Affinity.AffinityTest+AffinityTestKey"/>
<property name="affinityKeyFieldName" value="_affKey"/>
</bean>
</list>
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
index 3056273..e435cf6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
@@ -97,7 +97,7 @@ namespace Apache.Ignite.Core.Tests
<nearConfiguration nearStartSize='7'>
<evictionPolicy type='FifoEvictionPolicy' batchSize='10' maxSize='20' maxMemorySize='30' />
</nearConfiguration>
- <affinityFunction type='RendezvousAffinityFunction' partitionCount='99' excludeNeighbors='true' />
+ <affinityFunction type='RendezvousAffinityFunction' partitions='99' excludeNeighbors='true' />
</cacheConfiguration>
<cacheConfiguration name='secondCache' />
</cacheConfiguration>
@@ -172,7 +172,7 @@ namespace Apache.Ignite.Core.Tests
var af = cacheCfg.AffinityFunction as RendezvousAffinityFunction;
Assert.IsNotNull(af);
- Assert.AreEqual(99, af.PartitionCount);
+ Assert.AreEqual(99, af.Partitions);
Assert.IsTrue(af.ExcludeNeighbors);
Assert.AreEqual(new Dictionary<string, object> {{"myNode", "true"}}, cfg.UserAttributes);
@@ -365,6 +365,14 @@ namespace Apache.Ignite.Core.Tests
IdMapper = new IdMapper(),
NameMapper = new NameMapper(),
Serializer = new TestSerializer()
+ },
+ new BinaryTypeConfiguration
+ {
+ IsEnum = false,
+ KeepDeserialized = false,
+ AffinityKeyFieldName = "affKeyFieldName",
+ TypeName = "typeName2",
+ Serializer = new BinaryReflectiveSerializer()
}
},
Types = new[] {typeof (string).FullName},
@@ -448,7 +456,7 @@ namespace Apache.Ignite.Core.Tests
AffinityFunction = new FairAffinityFunction
{
ExcludeNeighbors = true,
- PartitionCount = 48
+ Partitions = 48
}
}
},
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
index c6274ff..726fa3b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
@@ -22,6 +22,8 @@ namespace Apache.Ignite.Core.Tests
using System.Linq;
using System.Reflection;
using Apache.Ignite.Core.Tests.Binary;
+ using Apache.Ignite.Core.Tests.Cache.Affinity;
+ using Apache.Ignite.Core.Tests.Cache.Query;
using Apache.Ignite.Core.Tests.Memory;
using NUnit.ConsoleRunner;
@@ -46,9 +48,9 @@ namespace Apache.Ignite.Core.Tests
return;
}
- TestOne(typeof(BinaryStringTest), "Test");
+ //TestOne(typeof(BinaryStringTest), "Test");
- //TestAll(typeof (CacheQueriesCodeConfigurationTest));
+ TestAll(typeof (AffinityFunctionTest));
//TestAllInAssembly();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 46dbd94..e7f772f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -84,6 +84,8 @@
</ItemGroup>
<ItemGroup>
<Compile Include="Binary\BinaryReflectiveSerializer.cs" />
+ <Compile Include="Cache\Affinity\AffinityTopologyVersion.cs" />
+ <Compile Include="Cache\Affinity\AffinityFunctionContext.cs" />
<Compile Include="Impl\Binary\BinaryReflectiveSerializerInternal.cs" />
<Compile Include="Impl\Binary\IBinarySerializerInternal.cs" />
<Compile Include="Binary\Package-Info.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs
index ea5b21c..9b89780 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs
@@ -18,10 +18,13 @@
namespace Apache.Ignite.Core.Cache.Affinity
{
using System;
+ using System.Collections.Generic;
using System.ComponentModel;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache.Affinity.Fair;
using Apache.Ignite.Core.Cache.Affinity.Rendezvous;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
/// <summary>
/// Base class for predefined affinity functions.
@@ -37,14 +40,67 @@ namespace Apache.Ignite.Core.Cache.Affinity
/** */
private const byte TypeCodeRendezvous = 2;
- /// <summary> The default value for <see cref="PartitionCount"/> property. </summary>
- public const int DefaultPartitionCount = 1024;
+ /** */
+ private const byte TypeCodeUser = 3;
+
+ /// <summary> The default value for <see cref="Partitions"/> property. </summary>
+ public const int DefaultPartitions = 1024;
/// <summary>
/// Gets or sets the total number of partitions.
/// </summary>
- [DefaultValue(DefaultPartitionCount)]
- public int PartitionCount { get; set; }
+ [DefaultValue(DefaultPartitions)]
+ public int Partitions { get; set; }
+
+ /// <summary>
+ /// Gets partition number for a given key starting from 0. Partitioned caches
+ /// should make sure that keys are about evenly distributed across all partitions
+ /// from 0 to <see cref="Partitions" /> for best performance.
+ /// <para />
+ /// Note that for fully replicated caches it is possible to segment key sets among different
+ /// grid node groups. In that case each node group should return a unique partition
+ /// number. However, unlike partitioned cache, mappings of keys to nodes in
+ /// replicated caches are constant and a node cannot migrate from one partition
+ /// to another.
+ /// </summary>
+ /// <param name="key">Key to get partition for.</param>
+ /// <returns>
+ /// Partition number for a given key.
+ /// </returns>
+ public int GetPartition(object key)
+ {
+ throw GetDirectUsageError();
+ }
+
+ /// <summary>
+ /// Removes node from affinity. This method is called when it is safe to remove
+ /// disconnected node from affinity mapping.
+ /// </summary>
+ /// <param name="nodeId">The node identifier.</param>
+ public void RemoveNode(Guid nodeId)
+ {
+ throw GetDirectUsageError();
+ }
+
+ /// <summary>
+ /// Gets affinity nodes for a partition. In case of replicated cache, all returned
+ /// nodes are updated in the same manner. In case of partitioned cache, the returned
+ /// list should contain only the primary and back up nodes with primary node being
+ /// always first.
+ /// <para />
+ /// Note that partitioned affinity must obey the following contract: given that node
+ /// <code>N</code> is primary for some key <code>K</code>, if any other node(s) leave
+ /// grid and no node joins grid, node <code>N</code> will remain primary for key <code>K</code>.
+ /// </summary>
+ /// <param name="context">The affinity function context.</param>
+ /// <returns>
+ /// A collection of partitions, where each partition is a collection of nodes,
+ /// where first node is a primary node, and other nodes are backup nodes.
+ /// </returns>
+ public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+ {
+ throw GetDirectUsageError();
+ }
/// <summary>
/// Gets or sets a value indicating whether to exclude same-host-neighbors from being backups of each other.
@@ -56,7 +112,7 @@ namespace Apache.Ignite.Core.Cache.Affinity
/// </summary>
internal AffinityFunctionBase()
{
- PartitionCount = DefaultPartitionCount;
+ Partitions = DefaultPartitions;
}
/// <summary>
@@ -77,11 +133,16 @@ namespace Apache.Ignite.Core.Cache.Affinity
case TypeCodeRendezvous:
fun = new RendezvousAffinityFunction();
break;
+ case TypeCodeUser:
+ var f = reader.ReadObject<IAffinityFunction>();
+ reader.ReadInt(); // skip partition count
+
+ return f;
default:
throw new InvalidOperationException("Invalid AffinityFunction type code: " + typeCode);
}
- fun.PartitionCount = reader.ReadInt();
+ fun.Partitions = reader.ReadInt();
fun.ExcludeNeighbors = reader.ReadBoolean();
return fun;
@@ -100,17 +161,30 @@ namespace Apache.Ignite.Core.Cache.Affinity
var p = fun as AffinityFunctionBase;
- if (p == null)
+ if (p != null)
{
- throw new NotSupportedException(
- string.Format("Unsupported AffinityFunction: {0}. Only predefined affinity function types " +
- "are supported: {1}, {2}", fun.GetType(), typeof(FairAffinityFunction),
- typeof(RendezvousAffinityFunction)));
+ writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous);
+ writer.WriteInt(p.Partitions);
+ writer.WriteBoolean(p.ExcludeNeighbors);
}
+ else
+ {
+ writer.WriteByte(TypeCodeUser);
+
+ if (!fun.GetType().IsSerializable)
+ throw new IgniteException("AffinityFunction should be serializable.");
- writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous);
- writer.WriteInt(p.PartitionCount);
- writer.WriteBoolean(p.ExcludeNeighbors);
+ writer.WriteObject(fun);
+ writer.WriteInt(fun.Partitions); // partition count is written once and can not be changed.
+ }
+ }
+
+ /// <summary>
+ /// Creates the exception thrown by members of this type that must not be invoked directly.
+ /// </summary>
+ private Exception GetDirectUsageError()
+ {
+ return new IgniteException(string.Format("{0} can not be used directly.", GetType()));
 }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
new file mode 100644
index 0000000..1f44d8c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Events;
+ using Apache.Ignite.Core.Impl;
+
+ /// <summary>
+ /// Affinity function context: read-only data describing the topology for which partitions are assigned.
+ /// </summary>
+ public class AffinityFunctionContext
+ {
+ /** Per-partition node lists from the previous topology version; null when none was read. */
+ private readonly List<List<IClusterNode>> _previousAssignment;
+
+ /** Number of backups for the new assignment. */
+ private readonly int _backups;
+
+ /** Nodes of the current topology snapshot. */
+ private readonly ICollection<IClusterNode> _currentTopologySnapshot;
+
+ /** Current topology version. */
+ private readonly AffinityTopologyVersion _currentTopologyVersion;
+
+ /** Discovery event that caused the topology change. */
+ private readonly DiscoveryEvent _discoveryEvent;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AffinityFunctionContext"/> class from a binary stream.
+ /// </summary>
+ /// <param name="reader">Reader yielding, in order: previous assignment, backups, snapshot, version, event.</param>
+ internal AffinityFunctionContext(IBinaryRawReader reader)
+ {
+ var cnt = reader.ReadInt(); // partition count of the previous assignment; non-positive leaves it null
+
+ if (cnt > 0)
+ {
+ _previousAssignment = new List<List<IClusterNode>>(cnt);
+
+ for (var i = 0; i < cnt; i++)
+ _previousAssignment.Add(IgniteUtils.ReadNodes(reader));
+ }
+
+ _backups = reader.ReadInt();
+ _currentTopologySnapshot = IgniteUtils.ReadNodes(reader);
+ _currentTopologyVersion = new AffinityTopologyVersion(reader.ReadLong(), reader.ReadInt());
+ _discoveryEvent = EventReader.Read<DiscoveryEvent>(reader);
+ }
+
+ /// <summary>
+ /// Gets the affinity assignment for given partition on previous topology version.
+ /// First node in returned list is a primary node, other nodes are backups.
+ /// </summary>
+ /// <param name="partition">The partition to get previous assignment for.</param>
+ /// <returns>
+ /// List of nodes assigned to a given partition on previous topology version or <code>null</code>
+ /// if this information is not available.
+ /// </returns>
+ public ICollection<IClusterNode> GetPreviousAssignment(int partition)
+ {
+ return _previousAssignment == null ? null : _previousAssignment[partition];
+ }
+
+ /// <summary>
+ /// Gets number of backups for new assignment.
+ /// </summary>
+ public int Backups
+ {
+ get { return _backups; }
+ }
+
+ /// <summary>
+ /// Gets the current topology snapshot. Snapshot will contain only nodes on which the particular
+ /// cache is configured. List of passed nodes is guaranteed to be sorted in the same order
+ /// on all nodes on which partition assignment is performed.
+ /// </summary>
+ public ICollection<IClusterNode> CurrentTopologySnapshot
+ {
+ get { return _currentTopologySnapshot; }
+ }
+
+ /// <summary>
+ /// Gets the current topology version.
+ /// </summary>
+ public AffinityTopologyVersion CurrentTopologyVersion
+ {
+ get { return _currentTopologyVersion; }
+ }
+
+ /// <summary>
+ /// Gets the discovery event that caused the topology change.
+ /// </summary>
+ public DiscoveryEvent DiscoveryEvent
+ {
+ get { return _discoveryEvent; }
+ }
+ }
+}
\ No newline at end of file