You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/26 14:16:33 UTC

[15/35] ignite git commit: IGNITE-1144 Add affinity functions to collocated Ignite Queue and Set. Fixes #459.

IGNITE-1144 Add affinity functions to collocated Ignite Queue and Set. Fixes #459.

Signed-off-by: Valentin Kulichenko <va...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eb5bce29
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eb5bce29
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eb5bce29

Branch: refs/heads/ignite-2407
Commit: eb5bce2952ec05d4bd49a99fa56d1e979d4436ce
Parents: bc8d73e
Author: Oddo Da <od...@gmail.com>
Authored: Tue Feb 23 17:03:16 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Tue Feb 23 17:03:16 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteQueue.java     |  13 ++-
 .../main/java/org/apache/ignite/IgniteSet.java  |  12 +-
 .../datastructures/GridCacheQueueAdapter.java   |  45 ++++----
 .../datastructures/GridCacheQueueProxy.java     |  66 ++---------
 .../datastructures/GridCacheSetImpl.java        |  44 ++++----
 .../datastructures/GridCacheSetProxy.java       |  58 ++--------
 .../GridCacheQueueApiSelfAbstractTest.java      | 111 ++++++++++++++-----
 .../GridCacheSetAbstractSelfTest.java           | 103 ++++++++++++++++-
 8 files changed, 269 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
index 3b892c2..2dc38e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
@@ -22,9 +22,8 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
-import org.apache.ignite.lang.IgniteAsyncSupported;
-import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
 
 /**
  * This interface provides a rich API for working with distributed queues based on In-Memory Data Grid.
@@ -187,8 +186,10 @@ public interface IgniteQueue<T> extends BlockingQueue<T>, Closeable {
     /**
      * Executes given job on collocated queue on the node where the queue is located
      * (a.k.a. affinity co-location).
+     * <p>
+     * This is not supported for non-collocated queues.
      *
-     * @param job Job which will be co-located on the node with given affinity key.
+     * @param job Job which will be co-located with the queue.
      * @throws IgniteException If job failed.
      */
     public void affinityRun(IgniteRunnable job) throws IgniteException;
@@ -196,9 +197,11 @@ public interface IgniteQueue<T> extends BlockingQueue<T>, Closeable {
     /**
      * Executes given job on collocated queue on the node where the queue is located
      * (a.k.a. affinity co-location).
+     * <p>
+     * This is not supported for non-collocated queues.
      *
-     * @param job Job which will be co-located on the node with given affinity key.
+     * @param job Job which will be co-located with the queue.
      * @throws IgniteException If job failed.
      */
     public <R> R affinityCall(IgniteCallable<R> job) throws IgniteException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
index ea32186..d72d9fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
@@ -21,8 +21,6 @@ import java.io.Closeable;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Set;
-
-import org.apache.ignite.lang.IgniteAsyncSupported;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteRunnable;
 
@@ -111,8 +109,10 @@ public interface IgniteSet<T> extends Set<T>, Closeable {
     /**
      * Executes given job on collocated set on the node where the set is located
      * (a.k.a. affinity co-location).
+     * <p>
+     * This is not supported for non-collocated sets.
      *
-     * @param job Job which will be co-located on the node with given affinity key.
+     * @param job Job which will be co-located with the set.
      * @throws IgniteException If job failed.
      */
     public void affinityRun(IgniteRunnable job) throws IgniteException;
@@ -120,9 +120,11 @@ public interface IgniteSet<T> extends Set<T>, Closeable {
     /**
      * Executes given job on collocated set on the node where the set is located
      * (a.k.a. affinity co-location).
+     * <p>
+     * This is not supported for non-collocated sets.
      *
-     * @param job Job which will be co-located on the node with given affinity key.
+     * @param job Job which will be co-located with the set.
      * @throws IgniteException If job failed.
      */
     public <R> R affinityCall(IgniteCallable<R> job) throws IgniteException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 4be7413..caf3ba3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
@@ -44,13 +45,11 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteAsyncSupported;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
-import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.lang.IgniteCallable;
-import org.apache.ignite.IgniteCompute;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
@@ -99,7 +98,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
     @GridToStringExclude
     private final Semaphore writeSem;
 
-    /** access to affinityRun() and affinityCall() functions */
+    /** Access to affinityRun() and affinityCall() functions. */
     private final IgniteCompute compute;
 
     /** */
@@ -411,6 +410,24 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
         return rmvd;
     }
 
+    /** {@inheritDoc} */
+    @Override public void affinityRun(IgniteRunnable job) {
+        if (!collocated)
+            throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() +
+                ". This operation is supported only for collocated queues.");
+
+        compute.affinityRun(cache.name(), queueKey, job);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R affinityCall(IgniteCallable<R> job) {
+        if (!collocated)
+            throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
+                ". This operation is supported only for collocated queues.");
+
+        return compute.affinityCall(cache.name(), queueKey, job);
+    }
+
     /**
      * @param cache Queue cache.
      * @param id Queue unique ID.
@@ -1071,22 +1088,4 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
     @Override public String toString() {
         return S.toString(GridCacheQueueAdapter.class, this);
     }
-
-    /** {@inheritDoc} */
-    public void affinityRun(IgniteRunnable job) {
-        if (!collocated)
-            throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() +
-                                      ". This operation is supported only for collocated queues.");
-
-        compute.affinityRun(cache.name(),queueKey,job);
-    }
-
-    /** {@inheritDoc} */
-    public <R> R affinityCall(IgniteCallable<R> job) {
-        if (!collocated)
-            throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
-                                      ". This operation is supported only for collocated queues.");
-
-        return compute.affinityCall(cache.name(),queueKey,job);
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
index 93abbe6..c869743 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
@@ -28,8 +28,6 @@ import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteCompute;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -726,6 +724,16 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public void affinityRun(final IgniteRunnable job) {
+        delegate.affinityRun(job);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R affinityCall(final IgniteCallable<R> job) {
+        return delegate.affinityCall(job);
+    }
+
+    /** {@inheritDoc} */
     @Override public int hashCode() {
         return delegate.hashCode();
     }
@@ -781,56 +789,4 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
     @Override public String toString() {
         return delegate.toString();
     }
-
-    /** {@inheritDoc} */
-    @Override public void affinityRun(final IgniteRunnable job) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional()) {
-                CU.outTx(new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        if (!delegate.collocated())
-                            throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() +
-                                    ". This operation is supported only for collocated queues.");
-                        delegate.affinityRun(job);
-                        return null;
-                    }
-                }, cctx);
-            } else
-                delegate.affinityRun(job);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R> R affinityCall(final IgniteCallable<R> job) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<R>() {
-                    @Override public R call() throws Exception {
-                        if (!delegate.collocated())
-                            throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
-                                    ". This operation is supported only for collocated queues.");
-                        return delegate.affinityCall(job);
-                    }
-                }, cctx);
-
-            return delegate.affinityCall(job);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 3f03380..5b74992 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -81,10 +81,10 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     /** Collocation flag. */
     private final boolean collocated;
 
-    /** Queue header partition. */
+    /** Set header partition. */
     private final int hdrPart;
 
-    /** Queue header key. */
+    /** Set header key. */
     protected final GridCacheSetHeaderKey setKey;
 
     /** Removed flag. */
@@ -93,7 +93,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     /** */
     private final boolean binaryMarsh;
 
-    /** access to affinityRun() and affinityCall() functions */
+    /** Access to affinityRun() and affinityCall() functions. */
     private final IgniteCompute compute;
 
     /**
@@ -374,6 +374,24 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     }
 
     /** {@inheritDoc} */
+    @Override public void affinityRun(IgniteRunnable job) {
+        if (!collocated)
+            throw new IgniteException("Failed to execute affinityRun() for non-collocated set: " + name() +
+                ". This operation is supported only for collocated sets.");
+
+        compute.affinityRun(cache.name(), setKey, job);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R affinityCall(IgniteCallable<R> job) {
+        if (!collocated)
+            throw new IgniteException("Failed to execute affinityCall() for non-collocated set: " + name() +
+                ". This operation is supported only for collocated sets.");
+
+        return compute.affinityCall(cache.name(), setKey, job);
+    }
+
+    /** {@inheritDoc} */
     @Override public void close() {
         try {
             if (rmvd)
@@ -547,24 +565,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
         return S.toString(GridCacheSetImpl.class, this);
     }
 
-    /** {@inheritDoc} */
-    public void affinityRun(IgniteRunnable job) {
-        if (!collocated)
-            throw new IgniteException("Failed to execute affinityCall() for non-collocated set: " + name() +
-                                      ". This operation is supported only for collocated sets.");
-
-        compute.affinityRun(cache.name(),setKey,job);
-    }
-
-    /** {@inheritDoc} */
-    public <R> R affinityCall(IgniteCallable<R> job) {
-        if (!collocated)
-            throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
-                                      ". This operation is supported only for collocated queues.");
-
-        return compute.affinityCall(cache.name(),setKey,job);
-    }
-
     /**
      *
      */
@@ -655,4 +655,4 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
             setName = U.readString(in);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
index d40c286..219bb4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
@@ -526,6 +526,16 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
         return delegate.removed();
     }
 
+    /** {@inheritDoc} */
+    @Override public void affinityRun(final IgniteRunnable job) {
+        delegate.affinityRun(job);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R affinityCall(final IgniteCallable<R> job) {
+        return delegate.affinityCall(job);
+    }
+
     /**
      * Enters busy state.
      */
@@ -590,52 +600,6 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
         t.set2(U.readString(in));
     }
 
-    /** {@inheritDoc} */
-    @Override public void affinityRun(final IgniteRunnable job) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                CU.outTx(new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        delegate.affinityRun(job);
-                        return null;
-                    }
-                }, cctx);
-            else
-              delegate.affinityRun(job);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R> R affinityCall(final IgniteCallable<R> job) {
-        gate.enter();
-
-        try {
-            if (cctx.transactional())
-                return CU.outTx(new Callable<R>() {
-                    @Override public R call() throws Exception {
-                        return delegate.affinityCall(job);
-                    }
-                }, cctx);
-
-            return delegate.affinityCall(job);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-        finally {
-            gate.leave();
-        }
-    }
-
     /**
      * Reconstructs object on unmarshalling.
      *
@@ -660,4 +624,4 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
     @Override public String toString() {
         return delegate.toString();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
index 0c76595..f9499a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
@@ -28,14 +28,15 @@ import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.testframework.GridTestUtils;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -608,40 +609,100 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         assertNotNull(((IgniteKernal)grid(0)).internalCache(ccfg.getName()));
     }
 
+    /**
+     * @throws Exception If failed.
+     */
     public void testAffinityRun() throws Exception {
-        /** Test exception on non-collocated queue */
         final CollectionConfiguration colCfg = collectionConfiguration();
+
         colCfg.setCollocated(false);
         colCfg.setCacheMode(CacheMode.PARTITIONED);
 
-        final IgniteQueue queue = grid(0).queue("Queue1", 0, colCfg);
+        try (final IgniteQueue<Integer> queue1 = grid(0).queue("Queue1", 0, colCfg)) {
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        queue1.affinityRun(new IgniteRunnable() {
+                            @Override public void run() {
+                                // No-op.
+                            }
+                        });
+
+                        return null;
+                    }
+                },
+                IgniteException.class,
+                "Failed to execute affinityRun() for non-collocated queue: " + queue1.name() +
+                    ". This operation is supported only for collocated queues.");
+        }
 
-        GridTestUtils.assertThrows(log, new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                queue.affinityRun(new IgniteRunnable() {
-                    @Override public void run() { ; }});
-                return null;
-            }
-        }, IgniteException.class,
-           "Failed to execute affinityRun() for non-collocated queue: " + queue.name() +
-           ". This operation is supported only for collocated queues.");
+        colCfg.setCollocated(true);
+
+        try (final IgniteQueue<Integer> queue2 = grid(0).queue("Queue2", 0, colCfg)) {
+            queue2.add(100);
+
+            queue2.affinityRun(new IgniteRunnable() {
+                @IgniteInstanceResource
+                private IgniteEx ignite;
+
+                @Override public void run() {
+                    assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup(
+                        ignite.cluster().localNode(), "Queue2"));
+
+                    assertEquals(100, queue2.take().intValue());
+                }
+            });
+        }
+    }
 
-        queue.close();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAffinityCall() throws Exception {
+        final CollectionConfiguration colCfg = collectionConfiguration();
+
+        colCfg.setCollocated(false);
+        colCfg.setCacheMode(CacheMode.PARTITIONED);
+
+        try (final IgniteQueue<Integer> queue1 = grid(0).queue("Queue1", 0, colCfg)) {
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        queue1.affinityCall(new IgniteCallable<Object>() {
+                            @Override public Object call() {
+                                return null;
+                            }
+                        });
+
+                        return null;
+                    }
+                },
+                IgniteException.class,
+                "Failed to execute affinityCall() for non-collocated queue: " + queue1.name() +
+                    ". This operation is supported only for collocated queues.");
+        }
 
-        /** Test running a job on a collocated queue */
         colCfg.setCollocated(true);
-        final IgniteQueue queue2 = grid(0).queue("Queue2", 0, colCfg);
 
-        /** add a number to the queue */
-        queue2.add(100);
+        try (final IgniteQueue<Integer> queue2 = grid(0).queue("Queue2", 0, colCfg)) {
+            queue2.add(100);
 
-        /** read the number back using affinityRun() */
-        queue2.affinityRun(new IgniteRunnable() {
-            @Override public void run() {
-                assert((int)queue2.take() == 100);
-            }
-        });
-        queue2.close();
+            Integer res = queue2.affinityCall(new IgniteCallable<Integer>() {
+                @IgniteInstanceResource
+                private IgniteEx ignite;
+
+                @Override public Integer call() {
+                    assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup(
+                        ignite.cluster().localNode(), "Queue2"));
+
+                    return queue2.take();
+                }
+            });
+
+            assertEquals(100, res.intValue());
+        }
     }
 
     /**
@@ -683,4 +744,4 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
index 7c0cc30..c63df40 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
@@ -30,8 +30,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import junit.framework.AssertionFailedError;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteSet;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -40,6 +43,8 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.testframework.GridTestUtils;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
@@ -846,6 +851,102 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testAffinityRun() throws Exception {
+        final CollectionConfiguration colCfg = collectionConfiguration();
+
+        colCfg.setCollocated(false);
+        colCfg.setCacheMode(CacheMode.PARTITIONED);
+
+        try (final IgniteSet<Integer> set1 = grid(0).set("Set1", colCfg)) {
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        set1.affinityRun(new IgniteRunnable() {
+                            @Override public void run() {
+                                // No-op.
+                            }
+                        });
+
+                        return null;
+                    }
+                },
+                IgniteException.class,
+                "Failed to execute affinityRun() for non-collocated set: " + set1.name() +
+                    ". This operation is supported only for collocated sets.");
+        }
+
+        colCfg.setCollocated(true);
+
+        try (final IgniteSet<Integer> set2 = grid(0).set("Set2", colCfg)) {
+            set2.add(100);
+
+            set2.affinityRun(new IgniteRunnable() {
+                @IgniteInstanceResource
+                private IgniteEx ignite;
+
+                @Override public void run() {
+                    assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup(
+                        ignite.cluster().localNode(), "Set2"));
+
+                    assertEquals(100, set2.iterator().next().intValue());
+                }
+            });
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAffinityCall() throws Exception {
+        final CollectionConfiguration colCfg = collectionConfiguration();
+
+        colCfg.setCollocated(false);
+        colCfg.setCacheMode(CacheMode.PARTITIONED);
+
+        try (final IgniteSet<Integer> set1 = grid(0).set("Set1", colCfg)) {
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        set1.affinityCall(new IgniteCallable<Object>() {
+                            @Override public Object call() {
+                                return null;
+                            }
+                        });
+
+                        return null;
+                    }
+                },
+                IgniteException.class,
+                "Failed to execute affinityCall() for non-collocated set: " + set1.name() +
+                    ". This operation is supported only for collocated sets.");
+        }
+
+        colCfg.setCollocated(true);
+
+        try (final IgniteSet<Integer> set2 = grid(0).set("Set2", colCfg)) {
+            set2.add(100);
+
+            Integer res = set2.affinityCall(new IgniteCallable<Integer>() {
+                @IgniteInstanceResource
+                private IgniteEx ignite;
+
+                @Override public Integer call() {
+                    assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup(
+                        ignite.cluster().localNode(), "Set2"));
+
+                    return set2.iterator().next();
+                }
+            });
+
+            assertEquals(100, res.intValue());
+        }
+    }
+
+    /**
      * @param set Set.
      * @param size Expected size.
      */
@@ -882,4 +983,4 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr
             assertTrue(found);
         }
     }
-}
\ No newline at end of file
+}