You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/06 14:39:57 UTC
[35/50] [abbrv] ignite git commit: GG-11293: .NET: Backported
affinity functions feature to 7.5.30.
GG-11293: .NET: Backported affinity functions feature to 7.5.30.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f57cc8d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f57cc8d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f57cc8d
Branch: refs/heads/ignite-1.5.31-1
Commit: 5f57cc8d703f9e8f749c9e3c403781365642dc3a
Parents: 78d7c13
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jul 19 14:34:35 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jul 19 14:34:35 2016 +0300
----------------------------------------------------------------------
.../internal/binary/GridBinaryMarshaller.java | 11 +-
.../GridAffinityFunctionContextImpl.java | 9 +
.../processors/cache/GridCacheProcessor.java | 65 +++--
.../affinity/PlatformAffinityFunction.java | 277 +++++++++++++++++++
.../PlatformAffinityFunctionTarget.java | 113 ++++++++
.../cache/affinity/PlatformAffinityUtils.java | 116 ++++++++
.../callback/PlatformCallbackGateway.java | 89 ++++++
.../callback/PlatformCallbackUtils.java | 49 ++++
.../PlatformDotNetConfigurationClosure.java | 115 +++++++-
.../dotnet/PlatformDotNetAffinityFunction.java | 171 ++++++++++++
.../cpp/common/include/ignite/common/java.h | 18 ++
modules/platforms/cpp/common/src/java.cpp | 36 ++-
.../Apache.Ignite.Core.Tests.csproj | 9 +-
.../Affinity/AffinityFunctionSpringTest.cs | 184 ++++++++++++
.../Config/Cache/Affinity/affinity-function.xml | 129 +++++++++
.../Cache/Affinity/affinity-function2.xml | 49 ++++
.../Apache.Ignite.Core.Tests/TestRunner.cs | 3 +-
.../Apache.Ignite.Core.csproj | 11 +-
.../Cache/Affinity/AffinityFunctionBase.cs | 139 ++++++++++
.../Cache/Affinity/AffinityFunctionContext.cs | 120 ++++++++
.../Cache/Affinity/AffinityTopologyVersion.cs | 138 +++++++++
.../Cache/Affinity/Fair/FairAffinityFunction.cs | 32 +++
.../Cache/Affinity/IAffinityFunction.cs | 82 ++++++
.../Rendezvous/RendezvousAffinityFunction.cs | 31 +++
.../Apache.Ignite.Core/Events/EventReader.cs | 8 +-
.../dotnet/Apache.Ignite.Core/Ignition.cs | 38 ++-
.../Impl/Binary/BinaryReaderExtensions.cs | 14 +
.../Impl/Binary/Marshaller.cs | 6 +-
.../Affinity/AffinityFunctionSerializer.cs | 277 +++++++++++++++++++
.../Cache/Affinity/PlatformAffinityFunction.cs | 74 +++++
.../Impl/Common/ObjectInfoHolder.cs | 86 ++++++
.../Apache.Ignite.Core/Impl/IgniteUtils.cs | 10 +-
.../Impl/Unmanaged/UnmanagedCallbackHandlers.cs | 6 +
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 133 ++++++++-
34 files changed, 2596 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
index 535207c..3a3dfd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
@@ -305,6 +305,15 @@ public class GridBinaryMarshaller {
/**
* Push binary context and return the old one.
*
+ * @return Old binary context.
+ */
+ public BinaryContext pushContext() {
+ return pushContext(ctx);
+ }
+
+ /**
+ * Push binary context and return the old one.
+ *
* @param ctx Binary context.
* @return Old binary context.
*/
@@ -321,7 +330,7 @@ public class GridBinaryMarshaller {
*
* @param oldCtx Old binary context.
*/
- private static void popContext(@Nullable BinaryContext oldCtx) {
+ public static void popContext(@Nullable BinaryContext oldCtx) {
if (oldCtx == null)
BINARY_CTX.remove();
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
index 6c97efd..4ddee00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
@@ -80,4 +80,13 @@ public class GridAffinityFunctionContextImpl implements AffinityFunctionContext
@Override public int backups() {
return backups;
}
+
+ /**
+ * Gets the previous assignment.
+ * <p>
+ * Returns the {@code prevAssignment} field as-is; no defensive copy is made.
+ * NOTE(review): callers in this change treat a {@code null} result as "no previous assignment" - confirm
+ * that {@code null} is a legal value of the field.
+ *
+ * @return Previous assignment.
+ */
+ public List<List<ClusterNode>> prevAssignment() {
+ return prevAssignment;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 6484d4d..6761fac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -69,10 +69,13 @@ import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.IgniteTransactionsEx;
+import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -94,12 +97,14 @@ import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTransactionsImpl;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
@@ -3379,32 +3384,60 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If validation failed.
* @return Configuration copy.
*/
- private CacheConfiguration cloneCheckSerializable(CacheConfiguration val) throws IgniteCheckedException {
+ private CacheConfiguration cloneCheckSerializable(final CacheConfiguration val) throws IgniteCheckedException {
if (val == null)
return null;
- if (val.getCacheStoreFactory() != null) {
- try {
- ClassLoader ldr = ctx.config().getClassLoader();
+ return withBinaryContext(new IgniteOutClosureX<CacheConfiguration>() {
+ @Override public CacheConfiguration applyx() throws IgniteCheckedException {
+ if (val.getCacheStoreFactory() != null) {
+ try {
+ ClassLoader ldr = ctx.config().getClassLoader();
- if (ldr == null)
- ldr = val.getCacheStoreFactory().getClass().getClassLoader();
+ if (ldr == null)
+ ldr = val.getCacheStoreFactory().getClass().getClassLoader();
- marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()),
- U.resolveClassLoader(ldr, ctx.config()));
- }
- catch (IgniteCheckedException e) {
- throw new IgniteCheckedException("Failed to validate cache configuration. " +
- "Cache store factory is not serializable. Cache name: " + U.maskName(val.getName()), e);
+ marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()),
+ U.resolveClassLoader(ldr, ctx.config()));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Failed to validate cache configuration. " +
+ "Cache store factory is not serializable. Cache name: " + U.maskName(val.getName()), e);
+ }
+ }
+
+ try {
+ return marshaller.unmarshal(marshaller.marshal(val), U.resolveClassLoader(ctx.config()));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteCheckedException("Failed to validate cache configuration " +
+ "(make sure all objects in cache configuration are serializable): " + U.maskName(val.getName()), e);
+ }
}
+ });
+ }
+
+ /**
+ * @param c Closure.
+ * @return Closure result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private <T> T withBinaryContext(IgniteOutClosureX<T> c) throws IgniteCheckedException {
+ IgniteCacheObjectProcessor objProc = ctx.cacheObjects();
+ BinaryContext oldCtx = null;
+
+ if (objProc instanceof CacheObjectBinaryProcessorImpl) {
+ GridBinaryMarshaller binMarsh = ((CacheObjectBinaryProcessorImpl)objProc).marshaller();
+
+ oldCtx = binMarsh == null ? null : binMarsh.pushContext();
}
try {
- return marshaller.unmarshal(marshaller.marshal(val), U.resolveClassLoader(ctx.config()));
+ return c.applyx();
}
- catch (IgniteCheckedException e) {
- throw new IgniteCheckedException("Failed to validate cache configuration " +
- "(make sure all objects in cache configuration are serializable): " + U.maskName(val.getName()), e);
+ finally {
+ if (objProc instanceof CacheObjectBinaryProcessorImpl)
+ GridBinaryMarshaller.popContext(oldCtx);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
new file mode 100644
index 0000000..6681e7a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.affinity;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Platform AffinityFunction.
+ */
+public class PlatformAffinityFunction implements AffinityFunction, Externalizable, LifecycleAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final byte FLAG_PARTITION = 1;
+
+ /** */
+ private static final byte FLAG_REMOVE_NODE = 1 << 1;
+
+ /** */
+ private static final byte FLAG_ASSIGN_PARTITIONS = 1 << 2;
+
+ /** */
+ private Object userFunc;
+
+ /**
+ * Partition count.
+ *
+ * 1) Java calls partitions() method very early (before LifecycleAware.start) during CacheConfiguration validation.
+ * 2) Partition count never changes.
+ * Therefore, we get the value on .NET side once, and pass it along with PlatformAffinity.
+ */
+ private int partitions;
+
+ /** */
+ private AffinityFunction baseFunc;
+
+ /** */
+ private byte overrideFlags;
+
+ /** */
+ private transient Ignite ignite;
+
+ /** */
+ private transient PlatformContext ctx;
+
+ /** */
+ private transient long ptr;
+
+ /** */
+ private transient PlatformAffinityFunctionTarget baseTarget;
+
+
+ /**
+ * Ctor for serialization.
+ *
+ */
+ public PlatformAffinityFunction() {
+ partitions = -1;
+ }
+
+ /**
+ * Ctor.
+ *
+ * @param func User fun object.
+ * @param partitions Number of partitions.
+ */
+ public PlatformAffinityFunction(Object func, int partitions, byte overrideFlags, AffinityFunction baseFunc) {
+ userFunc = func;
+ this.partitions = partitions;
+ this.overrideFlags = overrideFlags;
+ this.baseFunc = baseFunc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // userFunc is always in initial state (it is serialized only once on start).
+ if (baseFunc != null)
+ baseFunc.reset();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ // Affinity function can not return different number of partitions,
+ // so we pass this value once from the platform.
+ assert partitions > 0;
+
+ return partitions;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ if ((overrideFlags & FLAG_PARTITION) == 0) {
+ assert baseFunc != null;
+
+ return baseFunc.partition(key);
+ }
+
+ assert ctx != null;
+ assert ptr != 0;
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+ BinaryRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(key);
+
+ out.synchronize();
+
+ return ctx.gateway().affinityFunctionPartition(ptr, mem.pointer());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+ if ((overrideFlags & FLAG_ASSIGN_PARTITIONS) == 0) {
+ assert baseFunc != null;
+
+ return baseFunc.assignPartitions(affCtx);
+ }
+
+ assert ctx != null;
+ assert ptr != 0;
+ assert affCtx != null;
+
+ try (PlatformMemory outMem = ctx.memory().allocate()) {
+ try (PlatformMemory inMem = ctx.memory().allocate()) {
+ PlatformOutputStream out = outMem.output();
+ BinaryRawWriterEx writer = ctx.writer(out);
+
+ // Write previous assignment
+ PlatformAffinityUtils.writeAffinityFunctionContext(affCtx, writer, ctx);
+
+ out.synchronize();
+
+ // Call platform
+ // We can not restore original AffinityFunctionContext after the call to platform,
+ // due to DiscoveryEvent (when node leaves, we can't get it by id anymore).
+ // Secondly, AffinityFunctionContext can't be changed by the user.
+ if (baseTarget != null)
+ baseTarget.setCurrentAffinityFunctionContext(affCtx);
+
+ try {
+ ctx.gateway().affinityFunctionAssignPartitions(ptr, outMem.pointer(), inMem.pointer());
+ }
+ finally {
+ if (baseTarget != null)
+ baseTarget.setCurrentAffinityFunctionContext(null);
+ }
+
+ // Read result
+ return PlatformAffinityUtils.readPartitionAssignment(ctx.reader(inMem), ctx);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ if ((overrideFlags & FLAG_REMOVE_NODE) == 0) {
+ assert baseFunc != null;
+
+ baseFunc.removeNode(nodeId);
+
+ return;
+ }
+
+ assert ctx != null;
+ assert ptr != 0;
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+ BinaryRawWriterEx writer = ctx.writer(out);
+
+ writer.writeUuid(nodeId);
+
+ out.synchronize();
+
+ ctx.gateway().affinityFunctionRemoveNode(ptr, mem.pointer());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(userFunc);
+ out.writeInt(partitions);
+ out.writeByte(overrideFlags);
+ out.writeObject(baseFunc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ userFunc = in.readObject();
+ partitions = in.readInt();
+ overrideFlags = in.readByte();
+ baseFunc = (AffinityFunction)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ // userFunc is null when there is nothing overridden
+ if (userFunc == null)
+ return;
+
+ assert ignite != null;
+ ctx = PlatformUtils.platformContext(ignite);
+ assert ctx != null;
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+ BinaryRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(userFunc);
+
+ out.synchronize();
+
+ baseTarget = baseFunc != null
+ ? new PlatformAffinityFunctionTarget(ctx, baseFunc)
+ : null;
+
+ ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTarget);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ if (ptr == 0)
+ return;
+
+ assert ctx != null;
+
+ ctx.gateway().affinityFunctionDestroy(ptr);
+ }
+
+ /**
+ * Injects the Ignite.
+ *
+ * @param ignite Ignite.
+ */
+ @SuppressWarnings("unused")
+ @IgniteInstanceResource
+ public void setIgnite(Ignite ignite) {
+ this.ignite = ignite;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java
new file mode 100644
index 0000000..8a07b33
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.affinity;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.List;
+
+/**
+ * Platform affinity function target:
+ * to be invoked when Platform function calls base implementation of one of the AffinityFunction methods.
+ */
+public class PlatformAffinityFunctionTarget extends PlatformAbstractTarget {
+    /** Operation code: delegate to {@code baseFunc.partition(key)}. */
+    private static final int OP_PARTITION = 1;
+
+    /** Operation code: delegate to {@code baseFunc.removeNode(nodeId)}. */
+    private static final int OP_REMOVE_NODE = 2;
+
+    /** Operation code: delegate to {@code baseFunc.assignPartitions(ctx)} and write the result. */
+    private static final int OP_ASSIGN_PARTITIONS = 3;
+
+    /** Inner function to delegate calls to. */
+    private final AffinityFunction baseFunc;
+
+    /**
+     * Thread local to hold the current affinity function context.
+     * The field is static, i.e. the per-thread value is shared by all target instances.
+     */
+    private static final ThreadLocal<AffinityFunctionContext> currentAffCtx = new ThreadLocal<>();
+
+    /**
+     * Constructor.
+     * <p>
+     * Performs resource injection on the wrapped function; a checked failure is converted
+     * with {@code U.convertException} and rethrown.
+     *
+     * @param platformCtx Context.
+     * @param baseFunc Function to wrap.
+     */
+    protected PlatformAffinityFunctionTarget(PlatformContext platformCtx, AffinityFunction baseFunc) {
+        super(platformCtx);
+
+        assert baseFunc != null;
+        this.baseFunc = baseFunc;
+
+        try {
+            platformCtx.kernalContext().resource().injectGeneric(baseFunc);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+        if (type == OP_PARTITION)
+            return baseFunc.partition(reader.readObjectDetached());
+        else if (type == OP_REMOVE_NODE) {
+            baseFunc.removeNode(reader.readUuid());
+
+            // Nothing to report for removeNode; 0 is returned as a placeholder.
+            return 0;
+        }
+
+        return super.processInStreamOutLong(type, reader);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
+        if (type == OP_ASSIGN_PARTITIONS) {
+            // The context is not passed through the stream; it is taken from the calling thread.
+            AffinityFunctionContext affCtx = currentAffCtx.get();
+
+            if (affCtx == null)
+                throw new IgniteException("Thread-local AffinityFunctionContext is null. " +
+                    "This may indicate an unsupported call to the base AffinityFunction.");
+
+            final List<List<ClusterNode>> partitions = baseFunc.assignPartitions(affCtx);
+
+            PlatformAffinityUtils.writePartitionAssignment(partitions, writer, platformContext());
+
+            return;
+        }
+
+        super.processOutStream(type, writer);
+    }
+
+    /**
+     * Sets the context for current operation.
+     * <p>
+     * Passing {@code null} clears the value for the calling thread.
+     *
+     * @param ctx Context, or {@code null} to clear.
+     */
+    void setCurrentAffinityFunctionContext(AffinityFunctionContext ctx) {
+        // remove() drops the thread's entry instead of keeping one mapped to null;
+        // a subsequent get() returns null in both cases.
+        if (ctx == null)
+            currentAffCtx.remove();
+        else
+            currentAffCtx.set(ctx);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java
new file mode 100644
index 0000000..b1e1b23
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.affinity;
+
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Affinity serialization functions.
+ */
+public class PlatformAffinityUtils {
+    /**
+     * Writes the affinity function context.
+     * <p>
+     * Written in order: current topology snapshot, backups, topology version (major as long, minor as int),
+     * discovery event, and the previous assignment ({@code -1} when absent, otherwise the partition count
+     * followed by the nodes of each partition).
+     * <p>
+     * The context is cast to {@link GridAffinityFunctionContextImpl} to obtain the previous assignment,
+     * so any other implementation fails with {@link ClassCastException}.
+     *
+     * @param affCtx Affinity context.
+     * @param writer Writer.
+     * @param ctx Platform context.
+     */
+    public static void writeAffinityFunctionContext(AffinityFunctionContext affCtx, BinaryRawWriterEx writer,
+        PlatformContext ctx) {
+        assert affCtx != null;
+        assert writer != null;
+        assert ctx != null;
+
+        ctx.writeNodes(writer, affCtx.currentTopologySnapshot());
+
+        writer.writeInt(affCtx.backups());
+        writer.writeLong(affCtx.currentTopologyVersion().topologyVersion());
+        writer.writeInt(affCtx.currentTopologyVersion().minorTopologyVersion());
+
+        ctx.writeEvent(writer, affCtx.discoveryEvent());
+
+        // Write previous assignment
+        List<List<ClusterNode>> prevAssignment = ((GridAffinityFunctionContextImpl)affCtx).prevAssignment();
+
+        if (prevAssignment == null)
+            writer.writeInt(-1);
+        else {
+            writer.writeInt(prevAssignment.size());
+
+            for (List<ClusterNode> part : prevAssignment)
+                ctx.writeNodes(writer, part);
+        }
+    }
+
+    /**
+     * Writes the partition assignment to a stream.
+     * <p>
+     * Writes the partition count, then the nodes of each partition.
+     *
+     * @param partitions Partitions.
+     * @param writer Writer.
+     * @param ctx Platform context.
+     */
+    public static void writePartitionAssignment(Collection<List<ClusterNode>> partitions, BinaryRawWriterEx writer,
+        PlatformContext ctx) {
+        assert partitions != null;
+        assert writer != null;
+        assert ctx != null;
+
+        writer.writeInt(partitions.size());
+
+        for (List<ClusterNode> part : partitions)
+            ctx.writeNodes(writer, part);
+    }
+
+    /**
+     * Reads the partition assignment.
+     * <p>
+     * Expected layout: partition count, then for each partition its node count followed by that many node ids.
+     * Each id is resolved to a node through the local cluster.
+     *
+     * @param reader Reader.
+     * @param ctx Platform context.
+     * @return Partitions.
+     */
+    public static List<List<ClusterNode>> readPartitionAssignment(BinaryRawReader reader, PlatformContext ctx) {
+        assert reader != null;
+        assert ctx != null;
+
+        int partCnt = reader.readInt();
+
+        List<List<ClusterNode>> res = new ArrayList<>(partCnt);
+
+        IgniteClusterEx cluster = ctx.kernalContext().grid().cluster();
+
+        for (int i = 0; i < partCnt; i++) {
+            int partSize = reader.readInt();
+
+            List<ClusterNode> part = new ArrayList<>(partSize);
+
+            // NOTE(review): the lookup result is added unchecked - confirm that cluster.node(...) can not
+            // return null here (e.g. for an id of a node that has already left).
+            for (int j = 0; j < partSize; j++)
+                part.add(cluster.node(reader.readUuid()));
+
+            res.add(part);
+        }
+
+        return res;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index 47862a2..1759a5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform.callback;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget;
import org.apache.ignite.internal.util.GridStripedSpinBusyLock;
/**
@@ -920,6 +921,94 @@ public class PlatformCallbackGateway {
}
/**
+ * Initializes affinity function.
+ *
+ * @param memPtr Pointer to a stream with serialized affinity function.
+ * @param baseFunc Optional func for base calls.
+ * @return Affinity function pointer.
+ */
+    public long affinityFunctionInit(long memPtr, PlatformAffinityFunctionTarget baseFunc) {
+        // Enter the gateway first; the matching leave() below runs whether or not the native call fails.
+        enter();
+
+        try {
+            long funcPtr = PlatformCallbackUtils.affinityFunctionInit(envPtr, memPtr, baseFunc);
+
+            return funcPtr;
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Asks the platform affinity function which partition a key belongs to.
+     *
+     * @param ptr Affinity function pointer.
+     * @param memPtr Pointer to a stream with key object.
+     * @return Partition number for a given key.
+     */
+    public int affinityFunctionPartition(long ptr, long memPtr) {
+        // Enter the gateway first; the matching leave() below runs whether or not the native call fails.
+        enter();
+
+        try {
+            int part = PlatformCallbackUtils.affinityFunctionPartition(envPtr, ptr, memPtr);
+
+            return part;
+        }
+        finally {
+            leave();
+        }
+    }
+
+ /**
+ * Assigns the affinity partitions.
+ * <p>
+ * Forwards to the native {@code PlatformCallbackUtils.affinityFunctionAssignPartitions} between
+ * {@code enter()} and {@code leave()}. Nothing is returned; the caller reads the result
+ * from the {@code inMemPtr} stream afterwards.
+ *
+ * @param ptr Affinity function pointer.
+ * @param outMemPtr Pointer to a stream with affinity context.
+ * @param inMemPtr Pointer to a stream with result.
+ */
+ public void affinityFunctionAssignPartitions(long ptr, long outMemPtr, long inMemPtr){
+ enter();
+
+ try {
+ PlatformCallbackUtils.affinityFunctionAssignPartitions(envPtr, ptr, outMemPtr, inMemPtr);
+ }
+ finally {
+ // Always paired with enter(), also when the native call throws.
+ leave();
+ }
+ }
+
+ /**
+ * Removes the node from affinity function.
+ * <p>
+ * Forwards to the native {@code PlatformCallbackUtils.affinityFunctionRemoveNode} between
+ * {@code enter()} and {@code leave()}.
+ *
+ * @param ptr Affinity function pointer.
+ * @param memPtr Pointer to a stream with node id.
+ */
+ public void affinityFunctionRemoveNode(long ptr, long memPtr) {
+ enter();
+
+ try {
+ PlatformCallbackUtils.affinityFunctionRemoveNode(envPtr, ptr, memPtr);
+ }
+ finally {
+ // Always paired with enter(), also when the native call throws.
+ leave();
+ }
+ }
+
+    /**
+     * Releases the platform-side affinity function.
+     * <p>
+     * Unlike the other affinity callbacks, this one does nothing when the busy lock can not be acquired.
+     *
+     * @param ptr Affinity function pointer.
+     */
+    public void affinityFunctionDestroy(long ptr) {
+        // When the lock can not be entered: skip, destroy is not necessary during shutdown.
+        if (lock.enterBusy()) {
+            try {
+                PlatformCallbackUtils.affinityFunctionDestroy(envPtr, ptr);
+            }
+            finally {
+                leave();
+            }
+        }
+    }
+
+ /**
* Enter gateway.
*/
protected void enter() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
index 7f3ba6f..1cbbd7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.platform.callback;
+import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget;
+
/**
* Platform callback utility methods. Implemented in target platform. All methods in this class must be
* package-visible and invoked only through {@link PlatformCallbackGateway}.
@@ -482,6 +484,53 @@ public class PlatformCallbackUtils {
static native long extensionCallbackInLongLongOutLong(long envPtr, int typ, long arg1, long arg2);
/**
+ * Initializes affinity function.
+ * <p>
+ * Native method. {@code java.cpp} registers it with the JNI signature
+ * {@code (JJLorg/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget;)J}
+ * and binds it to {@code JniAffinityFunctionInit}.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Pointer to a stream with serialized affinity function.
+ * @param baseFunc Optional func for base calls. When {@code null}, the JNI entry point hands a null
+ * reference to the handler; otherwise it creates a JNI global reference first.
+ * @return Affinity function pointer.
+ */
+ static native long affinityFunctionInit(long envPtr, long memPtr, PlatformAffinityFunctionTarget baseFunc);
+
+ /**
+ * Gets the partition from affinity function.
+ * <p>
+ * Native method. {@code java.cpp} registers it with the JNI signature {@code (JJJ)I} and binds it to
+ * {@code JniAffinityFunctionPartition}.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Affinity function pointer.
+ * @param memPtr Pointer to a stream with key object.
+ * @return Partition number for a given key.
+ */
+ static native int affinityFunctionPartition(long envPtr, long ptr, long memPtr);
+
+ /**
+ * Assigns the affinity partitions.
+ * <p>
+ * Native method. {@code java.cpp} registers it with the JNI signature {@code (JJJJ)V} and binds it to
+ * {@code JniAffinityFunctionAssignPartitions}.
+ * <p>
+ * NOTE(review): the JNI entry point declares the last two parameters as {@code inMemPtr},
+ * {@code outMemPtr}, i.e. with the names reversed relative to this declaration; the positions are the
+ * same.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Affinity function pointer.
+ * @param outMemPtr Pointer to a stream with affinity context.
+ * @param inMemPtr Pointer to a stream with result.
+ */
+ static native void affinityFunctionAssignPartitions(long envPtr, long ptr, long outMemPtr, long inMemPtr);
+
+ /**
+ * Removes the node from affinity function.
+ * <p>
+ * Native method. {@code java.cpp} registers it with the JNI signature {@code (JJJ)V} and binds it to
+ * {@code JniAffinityFunctionRemoveNode}.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Affinity function pointer.
+ * @param memPtr Pointer to a stream with node id.
+ */
+ static native void affinityFunctionRemoveNode(long envPtr, long ptr, long memPtr);
+
+ /**
+ * Destroys the affinity function.
+ * <p>
+ * Native method. {@code java.cpp} registers it with the JNI signature {@code (JJ)V} and binds it to
+ * {@code JniAffinityFunctionDestroy}.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Affinity function pointer.
+ */
+ static native void affinityFunctionDestroy(long envPtr, long ptr);
+
+ /**
* Private constructor.
*/
private PlatformCallbackUtils() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java
index 6b9b441..f441f4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java
@@ -19,21 +19,28 @@ package org.apache.ignite.internal.processors.platform.dotnet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.binary.BinaryIdMapper;
import org.apache.ignite.binary.BinaryBasicIdMapper;
-import org.apache.ignite.binary.BinaryNameMapper;
import org.apache.ignite.binary.BinaryBasicNameMapper;
+import org.apache.ignite.binary.BinaryIdMapper;
+import org.apache.ignite.binary.BinaryNameMapper;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.PlatformConfiguration;
import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
-import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure;
+import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction;
import org.apache.ignite.internal.processors.platform.lifecycle.PlatformLifecycleBean;
-import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
@@ -42,8 +49,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lifecycle.LifecycleBean;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction;
import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.platform.dotnet.PlatformDotNetLifecycleBean;
import java.util.ArrayList;
@@ -183,7 +190,9 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur
try (PlatformMemory inMem = memMgr.allocate()) {
PlatformOutputStream out = outMem.output();
- BinaryRawWriterEx writer = marshaller().writer(out);
+ final GridBinaryMarshaller marshaller = marshaller();
+
+ BinaryRawWriterEx writer = marshaller.writer(out);
PlatformUtils.writeDotNetConfiguration(writer, interopCfg.unwrap());
@@ -196,12 +205,24 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur
writer.writeMap(bean.getProperties());
}
+ // Write .NET affinity funcs
+ List<PlatformDotNetAffinityFunction> affFuncs = affinityFunctions(igniteCfg);
+
+ writer.writeInt(affFuncs.size());
+
+ for (PlatformDotNetAffinityFunction func : affFuncs) {
+ writer.writeString(func.getTypeName());
+ writer.writeMap(func.getProperties());
+ }
+
out.synchronize();
gate.extensionCallbackInLongLongOutLong(
PlatformUtils.OP_PREPARE_DOT_NET, outMem.pointer(), inMem.pointer());
- processPrepareResult(inMem.input());
+ BinaryReaderExImpl reader = new BinaryReaderExImpl(marshaller.context(), inMem.input(), null);
+
+ processPrepareResult(reader);
}
}
}
@@ -211,7 +232,7 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur
*
* @param in Input stream.
*/
- private void processPrepareResult(PlatformInputStream in) {
+ private void processPrepareResult(BinaryReaderExImpl in) {
assert cfg != null;
List<PlatformDotNetLifecycleBean> beans = beans(cfg);
@@ -245,6 +266,63 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur
cfg.setLifecycleBeans(mergedBeans);
}
}
+
+ // Process affinity functions
+ List<PlatformDotNetAffinityFunction> affFuncs = affinityFunctions(cfg);
+
+ if (!affFuncs.isEmpty()) {
+ for (PlatformDotNetAffinityFunction aff : affFuncs)
+ aff.init(readAffinityFunction(in));
+ }
+ }
+
+ /**
+ * Reads the affinity function.
+ *
+ * @param in Stream.
+ * @return Affinity function.
+ */
+ private static PlatformAffinityFunction readAffinityFunction(BinaryRawReaderEx in) {
+ byte plcTyp = in.readByte();
+
+ if (plcTyp == 0)
+ return null;
+
+ int partitions = in.readInt();
+ boolean exclNeighbours = in.readBoolean();
+ byte overrideFlags = in.readByte();
+ Object userFunc = in.readObjectDetached();
+
+ AffinityFunction baseFunc = null;
+
+ switch (plcTyp) {
+ case 1: {
+ FairAffinityFunction f = new FairAffinityFunction();
+
+ f.setPartitions(partitions);
+ f.setExcludeNeighbors(exclNeighbours);
+
+ baseFunc = f;
+
+ break;
+ }
+
+ case 2: {
+ RendezvousAffinityFunction f = new RendezvousAffinityFunction();
+
+ f.setPartitions(partitions);
+ f.setExcludeNeighbors(exclNeighbours);
+
+ baseFunc = f;
+
+ break;
+ }
+
+ default:
+ assert plcTyp == 3 : "Unknown affinity function policy type: " + plcTyp;
+ }
+
+ return new PlatformAffinityFunction(userFunc, partitions, overrideFlags, baseFunc);
}
/**
@@ -289,4 +367,25 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur
throw U.convertException(e);
}
}
+
+ /**
+ * Collects the .NET affinity functions referenced by the cache configurations of the given
+ * Ignite configuration.
+ *
+ * @param cfg Configuration.
+ * @return Affinity functions in cache configuration order; an empty list when there are none.
+ */
+ private static List<PlatformDotNetAffinityFunction> affinityFunctions(IgniteConfiguration cfg) {
+ List<PlatformDotNetAffinityFunction> dotNetFuncs = new ArrayList<>();
+
+ CacheConfiguration[] ccfgs = cfg.getCacheConfiguration();
+
+ // No cache configurations at all: nothing to collect.
+ if (ccfgs == null)
+ return dotNetFuncs;
+
+ for (CacheConfiguration ccfg : ccfgs) {
+ // Skip caches whose affinity is not a .NET wrapper.
+ if (!(ccfg.getAffinity() instanceof PlatformDotNetAffinityFunction))
+ continue;
+
+ dotNetFuncs.add((PlatformDotNetAffinityFunction)ccfg.getAffinity());
+ }
+
+ return dotNetFuncs;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java
new file mode 100644
index 0000000..254c379
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform.dotnet;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * AffinityFunction implementation which can be used to configure .NET affinity function in Java Spring configuration.
+ * <p>
+ * This class is a thin wrapper. The type name and properties are plain configuration values, while every
+ * {@link AffinityFunction} and {@link LifecycleAware} call is forwarded to the inner
+ * {@link PlatformAffinityFunction}. The inner function is supplied through {@link #init} or restored by
+ * {@link #readExternal}; every delegating method asserts that it is present.
+ * <p>
+ * Only the inner function takes part in externalization: the type name and properties are neither written
+ * nor read.
+ */
+public class PlatformDotNetAffinityFunction implements AffinityFunction, Externalizable, LifecycleAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** .NET type name. Not written by {@link #writeExternal}. */
+ private transient String typName;
+
+ /** Properties. Not written by {@link #writeExternal}. */
+ private transient Map<String, ?> props;
+
+ /** Inner function. Set by {@link #init} or {@link #readExternal}; target of all delegating calls. */
+ private PlatformAffinityFunction func;
+
+ /**
+ * Gets .NET type name.
+ *
+ * @return .NET type name.
+ */
+ public String getTypeName() {
+ return typName;
+ }
+
+ /**
+ * Sets .NET type name.
+ *
+ * @param typName .NET type name.
+ */
+ public void setTypeName(String typName) {
+ this.typName = typName;
+ }
+
+ /**
+ * Get properties.
+ *
+ * @return Properties.
+ */
+ public Map<String, ?> getProperties() {
+ return props;
+ }
+
+ /**
+ * Set properties.
+ *
+ * @param props Properties.
+ */
+ public void setProperties(Map<String, ?> props) {
+ this.props = props;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ assert func != null;
+
+ func.reset();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitions() {
+ assert func != null;
+
+ return func.partitions();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ assert func != null;
+
+ return func.partition(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+ assert func != null;
+
+ return func.assignPartitions(affCtx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeNode(UUID nodeId) {
+ assert func != null;
+
+ func.removeNode(nodeId);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the inner function only.
+ */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(func);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Restores the inner function only; the type name and properties are not assigned here.
+ */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ func = (PlatformAffinityFunction) in.readObject();
+ }
+
+ /**
+ * Initializes this instance.
+ * <p>
+ * NOTE(review): {@code PlatformDotNetConfigurationClosure} passes the result of its
+ * {@code readAffinityFunction}, which returns {@code null} for policy type 0; such a value would trip the
+ * assertion below. Confirm that the platform side never reports policy 0 for a configured function.
+ *
+ * @param func Underlying func.
+ */
+ public void init(PlatformAffinityFunction func) {
+ assert func != null;
+
+ this.func = func;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ assert func != null;
+
+ func.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ assert func != null;
+
+ func.stop();
+ }
+
+ /**
+ * Injects the Ignite.
+ * <p>
+ * The instance is forwarded to the inner function, which therefore must already be set when this
+ * method is called.
+ *
+ * @param ignite Ignite.
+ */
+ @SuppressWarnings("unused")
+ @IgniteInstanceResource
+ private void setIgnite(Ignite ignite) {
+ assert func != null;
+
+ func.setIgnite(ignite);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/cpp/common/include/ignite/common/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/include/ignite/common/java.h b/modules/platforms/cpp/common/include/ignite/common/java.h
index e629c77..91caddd 100644
--- a/modules/platforms/cpp/common/include/ignite/common/java.h
+++ b/modules/platforms/cpp/common/include/ignite/common/java.h
@@ -103,6 +103,12 @@ namespace ignite
typedef long long(JNICALL *ExtensionCallbackInLongOutLongHandler)(void* target, int typ, long long arg1);
typedef long long(JNICALL *ExtensionCallbackInLongLongOutLongHandler)(void* target, int typ, long long arg1, long long arg2);
+ typedef long long(JNICALL *AffinityFunctionInitHandler)(void* target, long long memPtr, void* baseFunc); // baseFunc: JNI global ref made by JniAffinityFunctionInit, or null.
+ typedef int(JNICALL *AffinityFunctionPartitionHandler)(void* target, long long ptr, long long memPtr); // Invoked from JniAffinityFunctionPartition.
+ typedef void(JNICALL *AffinityFunctionAssignPartitionsHandler)(void* target, long long ptr, long long inMemPtr, long long outMemPtr); // NOTE(review): the Java declaration names these two outMemPtr, inMemPtr.
+ typedef void(JNICALL *AffinityFunctionRemoveNodeHandler)(void* target, long long ptr, long long memPtr); // Invoked from JniAffinityFunctionRemoveNode.
+ typedef void(JNICALL *AffinityFunctionDestroyHandler)(void* target, long long ptr); // Invoked from JniAffinityFunctionDestroy.
+
/**
* JNI handlers holder.
*/
@@ -177,6 +183,12 @@ namespace ignite
ExtensionCallbackInLongOutLongHandler extensionCallbackInLongOutLong;
ExtensionCallbackInLongLongOutLongHandler extensionCallbackInLongLongOutLong;
+
+ AffinityFunctionInitHandler affinityFunctionInit;
+ AffinityFunctionPartitionHandler affinityFunctionPartition;
+ AffinityFunctionAssignPartitionsHandler affinityFunctionAssignPartitions;
+ AffinityFunctionRemoveNodeHandler affinityFunctionRemoveNode;
+ AffinityFunctionDestroyHandler affinityFunctionDestroy;
};
/**
@@ -683,6 +695,12 @@ namespace ignite
JNIEXPORT jlong JNICALL JniExtensionCallbackInLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1);
JNIEXPORT jlong JNICALL JniExtensionCallbackInLongLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1, jlong arg2);
+
+ JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr, jobject baseFunc); // Registered for PlatformCallbackUtils.affinityFunctionInit.
+ JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr); // Registered for PlatformCallbackUtils.affinityFunctionPartition.
+ JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr); // Registered for PlatformCallbackUtils.affinityFunctionAssignPartitions.
+ JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr); // Registered for PlatformCallbackUtils.affinityFunctionRemoveNode.
+ JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr); // Registered for PlatformCallbackUtils.affinityFunctionDestroy.
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/cpp/common/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/src/java.cpp b/modules/platforms/cpp/common/src/java.cpp
index 63deba5..789b6a3 100644
--- a/modules/platforms/cpp/common/src/java.cpp
+++ b/modules/platforms/cpp/common/src/java.cpp
@@ -346,6 +346,12 @@ namespace ignite
JniMethod M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_OUT_LONG = JniMethod("extensionCallbackInLongOutLong", "(JIJ)J", true);
JniMethod M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_LONG_OUT_LONG = JniMethod("extensionCallbackInLongLongOutLong", "(JIJJ)J", true);
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget;)J", true); // (long envPtr, long memPtr, PlatformAffinityFunctionTarget baseFunc) -> long
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION = JniMethod("affinityFunctionPartition", "(JJJ)I", true); // (long envPtr, long ptr, long memPtr) -> int
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS = JniMethod("affinityFunctionAssignPartitions", "(JJJJ)V", true); // (long envPtr, long ptr, long, long) -> void
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE = JniMethod("affinityFunctionRemoveNode", "(JJJ)V", true); // (long envPtr, long ptr, long memPtr) -> void
+ JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY = JniMethod("affinityFunctionDestroy", "(JJ)V", true); // (long envPtr, long ptr) -> void
+
const char* C_PLATFORM_UTILS = "org/apache/ignite/internal/processors/platform/utils/PlatformUtils";
JniMethod M_PLATFORM_UTILS_REALLOC = JniMethod("reallocate", "(JI)V", true);
JniMethod M_PLATFORM_UTILS_ERR_DATA = JniMethod("errorData", "(Ljava/lang/Throwable;)[B", true);
@@ -766,7 +772,7 @@ namespace ignite
void RegisterNatives(JNIEnv* env) {
{
- JNINativeMethod methods[52];
+ JNINativeMethod methods[57];
int idx = 0;
@@ -840,6 +846,12 @@ namespace ignite
AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_OUT_LONG, reinterpret_cast<void*>(JniExtensionCallbackInLongOutLong));
AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_LONG_OUT_LONG, reinterpret_cast<void*>(JniExtensionCallbackInLongLongOutLong));
+ AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT, reinterpret_cast<void*>(JniAffinityFunctionInit));
+ AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION, reinterpret_cast<void*>(JniAffinityFunctionPartition));
+ AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS, reinterpret_cast<void*>(JniAffinityFunctionAssignPartitions));
+ AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE, reinterpret_cast<void*>(JniAffinityFunctionRemoveNode));
+ AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY, reinterpret_cast<void*>(JniAffinityFunctionDestroy));
+
jint res = env->RegisterNatives(FindClass(env, C_PLATFORM_CALLBACK_UTILS), methods, idx);
if (res != JNI_OK)
@@ -2471,6 +2483,26 @@ namespace ignite
JNIEXPORT jlong JNICALL JniExtensionCallbackInLongLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1, jlong arg2) {
IGNITE_SAFE_FUNC(env, envPtr, ExtensionCallbackInLongLongOutLongHandler, extensionCallbackInLongLongOutLong, typ, arg1, arg2);
}
- }
+
+ JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr, jobject baseFunc) { // JNI entry point registered for PlatformCallbackUtils.affinityFunctionInit.
+ void* baseFuncRef = baseFunc ? env->NewGlobalRef(baseFunc) : nullptr; // baseFunc is optional; a global ref is created so the object can be passed on as a plain pointer. NOTE(review): the matching DeleteGlobalRef is not visible here - confirm who releases it.
+ IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionInitHandler, affinityFunctionInit, memPtr, baseFuncRef); // Dispatches to the affinityFunctionInit handler; macro body is defined elsewhere.
+ }
+
+ JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) { // JNI entry point registered for PlatformCallbackUtils.affinityFunctionPartition.
+ IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionPartitionHandler, affinityFunctionPartition, ptr, memPtr); // Dispatches to the affinityFunctionPartition handler with (ptr, memPtr).
+ }
+
+ JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr) { // JNI entry point registered for PlatformCallbackUtils.affinityFunctionAssignPartitions. NOTE(review): Java names these two parameters outMemPtr, inMemPtr.
+ IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionAssignPartitionsHandler, affinityFunctionAssignPartitions, ptr, inMemPtr, outMemPtr); // Dispatches to the affinityFunctionAssignPartitions handler, argument order unchanged.
+ }
+
+ JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) { // JNI entry point registered for PlatformCallbackUtils.affinityFunctionRemoveNode.
+ IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionRemoveNodeHandler, affinityFunctionRemoveNode, ptr, memPtr); // Dispatches to the affinityFunctionRemoveNode handler with (ptr, memPtr).
+ }
+
+ JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr) { // JNI entry point registered for PlatformCallbackUtils.affinityFunctionDestroy.
+ IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionDestroyHandler, affinityFunctionDestroy, ptr); // Dispatches to the affinityFunctionDestroy handler with the function pointer only.
+ } }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 0194450..89cd2a7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -60,6 +60,7 @@
<Reference Include="System.XML" />
</ItemGroup>
<ItemGroup>
+ <Compile Include="Cache\Affinity\AffinityFunctionSpringTest.cs" />
<Compile Include="Cache\CacheDynamicStartTest.cs" />
<Compile Include="Cache\CacheTestAsyncWrapper.cs" />
<Compile Include="Cache\CacheAbstractTest.cs" />
@@ -175,6 +176,12 @@
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
<SubType>Designer</SubType>
</Content>
+ <Content Include="Config\Cache\Affinity\affinity-function2.xml">
+ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+ </Content>
+ <Content Include="Config\Cache\Affinity\affinity-function.xml">
+ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+ </Content>
<Content Include="Config\Cache\Store\cache-store-session.xml">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
@@ -262,4 +269,4 @@ copy /Y $(SolutionDir)Apache.Ignite\bin\$(PlatformName)\$(ConfigurationName)\Apa
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs
new file mode 100644
index 0000000..7b317ac
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// ReSharper disable UnusedAutoPropertyAccessor.Local
+// ReSharper disable UnusedMember.Local
+namespace Apache.Ignite.Core.Tests.Cache.Affinity
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Cache.Affinity;
+ using Apache.Ignite.Core.Cache.Affinity.Fair;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Resource;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests AffinityFunction defined in Spring XML.
+ /// The two nested functions (TestFunc and TestFairFunc) map keys to partitions identically,
+ /// so a single validation routine is applied to every cache.
+ /// </summary>
+ public class AffinityFunctionSpringTest : IgniteTestBase
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AffinityFunctionSpringTest"/> class.
+ /// Passes the two affinity Spring configuration files to the base test class.
+ /// </summary>
+ public AffinityFunctionSpringTest() : base(6,
+ "config\\cache\\affinity\\affinity-function.xml",
+ "config\\cache\\affinity\\affinity-function2.xml")
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Tests the static cache.
+ /// Validates "cache1" and "cache2" as seen from both grids.
+ /// </summary>
+ [Test]
+ public void TestStaticCache()
+ {
+ ValidateAffinityFunction(Grid.GetCache<long, int>("cache1"));
+ ValidateAffinityFunction(Grid2.GetCache<long, int>("cache1"));
+ ValidateAffinityFunction(Grid.GetCache<long, int>("cache2"));
+ ValidateAffinityFunction(Grid2.GetCache<long, int>("cache2"));
+ }
+
+ /// <summary>
+ /// Tests the dynamic cache.
+ /// Each cache is created on one grid and then validated on the other one as well.
+ /// </summary>
+ [Test]
+ public void TestDynamicCache()
+ {
+ ValidateAffinityFunction(Grid.CreateCache<long, int>("dyn-cache-1"));
+ ValidateAffinityFunction(Grid2.GetCache<long, int>("dyn-cache-1"));
+
+ ValidateAffinityFunction(Grid2.CreateCache<long, int>("dyn-cache-2"));
+ ValidateAffinityFunction(Grid.GetCache<long, int>("dyn-cache-2"));
+
+ ValidateAffinityFunction(Grid.CreateCache<long, int>("dyn-cache2-1"));
+ ValidateAffinityFunction(Grid2.GetCache<long, int>("dyn-cache2-1"));
+
+ ValidateAffinityFunction(Grid2.CreateCache<long, int>("dyn-cache2-2"));
+ ValidateAffinityFunction(Grid.GetCache<long, int>("dyn-cache2-2"));
+ }
+
+ /// <summary>
+ /// Validates the affinity function.
+ /// Expected values follow from the nested test functions: 5 partitions, the predefined
+ /// key-to-partition map, and <c>key * 2 % 5</c> for every other key.
+ /// </summary>
+ /// <param name="cache">The cache.</param>
+ private static void ValidateAffinityFunction(ICache<long, int> cache)
+ {
+ var aff = cache.Ignite.GetAffinity(cache.Name);
+
+ Assert.AreEqual(5, aff.Partitions);
+
+ // Predefined map
+ Assert.AreEqual(2, aff.GetPartition(1L));
+ Assert.AreEqual(1, aff.GetPartition(2L));
+
+ // Other keys
+ Assert.AreEqual(1, aff.GetPartition(13L)); // 13 * 2 % 5 = 1
+ Assert.AreEqual(3, aff.GetPartition(4L)); // 4 * 2 % 5 = 3
+ }
+
+ private class TestFunc : IAffinityFunction // [Serializable] is not necessary
+ {
+ [InstanceResource]
+ private readonly IIgnite _ignite = null; // Never assigned in code; GetPartition asserts it is non-null, so it is presumably injected via [InstanceResource].
+
+ private int Property1 { get; set; } // GetPartition expects 1; never assigned in code, presumably set from the Spring "properties" map.
+
+ private string Property2 { get; set; } // GetPartition expects "1"; never assigned in code, presumably set from the Spring "properties" map.
+
+ public int Partitions
+ {
+ get { return 5; }
+ }
+
+ public int GetPartition(object key)
+ {
+ Assert.IsNotNull(_ignite);
+ Assert.AreEqual(1, Property1);
+ Assert.AreEqual("1", Property2);
+
+ var longKey = (long)key;
+ int res;
+
+ if (TestFairFunc.PredefinedParts.TryGetValue(longKey, out res)) // Shares the predefined map declared on TestFairFunc.
+ return res;
+
+ return (int)(longKey * 2 % 5);
+ }
+
+ // ReSharper disable once UnusedParameter.Local
+ public void RemoveNode(Guid nodeId)
+ {
+ // No-op.
+ }
+
+ public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+ {
+ return Enumerable.Range(0, Partitions).Select(x => context.CurrentTopologySnapshot); // Every partition gets the whole current topology snapshot.
+ }
+ }
+
+ private class TestFairFunc : FairAffinityFunction // [Serializable] is not necessary
+ {
+ public static readonly Dictionary<long, int> PredefinedParts = new Dictionary<long, int> // Key -> partition overrides, also used by TestFunc.
+ {
+ {1, 2},
+ {2, 1}
+ };
+
+ [InstanceResource]
+ private readonly IIgnite _ignite = null; // Never assigned in code; GetPartition asserts it is non-null, so it is presumably injected via [InstanceResource].
+
+ private int Property1 { get; set; } // GetPartition expects 1.
+
+ private string Property2 { get; set; } // GetPartition expects "1".
+
+ public override int GetPartition(object key)
+ {
+ Assert.IsNotNull(_ignite);
+ Assert.AreEqual(1, Property1);
+ Assert.AreEqual("1", Property2);
+
+ Assert.IsInstanceOf<long>(key);
+
+ var basePart = base.GetPartition(key); // Base call is only range-checked; its value is not returned.
+ Assert.Greater(basePart, -1);
+ Assert.Less(basePart, Partitions);
+
+ var longKey = (long) key;
+ int res;
+
+ if (PredefinedParts.TryGetValue(longKey, out res))
+ return res;
+
+ return (int) (longKey * 2 % 5); // Same formula as TestFunc.GetPartition.
+ }
+
+ public override IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+ {
+ var baseRes = base.AssignPartitions(context).ToList(); // test base call
+
+ Assert.AreEqual(Partitions, baseRes.Count);
+
+ return baseRes;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml
new file mode 100644
index 0000000..67ff128
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml
@@ -0,0 +1,129 @@
+\ufeff<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="localHost" value="127.0.0.1"/>
+ <property name="connectorConfiguration">
+ <null/>
+ </property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="cache1"/>
+
+ <property name="affinity">
+ <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction">
+ <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Affinity.AffinityFunctionSpringTest+TestFunc, Apache.Ignite.Core.Tests"/>
+ <property name="properties">
+ <map>
+ <entry key="Property1">
+ <value type="java.lang.Integer">1</value>
+ </entry>
+ <entry key="Property2" value="1"/>
+ </map>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="dyn-cache-*"/>
+
+ <property name="affinity">
+ <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction">
+ <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Affinity.AffinityFunctionSpringTest+TestFunc, Apache.Ignite.Core.Tests"/>
+ <property name="properties">
+ <map>
+ <entry key="Property1">
+ <value type="java.lang.Integer">1</value>
+ </entry>
+ <entry key="Property2" value="1"/>
+ </map>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="cache2"/>
+
+ <property name="affinity">
+ <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction">
+ <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Affinity.AffinityFunctionSpringTest+TestFairFunc, Apache.Ignite.Core.Tests"/>
+ <property name="properties">
+ <map>
+ <entry key="Property1">
+ <value type="java.lang.Integer">1</value>
+ </entry>
+ <entry key="Property2" value="1"/>
+ <entry key="Partitions">
+ <value type="java.lang.Integer">5</value>
+ </entry>
+ </map>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="dyn-cache2-*"/>
+
+ <property name="affinity">
+ <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction">
+ <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Affinity.AffinityFunctionSpringTest+TestFairFunc, Apache.Ignite.Core.Tests"/>
+ <property name="properties">
+ <map>
+ <entry key="Property1">
+ <value type="java.lang.Integer">1</value>
+ </entry>
+ <entry key="Property2" value="1"/>
+ <entry key="Partitions">
+ <value type="java.lang.Integer">5</value>
+ </entry>
+ </map>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </list>
+ </property>
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <!-- In distributed environment, replace with actual host IP address. -->
+ <value>127.0.0.1:47500</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ <property name="socketTimeout" value="300" />
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml
new file mode 100644
index 0000000..cab34b5
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml
@@ -0,0 +1,49 @@
+\ufeff<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="localHost" value="127.0.0.1"/>
+ <property name="connectorConfiguration"><null/></property>
+ <property name="gridName" value="grid2" />
+
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <!-- In distributed environment, replace with actual host IP address. -->
+ <value>127.0.0.1:47500</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ <property name="socketTimeout" value="300" />
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
index 2b0ab8e..95be6dc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Tests
using System;
using System.Diagnostics;
using System.Reflection;
+ using Apache.Ignite.Core.Tests.Cache.Affinity;
using Apache.Ignite.Core.Tests.Memory;
using NUnit.ConsoleRunner;
@@ -33,7 +34,7 @@ namespace Apache.Ignite.Core.Tests
//TestOne(typeof(ContinuousQueryAtomiclBackupTest), "TestInitialQuery");
- TestAll(typeof (ExecutableTest));
+ TestAll(typeof (AffinityFunctionSpringTest));
//TestAllInAssembly();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 05a7fa7..6793873 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -55,6 +55,12 @@
</ItemGroup>
<ItemGroup>
<Compile Include="Binary\Package-Info.cs" />
+ <Compile Include="Cache\Affinity\AffinityFunctionBase.cs" />
+ <Compile Include="Cache\Affinity\AffinityFunctionContext.cs" />
+ <Compile Include="Cache\Affinity\AffinityTopologyVersion.cs" />
+ <Compile Include="Cache\Affinity\Fair\FairAffinityFunction.cs" />
+ <Compile Include="Cache\Affinity\IAffinityFunction.cs" />
+ <Compile Include="Cache\Affinity\Rendezvous\RendezvousAffinityFunction.cs" />
<Compile Include="Cache\CacheAtomicUpdateTimeoutException.cs" />
<Compile Include="Cache\CacheEntryProcessorException.cs" />
<Compile Include="Cache\CacheException.cs" />
@@ -108,7 +114,10 @@
<Compile Include="Common\IgniteFutureCancelledException.cs" />
<Compile Include="Common\IgniteGuid.cs" />
<Compile Include="Common\Package-Info.cs" />
+ <Compile Include="Impl\Cache\Affinity\AffinityFunctionSerializer.cs" />
+ <Compile Include="Impl\Cache\Affinity\PlatformAffinityFunction.cs" />
<Compile Include="Impl\Cache\Event\JavaCacheEntryEventFilter.cs" />
+ <Compile Include="Impl\Common\ObjectInfoHolder.cs" />
<Compile Include="Impl\Common\PlatformJavaObjectFactoryProxy.cs" />
<Compile Include="Compute\ComputeExecutionRejectedException.cs" />
<Compile Include="Compute\ComputeJobAdapter.cs" />
@@ -414,4 +423,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs
new file mode 100644
index 0000000..ce2e5e1
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs
@@ -0,0 +1,139 @@
+\ufeff/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+ using System;
+ using System.Collections.Generic;
+ using System.ComponentModel;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+
+ /// <summary>
+ /// Base class for predefined affinity functions; every operation is delegated to a base function set via SetBaseFunction.
+ /// </summary>
+ [Serializable]
+ public abstract class AffinityFunctionBase : IAffinityFunction
+ {
+ /// <summary> The default value for <see cref="Partitions"/> property. </summary>
+ public const int DefaultPartitions = 1024;
+
+ /** Backing field for the Partitions property. */
+ private int _partitions = DefaultPartitions;
+
+ /** Function that GetPartition, RemoveNode and AssignPartitions delegate to; null until SetBaseFunction is called. */
+ private IAffinityFunction _baseFunction;
+
+
+ /// <summary>
+ /// Gets or sets the total number of partitions.
+ /// </summary>
+ [DefaultValue(DefaultPartitions)]
+ public virtual int Partitions
+ {
+ get { return _partitions; }
+ set { _partitions = value; }
+ }
+
+ /// <summary>
+ /// Gets partition number for a given key starting from 0. Partitioned caches
+ /// should make sure that keys are about evenly distributed across all partitions
+ /// from 0 to <see cref="Partitions" /> for best performance.
+ /// <para />
+ /// Note that for fully replicated caches it is possible to segment key sets among different
+ /// grid node groups. In that case each node group should return a unique partition
+ /// number. However, unlike partitioned cache, mappings of keys to nodes in
+ /// replicated caches are constant and a node cannot migrate from one partition
+ /// to another.
+ /// </summary>
+ /// <param name="key">Key to get partition for.</param>
+ /// <returns>
+ /// Partition number for a given key.
+ /// </returns>
+ public virtual int GetPartition(object key)
+ {
+ ThrowIfUninitialized();
+
+ return _baseFunction.GetPartition(key);
+ }
+
+ /// <summary>
+ /// Removes node from affinity. This method is called when it is safe to remove
+ /// disconnected node from affinity mapping.
+ /// </summary>
+ /// <param name="nodeId">The node identifier.</param>
+ public virtual void RemoveNode(Guid nodeId)
+ {
+ ThrowIfUninitialized();
+
+ _baseFunction.RemoveNode(nodeId);
+ }
+
+ /// <summary>
+ /// Gets affinity nodes for a partition. In case of replicated cache, all returned
+ /// nodes are updated in the same manner. In case of partitioned cache, the returned
+ /// list should contain only the primary and backup nodes with primary node being
+ /// always first.
+ /// <para />
+ /// Note that partitioned affinity must obey the following contract: given that node
+ /// <code>N</code> is primary for some key <code>K</code>, if any other node(s) leave
+ /// grid and no node joins grid, node <code>N</code> will remain primary for key <code>K</code>.
+ /// </summary>
+ /// <param name="context">The affinity function context.</param>
+ /// <returns>
+ /// A collection of partitions, where each partition is a collection of nodes,
+ /// where first node is a primary node, and other nodes are backup nodes.
+ /// </returns>
+ public virtual IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+ {
+ ThrowIfUninitialized();
+
+ return _baseFunction.AssignPartitions(context);
+ }
+
+ /// <summary>
+ /// Gets or sets a value indicating whether to exclude same-host-neighbors from being backups of each other.
+ /// </summary>
+ public virtual bool ExcludeNeighbors { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AffinityFunctionBase"/> class.
+ /// </summary>
+ internal AffinityFunctionBase()
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Sets the base function that GetPartition, RemoveNode and AssignPartitions delegate to.
+ /// </summary>
+ /// <param name="baseFunc">The base function.</param>
+ internal void SetBaseFunction(IAffinityFunction baseFunc)
+ {
+ _baseFunction = baseFunc;
+ }
+
+ /// <summary>
+ /// Throws an <see cref="IgniteException"/> if the base function has not been set yet.
+ /// </summary>
+ private void ThrowIfUninitialized()
+ {
+ if (_baseFunction == null)
+ throw new IgniteException(GetType() + " has not yet been initialized.");
+ }
+ }
+}