You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/26 14:16:32 UTC

[14/35] ignite git commit: IGNITE-1144 Add affinity functions to collocated Ignite Queue and Set. Fixes #459.

IGNITE-1144 Add affinity functions to collocated Ignite Queue and Set. Fixes #459.

Signed-off-by: Valentin Kulichenko <va...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc8d73ee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc8d73ee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc8d73ee

Branch: refs/heads/ignite-2407
Commit: bc8d73ee51ba7e7e900f065ddfe7d5f77d23ab83
Parents: ede7200
Author: Oddo Da <od...@gmail.com>
Authored: Tue Feb 23 17:02:41 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Tue Feb 23 17:02:41 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteQueue.java     | 22 +++++++-
 .../main/java/org/apache/ignite/IgniteSet.java  | 22 ++++++++
 .../datastructures/GridCacheQueueAdapter.java   | 26 +++++++++
 .../datastructures/GridCacheQueueProxy.java     | 56 ++++++++++++++++++++
 .../datastructures/GridCacheSetImpl.java        | 34 ++++++++++--
 .../datastructures/GridCacheSetProxy.java       | 48 +++++++++++++++++
 .../GridCacheQueueApiSelfAbstractTest.java      | 41 ++++++++++++++
 7 files changed, 245 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
index 370916b..3b892c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
@@ -22,6 +22,9 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteCallable;
 
 /**
  * This interface provides a rich API for working with distributed queues based on In-Memory Data Grid.
@@ -34,7 +37,6 @@ import java.util.concurrent.TimeUnit;
  * {@link Collection} methods in the queue may throw {@link IgniteException} in case
  * of failure.
  * <p>
- * All queue operations have synchronous and asynchronous counterparts.
  * <h1 class="header">Bounded vs Unbounded</h1>
  * Queues can be {@code unbounded} or {@code bounded}. {@code Bounded} queues can
  * have maximum capacity. Queue capacity can be set at creation time and cannot be
@@ -181,4 +183,22 @@ public interface IgniteQueue<T> extends BlockingQueue<T>, Closeable {
      * @return {@code true} if queue was removed from cache {@code false} otherwise.
      */
     public boolean removed();
+
+    /**
+     * Executes given job on the node where this collocated queue is located
+     * (a.k.a. affinity co-location).
+     *
+     * @param job Job which will be executed on the node where the queue is located.
+     * @throws IgniteException If job failed or if this queue is not collocated.
+     */
+    public void affinityRun(IgniteRunnable job) throws IgniteException;
+
+    /**
+     * Executes given job on the node where this collocated queue is located (a.k.a. affinity co-location).
+     *
+     * @param job Job which will be executed on the node where the queue is located.
+     * @return Job result.
+     * @throws IgniteException If job failed or if this queue is not collocated.
+     */
+    public <R> R affinityCall(IgniteCallable<R> job) throws IgniteException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
index 649a370..ea32186 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
@@ -22,6 +22,10 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Set;
 
+import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
+
 /**
  * Set implementation based on on In-Memory Data Grid.
  * <h1 class="header">Overview</h1>
@@ -103,4 +107,22 @@ public interface IgniteSet<T> extends Set<T>, Closeable {
      * @return {@code True} if set was removed from cache {@code false} otherwise.
      */
     public boolean removed();
+
+    /**
+     * Executes given job on the node where this collocated set is located
+     * (a.k.a. affinity co-location).
+     *
+     * @param job Job which will be executed on the node where the set is located.
+     * @throws IgniteException If job failed or if this set is not collocated.
+     */
+    public void affinityRun(IgniteRunnable job) throws IgniteException;
+
+    /**
+     * Executes given job on the node where this collocated set is located (a.k.a. affinity co-location).
+     *
+     * @param job Job which will be executed on the node where the set is located.
+     * @return Job result.
+     * @throws IgniteException If job failed or if this set is not collocated.
+     */
+    public <R> R affinityCall(IgniteCallable<R> job) throws IgniteException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index ca0250d..4be7413 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -44,9 +44,13 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncSupported;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.IgniteCompute;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
@@ -95,6 +99,9 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
     @GridToStringExclude
     private final Semaphore writeSem;
 
+    /** Compute facade used to implement affinityRun() and affinityCall(). */
+    private final IgniteCompute compute;
+
     /** */
     private final boolean binaryMarsh;
 
@@ -113,6 +120,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
         queueKey = new GridCacheQueueHeaderKey(queueName);
         cache = cctx.kernalContext().cache().internalCache(cctx.name());
         binaryMarsh = cctx.binaryMarshaller();
+        this.compute = cctx.kernalContext().grid().compute();
 
         log = cctx.logger(getClass());
 
@@ -1063,4 +1071,22 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
     @Override public String toString() {
         return S.toString(GridCacheQueueAdapter.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public void affinityRun(IgniteRunnable job) {
+        if (!collocated)
+            throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() +
+                ". This operation is supported only for collocated queues.");
+        // The queue header key serves as the affinity key that routes the job.
+        compute.affinityRun(cache.name(), queueKey, job);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R affinityCall(IgniteCallable<R> job) {
+        if (!collocated)
+            throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
+                ". This operation is supported only for collocated queues.");
+        // The queue header key serves as the affinity key that routes the job.
+        return compute.affinityCall(cache.name(), queueKey, job);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
index f1fbe5b..93abbe6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
@@ -28,6 +28,8 @@ import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -36,6 +38,8 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -777,4 +781,56 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
     @Override public String toString() {
         return delegate.toString();
     }
+
+    /** {@inheritDoc} */
+    @Override public void affinityRun(final IgniteRunnable job) {
+        gate.enter();
+
+        try {
+            if (cctx.transactional()) {
+                CU.outTx(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        // The collocation check is intentionally not repeated here: the delegate
+                        // performs it (with the same message) on both the transactional and the
+                        // non-transactional path.
+                        delegate.affinityRun(job);
+                        return null;
+                    }
+                }, cctx);
+            }
+            else
+                delegate.affinityRun(job);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            gate.leave();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R affinityCall(final IgniteCallable<R> job) {
+        gate.enter();
+
+        try {
+            if (cctx.transactional())
+                return CU.outTx(new Callable<R>() {
+                    @Override public R call() throws Exception {
+                        // The collocation check is intentionally not repeated here: the delegate
+                        // performs it (with the same message) on both the transactional and the
+                        // non-transactional path.
+                        return delegate.affinityCall(job);
+                    }
+                }, cctx);
+
+            return delegate.affinityCall(job);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            gate.leave();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f25e361..3f03380 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -30,9 +30,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSet;
+
+import org.apache.ignite.*;
 import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -49,7 +48,9 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -83,12 +84,18 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     /** Queue header partition. */
     private final int hdrPart;
 
+    /** Set header key. */
+    protected final GridCacheSetHeaderKey setKey;
+
     /** Removed flag. */
     private volatile boolean rmvd;
 
     /** */
     private final boolean binaryMarsh;
 
+    /** Compute facade used to implement affinityRun() and affinityCall(). */
+    private final IgniteCompute compute;
+
     /**
      * @param ctx Cache context.
      * @param name Set name.
@@ -101,9 +108,12 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
         id = hdr.id();
         collocated = hdr.collocated();
         binaryMarsh = ctx.binaryMarshaller();
+        compute = ctx.kernalContext().grid().compute();
 
         cache = ctx.cache();
 
+        setKey = new GridCacheSetHeaderKey(name);
+
         log = ctx.logger(GridCacheSetImpl.class);
 
         hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name));
@@ -537,6 +547,24 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
         return S.toString(GridCacheSetImpl.class, this);
     }
 
+    /** {@inheritDoc} */
+    @Override public void affinityRun(IgniteRunnable job) {
+        if (!collocated)
+            throw new IgniteException("Failed to execute affinityRun() for non-collocated set: " + name() +
+                ". This operation is supported only for collocated sets.");
+        // The set header key serves as the affinity key that routes the job.
+        compute.affinityRun(cache.name(), setKey, job);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R affinityCall(IgniteCallable<R> job) {
+        if (!collocated)
+            throw new IgniteException("Failed to execute affinityCall() for non-collocated set: " + name() +
+                ". This operation is supported only for collocated sets.");
+        // The set header key serves as the affinity key that routes the job.
+        return compute.affinityCall(cache.name(), setKey, job);
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
index 9c47e1c..d40c286 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -588,6 +590,52 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
         t.set2(U.readString(in));
     }
 
+    /** {@inheritDoc} */
+    @Override public void affinityRun(final IgniteRunnable job) {
+        gate.enter();
+
+        try {
+            if (!cctx.transactional())
+                delegate.affinityRun(job);
+            else
+                CU.outTx(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        delegate.affinityRun(job);
+
+                        return null;
+                    }
+                }, cctx);
+        }
+        catch (IgniteCheckedException ex) {
+            throw U.convertException(ex);
+        }
+        finally {
+            gate.leave();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R affinityCall(final IgniteCallable<R> job) {
+        gate.enter();
+
+        try {
+            if (!cctx.transactional())
+                return delegate.affinityCall(job);
+
+            return CU.outTx(new Callable<R>() {
+                @Override public R call() throws Exception {
+                    return delegate.affinityCall(job);
+                }
+            }, cctx);
+        }
+        catch (IgniteCheckedException ex) {
+            throw U.convertException(ex);
+        }
+        finally {
+            gate.leave();
+        }
+    }
+
     /**
      * Reconstructs object on unmarshalling.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
index 5dea3f5..0c76595 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
@@ -27,11 +27,15 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.CollectionConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.testframework.GridTestUtils;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -604,6 +608,42 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
         assertNotNull(((IgniteKernal)grid(0)).internalCache(ccfg.getName()));
     }
 
+    /** Checks that affinityRun() fails for a non-collocated queue and works for a collocated one. */
+    public void testAffinityRun() throws Exception {
+        // Non-collocated queue: affinityRun() must fail with a descriptive exception.
+        final CollectionConfiguration colCfg = collectionConfiguration();
+        colCfg.setCollocated(false);
+        colCfg.setCacheMode(CacheMode.PARTITIONED);
+
+        final IgniteQueue<Integer> queue = grid(0).queue("Queue1", 0, colCfg);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                queue.affinityRun(new IgniteRunnable() {
+                    @Override public void run() { /* No-op. */ }});
+                return null;
+            }
+        }, IgniteException.class,
+           "Failed to execute affinityRun() for non-collocated queue: " + queue.name() +
+           ". This operation is supported only for collocated queues.");
+
+        queue.close();
+
+        // Collocated queue: the job must see the element added before the call.
+        colCfg.setCollocated(true);
+        final IgniteQueue<Integer> queue2 = grid(0).queue("Queue2", 0, colCfg);
+
+        queue2.add(100);
+
+        // Read the number back using affinityRun(); a real assertion, effective even without -ea.
+        queue2.affinityRun(new IgniteRunnable() {
+            @Override public void run() {
+                assertEquals(100, (int)queue2.take());
+            }
+        });
+        queue2.close();
+    }
+
     /**
      *  Test class with the same hash code.
      */
@@ -642,4 +682,5 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
             return S.toString(SameHashItem.class, this);
         }
     }
+
 }
\ No newline at end of file