You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/02/24 02:03:45 UTC
[1/2] ignite git commit: IGNITE-1144 Add affinity functions to
collocated Ignite Queue and Set. Fixes #459.
Repository: ignite
Updated Branches:
refs/heads/master ede72006d -> eb5bce295
IGNITE-1144 Add affinity functions to collocated Ignite Queue and Set. Fixes #459.
Signed-off-by: Valentin Kulichenko <va...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc8d73ee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc8d73ee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc8d73ee
Branch: refs/heads/master
Commit: bc8d73ee51ba7e7e900f065ddfe7d5f77d23ab83
Parents: ede7200
Author: Oddo Da <od...@gmail.com>
Authored: Tue Feb 23 17:02:41 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Tue Feb 23 17:02:41 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteQueue.java | 22 +++++++-
.../main/java/org/apache/ignite/IgniteSet.java | 22 ++++++++
.../datastructures/GridCacheQueueAdapter.java | 26 +++++++++
.../datastructures/GridCacheQueueProxy.java | 56 ++++++++++++++++++++
.../datastructures/GridCacheSetImpl.java | 34 ++++++++++--
.../datastructures/GridCacheSetProxy.java | 48 +++++++++++++++++
.../GridCacheQueueApiSelfAbstractTest.java | 41 ++++++++++++++
7 files changed, 245 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
index 370916b..3b892c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
@@ -22,6 +22,9 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteCallable;
/**
* This interface provides a rich API for working with distributed queues based on In-Memory Data Grid.
@@ -34,7 +37,6 @@ import java.util.concurrent.TimeUnit;
* {@link Collection} methods in the queue may throw {@link IgniteException} in case
* of failure.
* <p>
- * All queue operations have synchronous and asynchronous counterparts.
* <h1 class="header">Bounded vs Unbounded</h1>
* Queues can be {@code unbounded} or {@code bounded}. {@code Bounded} queues can
* have maximum capacity. Queue capacity can be set at creation time and cannot be
@@ -181,4 +183,22 @@ public interface IgniteQueue<T> extends BlockingQueue<T>, Closeable {
* @return {@code true} if queue was removed from cache {@code false} otherwise.
*/
public boolean removed();
+
+ /**
+ * Executes given job on collocated queue on the node where the queue is located
+ * (a.k.a. affinity co-location).
+ *
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @throws IgniteException If job failed.
+ */
+ public void affinityRun(IgniteRunnable job) throws IgniteException;
+
+ /**
+ * Executes given job on collocated queue on the node where the queue is located
+ * (a.k.a. affinity co-location).
+ *
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @throws IgniteException If job failed.
+ */
+ public <R> R affinityCall(IgniteCallable<R> job) throws IgniteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
index 649a370..ea32186 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
@@ -22,6 +22,10 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
+import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
+
/**
* Set implementation based on on In-Memory Data Grid.
* <h1 class="header">Overview</h1>
@@ -103,4 +107,22 @@ public interface IgniteSet<T> extends Set<T>, Closeable {
* @return {@code True} if set was removed from cache {@code false} otherwise.
*/
public boolean removed();
+
+ /**
+ * Executes given job on collocated set on the node where the set is located
+ * (a.k.a. affinity co-location).
+ *
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @throws IgniteException If job failed.
+ */
+ public void affinityRun(IgniteRunnable job) throws IgniteException;
+
+ /**
+ * Executes given job on collocated set on the node where the set is located
+ * (a.k.a. affinity co-location).
+ *
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @throws IgniteException If job failed.
+ */
+ public <R> R affinityCall(IgniteCallable<R> job) throws IgniteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index ca0250d..4be7413 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -44,9 +44,13 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.IgniteCompute;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -95,6 +99,9 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
@GridToStringExclude
private final Semaphore writeSem;
+ /** access to affinityRun() and affinityCall() functions */
+ private final IgniteCompute compute;
+
/** */
private final boolean binaryMarsh;
@@ -113,6 +120,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
queueKey = new GridCacheQueueHeaderKey(queueName);
cache = cctx.kernalContext().cache().internalCache(cctx.name());
binaryMarsh = cctx.binaryMarshaller();
+ this.compute = cctx.kernalContext().grid().compute();
log = cctx.logger(getClass());
@@ -1063,4 +1071,22 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
@Override public String toString() {
return S.toString(GridCacheQueueAdapter.class, this);
}
+
+ /** {@inheritDoc} */
+ public void affinityRun(IgniteRunnable job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+
+ compute.affinityRun(cache.name(),queueKey,job);
+ }
+
+ /** {@inheritDoc} */
+ public <R> R affinityCall(IgniteCallable<R> job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+
+ return compute.affinityCall(cache.name(),queueKey,job);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
index f1fbe5b..93abbe6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
@@ -28,6 +28,8 @@ import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -36,6 +38,8 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
/**
@@ -777,4 +781,56 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
@Override public String toString() {
return delegate.toString();
}
+
+ /** {@inheritDoc} */
+ @Override public void affinityRun(final IgniteRunnable job) {
+ gate.enter();
+
+ try {
+ if (cctx.transactional()) {
+ CU.outTx(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ if (!delegate.collocated())
+ throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+ delegate.affinityRun(job);
+ return null;
+ }
+ }, cctx);
+ } else
+ delegate.affinityRun(job);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ gate.leave();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> R affinityCall(final IgniteCallable<R> job) {
+ gate.enter();
+
+ try {
+ if (cctx.transactional())
+ return CU.outTx(new Callable<R>() {
+ @Override public R call() throws Exception {
+ if (!delegate.collocated())
+ throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+ return delegate.affinityCall(job);
+ }
+ }, cctx);
+
+ return delegate.affinityCall(job);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ gate.leave();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f25e361..3f03380 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -30,9 +30,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSet;
+
+import org.apache.ignite.*;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -49,7 +48,9 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -83,12 +84,18 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
/** Queue header partition. */
private final int hdrPart;
+ /** Queue header key. */
+ protected final GridCacheSetHeaderKey setKey;
+
/** Removed flag. */
private volatile boolean rmvd;
/** */
private final boolean binaryMarsh;
+ /** access to affinityRun() and affinityCall() functions */
+ private final IgniteCompute compute;
+
/**
* @param ctx Cache context.
* @param name Set name.
@@ -101,9 +108,12 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
id = hdr.id();
collocated = hdr.collocated();
binaryMarsh = ctx.binaryMarshaller();
+ compute = ctx.kernalContext().grid().compute();
cache = ctx.cache();
+ setKey = new GridCacheSetHeaderKey(name);
+
log = ctx.logger(GridCacheSetImpl.class);
hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name));
@@ -537,6 +547,24 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
return S.toString(GridCacheSetImpl.class, this);
}
+ /** {@inheritDoc} */
+ public void affinityRun(IgniteRunnable job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityCall() for non-collocated set: " + name() +
+ ". This operation is supported only for collocated sets.");
+
+ compute.affinityRun(cache.name(),setKey,job);
+ }
+
+ /** {@inheritDoc} */
+ public <R> R affinityCall(IgniteCallable<R> job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+
+ return compute.affinityCall(cache.name(),setKey,job);
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
index 9c47e1c..d40c286 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.NotNull;
/**
@@ -588,6 +590,52 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
t.set2(U.readString(in));
}
+ /** {@inheritDoc} */
+ @Override public void affinityRun(final IgniteRunnable job) {
+ gate.enter();
+
+ try {
+ if (cctx.transactional())
+ CU.outTx(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ delegate.affinityRun(job);
+ return null;
+ }
+ }, cctx);
+ else
+ delegate.affinityRun(job);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ gate.leave();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> R affinityCall(final IgniteCallable<R> job) {
+ gate.enter();
+
+ try {
+ if (cctx.transactional())
+ return CU.outTx(new Callable<R>() {
+ @Override public R call() throws Exception {
+ return delegate.affinityCall(job);
+ }
+ }, cctx);
+
+ return delegate.affinityCall(job);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ gate.leave();
+ }
+ }
+
/**
* Reconstructs object on unmarshalling.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
index 5dea3f5..0c76595 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
@@ -27,11 +27,15 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -604,6 +608,42 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
assertNotNull(((IgniteKernal)grid(0)).internalCache(ccfg.getName()));
}
+ public void testAffinityRun() throws Exception {
+ /** Test exception on non-collocated queue */
+ final CollectionConfiguration colCfg = collectionConfiguration();
+ colCfg.setCollocated(false);
+ colCfg.setCacheMode(CacheMode.PARTITIONED);
+
+ final IgniteQueue queue = grid(0).queue("Queue1", 0, colCfg);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ queue.affinityRun(new IgniteRunnable() {
+ @Override public void run() { ; }});
+ return null;
+ }
+ }, IgniteException.class,
+ "Failed to execute affinityRun() for non-collocated queue: " + queue.name() +
+ ". This operation is supported only for collocated queues.");
+
+ queue.close();
+
+ /** Test running a job on a collocated queue */
+ colCfg.setCollocated(true);
+ final IgniteQueue queue2 = grid(0).queue("Queue2", 0, colCfg);
+
+ /** add a number to the queue */
+ queue2.add(100);
+
+ /** read the number back using affinityRun() */
+ queue2.affinityRun(new IgniteRunnable() {
+ @Override public void run() {
+ assert((int)queue2.take() == 100);
+ }
+ });
+ queue2.close();
+ }
+
/**
* Test class with the same hash code.
*/
@@ -642,4 +682,5 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
return S.toString(SameHashItem.class, this);
}
}
+
}
\ No newline at end of file
[2/2] ignite git commit: IGNITE-1144 Add affinity functions to
collocated Ignite Queue and Set. Fixes #459.
Posted by vk...@apache.org.
IGNITE-1144 Add affinity functions to collocated Ignite Queue and Set. Fixes #459.
Signed-off-by: Valentin Kulichenko <va...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eb5bce29
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eb5bce29
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eb5bce29
Branch: refs/heads/master
Commit: eb5bce2952ec05d4bd49a99fa56d1e979d4436ce
Parents: bc8d73e
Author: Oddo Da <od...@gmail.com>
Authored: Tue Feb 23 17:03:16 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Tue Feb 23 17:03:16 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteQueue.java | 13 ++-
.../main/java/org/apache/ignite/IgniteSet.java | 12 +-
.../datastructures/GridCacheQueueAdapter.java | 45 ++++----
.../datastructures/GridCacheQueueProxy.java | 66 ++---------
.../datastructures/GridCacheSetImpl.java | 44 ++++----
.../datastructures/GridCacheSetProxy.java | 58 ++--------
.../GridCacheQueueApiSelfAbstractTest.java | 111 ++++++++++++++-----
.../GridCacheSetAbstractSelfTest.java | 103 ++++++++++++++++-
8 files changed, 269 insertions(+), 183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
index 3b892c2..2dc38e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
@@ -22,9 +22,8 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.ignite.lang.IgniteAsyncSupported;
-import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
/**
* This interface provides a rich API for working with distributed queues based on In-Memory Data Grid.
@@ -187,8 +186,10 @@ public interface IgniteQueue<T> extends BlockingQueue<T>, Closeable {
/**
* Executes given job on collocated queue on the node where the queue is located
* (a.k.a. affinity co-location).
+ * <p>
+ * This is not supported for non-collocated queues.
*
- * @param job Job which will be co-located on the node with given affinity key.
+ * @param job Job which will be co-located with the queue.
* @throws IgniteException If job failed.
*/
public void affinityRun(IgniteRunnable job) throws IgniteException;
@@ -196,9 +197,11 @@ public interface IgniteQueue<T> extends BlockingQueue<T>, Closeable {
/**
* Executes given job on collocated queue on the node where the queue is located
* (a.k.a. affinity co-location).
+ * <p>
+ * This is not supported for non-collocated queues.
*
- * @param job Job which will be co-located on the node with given affinity key.
+ * @param job Job which will be co-located with the queue.
* @throws IgniteException If job failed.
*/
public <R> R affinityCall(IgniteCallable<R> job) throws IgniteException;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
index ea32186..d72d9fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
@@ -21,8 +21,6 @@ import java.io.Closeable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
-
-import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteRunnable;
@@ -111,8 +109,10 @@ public interface IgniteSet<T> extends Set<T>, Closeable {
/**
* Executes given job on collocated set on the node where the set is located
* (a.k.a. affinity co-location).
+ * <p>
+ * This is not supported for non-collocated sets.
*
- * @param job Job which will be co-located on the node with given affinity key.
+ * @param job Job which will be co-located with the set.
* @throws IgniteException If job failed.
*/
public void affinityRun(IgniteRunnable job) throws IgniteException;
@@ -120,9 +120,11 @@ public interface IgniteSet<T> extends Set<T>, Closeable {
/**
* Executes given job on collocated set on the node where the set is located
* (a.k.a. affinity co-location).
+ * <p>
+ * This is not supported for non-collocated sets.
*
- * @param job Job which will be co-located on the node with given affinity key.
+ * @param job Job which will be co-located with the set.
* @throws IgniteException If job failed.
*/
public <R> R affinityCall(IgniteCallable<R> job) throws IgniteException;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 4be7413..caf3ba3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
@@ -44,13 +45,11 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.lang.IgniteCallable;
-import org.apache.ignite.IgniteCompute;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -99,7 +98,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
@GridToStringExclude
private final Semaphore writeSem;
- /** access to affinityRun() and affinityCall() functions */
+ /** Access to affinityRun() and affinityCall() functions. */
private final IgniteCompute compute;
/** */
@@ -411,6 +410,24 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
return rmvd;
}
+ /** {@inheritDoc} */
+ public void affinityRun(IgniteRunnable job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+
+ compute.affinityRun(cache.name(), queueKey, job);
+ }
+
+ /** {@inheritDoc} */
+ public <R> R affinityCall(IgniteCallable<R> job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+
+ return compute.affinityCall(cache.name(), queueKey, job);
+ }
+
/**
* @param cache Queue cache.
* @param id Queue unique ID.
@@ -1071,22 +1088,4 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
@Override public String toString() {
return S.toString(GridCacheQueueAdapter.class, this);
}
-
- /** {@inheritDoc} */
- public void affinityRun(IgniteRunnable job) {
- if (!collocated)
- throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() +
- ". This operation is supported only for collocated queues.");
-
- compute.affinityRun(cache.name(),queueKey,job);
- }
-
- /** {@inheritDoc} */
- public <R> R affinityCall(IgniteCallable<R> job) {
- if (!collocated)
- throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
- ". This operation is supported only for collocated queues.");
-
- return compute.affinityCall(cache.name(),queueKey,job);
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
index 93abbe6..c869743 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
@@ -28,8 +28,6 @@ import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -726,6 +724,16 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
}
/** {@inheritDoc} */
+ @Override public void affinityRun(final IgniteRunnable job) {
+ delegate.affinityRun(job);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> R affinityCall(final IgniteCallable<R> job) {
+ return delegate.affinityCall(job);
+ }
+
+ /** {@inheritDoc} */
@Override public int hashCode() {
return delegate.hashCode();
}
@@ -781,56 +789,4 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
@Override public String toString() {
return delegate.toString();
}
-
- /** {@inheritDoc} */
- @Override public void affinityRun(final IgniteRunnable job) {
- gate.enter();
-
- try {
- if (cctx.transactional()) {
- CU.outTx(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- if (!delegate.collocated())
- throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() +
- ". This operation is supported only for collocated queues.");
- delegate.affinityRun(job);
- return null;
- }
- }, cctx);
- } else
- delegate.affinityRun(job);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- gate.leave();
- }
- }
-
- /** {@inheritDoc} */
- @Override public <R> R affinityCall(final IgniteCallable<R> job) {
- gate.enter();
-
- try {
- if (cctx.transactional())
- return CU.outTx(new Callable<R>() {
- @Override public R call() throws Exception {
- if (!delegate.collocated())
- throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
- ". This operation is supported only for collocated queues.");
- return delegate.affinityCall(job);
- }
- }, cctx);
-
- return delegate.affinityCall(job);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- gate.leave();
- }
- }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 3f03380..5b74992 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -81,10 +81,10 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
/** Collocation flag. */
private final boolean collocated;
- /** Queue header partition. */
+ /** Set header partition. */
private final int hdrPart;
- /** Queue header key. */
+ /** Set header key. */
protected final GridCacheSetHeaderKey setKey;
/** Removed flag. */
@@ -93,7 +93,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
/** */
private final boolean binaryMarsh;
- /** access to affinityRun() and affinityCall() functions */
+ /** Access to affinityRun() and affinityCall() functions. */
private final IgniteCompute compute;
/**
@@ -374,6 +374,24 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
}
/** {@inheritDoc} */
+ public void affinityRun(IgniteRunnable job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityRun() for non-collocated set: " + name() +
+ ". This operation is supported only for collocated sets.");
+
+ compute.affinityRun(cache.name(), setKey, job);
+ }
+
+ /** {@inheritDoc} */
+ public <R> R affinityCall(IgniteCallable<R> job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityCall() for non-collocated set: " + name() +
+ ". This operation is supported only for collocated sets.");
+
+ return compute.affinityCall(cache.name(), setKey, job);
+ }
+
+ /** {@inheritDoc} */
@Override public void close() {
try {
if (rmvd)
@@ -547,24 +565,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
return S.toString(GridCacheSetImpl.class, this);
}
- /** {@inheritDoc} */
- public void affinityRun(IgniteRunnable job) {
- if (!collocated)
- throw new IgniteException("Failed to execute affinityCall() for non-collocated set: " + name() +
- ". This operation is supported only for collocated sets.");
-
- compute.affinityRun(cache.name(),setKey,job);
- }
-
- /** {@inheritDoc} */
- public <R> R affinityCall(IgniteCallable<R> job) {
- if (!collocated)
- throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
- ". This operation is supported only for collocated queues.");
-
- return compute.affinityCall(cache.name(),setKey,job);
- }
-
/**
*
*/
@@ -655,4 +655,4 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
setName = U.readString(in);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
index d40c286..219bb4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
@@ -526,6 +526,16 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
return delegate.removed();
}
+ /** {@inheritDoc} */
+ @Override public void affinityRun(final IgniteRunnable job) {
+ delegate.affinityRun(job);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> R affinityCall(final IgniteCallable<R> job) {
+ return delegate.affinityCall(job);
+ }
+
/**
* Enters busy state.
*/
@@ -590,52 +600,6 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
t.set2(U.readString(in));
}
- /** {@inheritDoc} */
- @Override public void affinityRun(final IgniteRunnable job) {
- gate.enter();
-
- try {
- if (cctx.transactional())
- CU.outTx(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- delegate.affinityRun(job);
- return null;
- }
- }, cctx);
- else
- delegate.affinityRun(job);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- gate.leave();
- }
- }
-
- /** {@inheritDoc} */
- @Override public <R> R affinityCall(final IgniteCallable<R> job) {
- gate.enter();
-
- try {
- if (cctx.transactional())
- return CU.outTx(new Callable<R>() {
- @Override public R call() throws Exception {
- return delegate.affinityCall(job);
- }
- }, cctx);
-
- return delegate.affinityCall(job);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- gate.leave();
- }
- }
-
/**
* Reconstructs object on unmarshalling.
*
@@ -660,4 +624,4 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
@Override public String toString() {
return delegate.toString();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
index 0c76595..f9499a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
@@ -28,14 +28,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -608,40 +609,100 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
assertNotNull(((IgniteKernal)grid(0)).internalCache(ccfg.getName()));
}
+ /**
+ * @throws Exception If failed.
+ */
public void testAffinityRun() throws Exception {
- /** Test exception on non-collocated queue */
final CollectionConfiguration colCfg = collectionConfiguration();
+
colCfg.setCollocated(false);
colCfg.setCacheMode(CacheMode.PARTITIONED);
- final IgniteQueue queue = grid(0).queue("Queue1", 0, colCfg);
+ try (final IgniteQueue<Integer> queue1 = grid(0).queue("Queue1", 0, colCfg)) {
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ queue1.affinityRun(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+
+ return null;
+ }
+ },
+ IgniteException.class,
+ "Failed to execute affinityRun() for non-collocated queue: " + queue1.name() +
+ ". This operation is supported only for collocated queues.");
+ }
- GridTestUtils.assertThrows(log, new Callable<Void>() {
- @Override public Void call() throws Exception {
- queue.affinityRun(new IgniteRunnable() {
- @Override public void run() { ; }});
- return null;
- }
- }, IgniteException.class,
- "Failed to execute affinityRun() for non-collocated queue: " + queue.name() +
- ". This operation is supported only for collocated queues.");
+ colCfg.setCollocated(true);
+
+ try (final IgniteQueue<Integer> queue2 = grid(0).queue("Queue2", 0, colCfg)) {
+ queue2.add(100);
+
+ queue2.affinityRun(new IgniteRunnable() {
+ @IgniteInstanceResource
+ private IgniteEx ignite;
+
+ @Override public void run() {
+ assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup(
+ ignite.cluster().localNode(), "Queue2"));
+
+ assertEquals(100, queue2.take().intValue());
+ }
+ });
+ }
+ }
- queue.close();
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinityCall() throws Exception {
+ final CollectionConfiguration colCfg = collectionConfiguration();
+
+ colCfg.setCollocated(false);
+ colCfg.setCacheMode(CacheMode.PARTITIONED);
+
+ try (final IgniteQueue<Integer> queue1 = grid(0).queue("Queue1", 0, colCfg)) {
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ queue1.affinityCall(new IgniteCallable<Object>() {
+ @Override public Object call() {
+ return null;
+ }
+ });
+
+ return null;
+ }
+ },
+ IgniteException.class,
+ "Failed to execute affinityCall() for non-collocated queue: " + queue1.name() +
+ ". This operation is supported only for collocated queues.");
+ }
- /** Test running a job on a collocated queue */
colCfg.setCollocated(true);
- final IgniteQueue queue2 = grid(0).queue("Queue2", 0, colCfg);
- /** add a number to the queue */
- queue2.add(100);
+ try (final IgniteQueue<Integer> queue2 = grid(0).queue("Queue2", 0, colCfg)) {
+ queue2.add(100);
- /** read the number back using affinityRun() */
- queue2.affinityRun(new IgniteRunnable() {
- @Override public void run() {
- assert((int)queue2.take() == 100);
- }
- });
- queue2.close();
+ Integer res = queue2.affinityCall(new IgniteCallable<Integer>() {
+ @IgniteInstanceResource
+ private IgniteEx ignite;
+
+ @Override public Integer call() {
+ assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup(
+ ignite.cluster().localNode(), "Queue2"));
+
+ return queue2.take();
+ }
+ });
+
+ assertEquals(100, res.intValue());
+ }
}
/**
@@ -683,4 +744,4 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
index 7c0cc30..c63df40 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
@@ -30,8 +30,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.AssertionFailedError;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSet;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -40,6 +43,8 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.cache.CacheMode.LOCAL;
@@ -846,6 +851,102 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr
}
/**
+ * @throws Exception If failed.
+ */
+ public void testAffinityRun() throws Exception {
+ final CollectionConfiguration colCfg = collectionConfiguration();
+
+ colCfg.setCollocated(false);
+ colCfg.setCacheMode(CacheMode.PARTITIONED);
+
+ try (final IgniteSet<Integer> set1 = grid(0).set("Set1", colCfg)) {
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ set1.affinityRun(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+
+ return null;
+ }
+ },
+ IgniteException.class,
+ "Failed to execute affinityRun() for non-collocated set: " + set1.name() +
+ ". This operation is supported only for collocated sets.");
+ }
+
+ colCfg.setCollocated(true);
+
+ try (final IgniteSet<Integer> set2 = grid(0).set("Set2", colCfg)) {
+ set2.add(100);
+
+ set2.affinityRun(new IgniteRunnable() {
+ @IgniteInstanceResource
+ private IgniteEx ignite;
+
+ @Override public void run() {
+ assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup(
+ ignite.cluster().localNode(), "Set2"));
+
+ assertEquals(100, set2.iterator().next().intValue());
+ }
+ });
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinityCall() throws Exception {
+ final CollectionConfiguration colCfg = collectionConfiguration();
+
+ colCfg.setCollocated(false);
+ colCfg.setCacheMode(CacheMode.PARTITIONED);
+
+ try (final IgniteSet<Integer> set1 = grid(0).set("Set1", colCfg)) {
+ GridTestUtils.assertThrows(
+ log,
+ new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ set1.affinityCall(new IgniteCallable<Object>() {
+ @Override public Object call() {
+ return null;
+ }
+ });
+
+ return null;
+ }
+ },
+ IgniteException.class,
+ "Failed to execute affinityCall() for non-collocated set: " + set1.name() +
+ ". This operation is supported only for collocated sets.");
+ }
+
+ colCfg.setCollocated(true);
+
+ try (final IgniteSet<Integer> set2 = grid(0).set("Set2", colCfg)) {
+ set2.add(100);
+
+ Integer res = set2.affinityCall(new IgniteCallable<Integer>() {
+ @IgniteInstanceResource
+ private IgniteEx ignite;
+
+ @Override public Integer call() {
+ assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup(
+ ignite.cluster().localNode(), "Set2"));
+
+ return set2.iterator().next();
+ }
+ });
+
+ assertEquals(100, res.intValue());
+ }
+ }
+
+ /**
* @param set Set.
* @param size Expected size.
*/
@@ -882,4 +983,4 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr
assertTrue(found);
}
}
-}
\ No newline at end of file
+}