You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/26 14:16:32 UTC
[14/35] ignite git commit: IGNITE-1144 Add affinity functions to
collocated Ignite Queue and Set. Fixes #459.
IGNITE-1144 Add affinity functions to collocated Ignite Queue and Set. Fixes #459.
Signed-off-by: Valentin Kulichenko <va...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc8d73ee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc8d73ee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc8d73ee
Branch: refs/heads/ignite-2407
Commit: bc8d73ee51ba7e7e900f065ddfe7d5f77d23ab83
Parents: ede7200
Author: Oddo Da <od...@gmail.com>
Authored: Tue Feb 23 17:02:41 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Tue Feb 23 17:02:41 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteQueue.java | 22 +++++++-
.../main/java/org/apache/ignite/IgniteSet.java | 22 ++++++++
.../datastructures/GridCacheQueueAdapter.java | 26 +++++++++
.../datastructures/GridCacheQueueProxy.java | 56 ++++++++++++++++++++
.../datastructures/GridCacheSetImpl.java | 34 ++++++++++--
.../datastructures/GridCacheSetProxy.java | 48 +++++++++++++++++
.../GridCacheQueueApiSelfAbstractTest.java | 41 ++++++++++++++
7 files changed, 245 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
index 370916b..3b892c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
@@ -22,6 +22,9 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteCallable;
/**
* This interface provides a rich API for working with distributed queues based on In-Memory Data Grid.
@@ -34,7 +37,6 @@ import java.util.concurrent.TimeUnit;
* {@link Collection} methods in the queue may throw {@link IgniteException} in case
* of failure.
* <p>
- * All queue operations have synchronous and asynchronous counterparts.
* <h1 class="header">Bounded vs Unbounded</h1>
* Queues can be {@code unbounded} or {@code bounded}. {@code Bounded} queues can
* have maximum capacity. Queue capacity can be set at creation time and cannot be
@@ -181,4 +183,22 @@ public interface IgniteQueue<T> extends BlockingQueue<T>, Closeable {
* @return {@code true} if queue was removed from cache {@code false} otherwise.
*/
public boolean removed();
+
+ /**
+ * Executes given job on collocated queue on the node where the queue is located
+ * (a.k.a. affinity co-location).
+ *
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @throws IgniteException If job failed.
+ */
+ public void affinityRun(IgniteRunnable job) throws IgniteException;
+
+ /**
+ * Executes given job on collocated queue on the node where the queue is located
+ * (a.k.a. affinity co-location).
+ *
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @throws IgniteException If job failed.
+ */
+ public <R> R affinityCall(IgniteCallable<R> job) throws IgniteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
index 649a370..ea32186 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
@@ -22,6 +22,10 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
+import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
+
/**
* Set implementation based on on In-Memory Data Grid.
* <h1 class="header">Overview</h1>
@@ -103,4 +107,22 @@ public interface IgniteSet<T> extends Set<T>, Closeable {
* @return {@code True} if set was removed from cache {@code false} otherwise.
*/
public boolean removed();
+
+ /**
+ * Executes given job on collocated set on the node where the set is located
+ * (a.k.a. affinity co-location).
+ *
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @throws IgniteException If job failed.
+ */
+ public void affinityRun(IgniteRunnable job) throws IgniteException;
+
+ /**
+ * Executes given job on collocated set on the node where the set is located
+ * (a.k.a. affinity co-location).
+ *
+ * @param job Job which will be co-located on the node with given affinity key.
+ * @throws IgniteException If job failed.
+ */
+ public <R> R affinityCall(IgniteCallable<R> job) throws IgniteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index ca0250d..4be7413 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -44,9 +44,13 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.IgniteCompute;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -95,6 +99,9 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
@GridToStringExclude
private final Semaphore writeSem;
+ /** access to affinityRun() and affinityCall() functions */
+ private final IgniteCompute compute;
+
/** */
private final boolean binaryMarsh;
@@ -113,6 +120,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
queueKey = new GridCacheQueueHeaderKey(queueName);
cache = cctx.kernalContext().cache().internalCache(cctx.name());
binaryMarsh = cctx.binaryMarshaller();
+ this.compute = cctx.kernalContext().grid().compute();
log = cctx.logger(getClass());
@@ -1063,4 +1071,22 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
@Override public String toString() {
return S.toString(GridCacheQueueAdapter.class, this);
}
+
+ /** {@inheritDoc} */
+ public void affinityRun(IgniteRunnable job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+
+ compute.affinityRun(cache.name(),queueKey,job);
+ }
+
+ /** {@inheritDoc} */
+ public <R> R affinityCall(IgniteCallable<R> job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+
+ return compute.affinityCall(cache.name(),queueKey,job);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
index f1fbe5b..93abbe6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
@@ -28,6 +28,8 @@ import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -36,6 +38,8 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
/**
@@ -777,4 +781,56 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
@Override public String toString() {
return delegate.toString();
}
+
+ /** {@inheritDoc} */
+ @Override public void affinityRun(final IgniteRunnable job) {
+ gate.enter();
+
+ try {
+ if (cctx.transactional()) {
+ CU.outTx(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ if (!delegate.collocated())
+ throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+ delegate.affinityRun(job);
+ return null;
+ }
+ }, cctx);
+ } else
+ delegate.affinityRun(job);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ gate.leave();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> R affinityCall(final IgniteCallable<R> job) {
+ gate.enter();
+
+ try {
+ if (cctx.transactional())
+ return CU.outTx(new Callable<R>() {
+ @Override public R call() throws Exception {
+ if (!delegate.collocated())
+ throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+ return delegate.affinityCall(job);
+ }
+ }, cctx);
+
+ return delegate.affinityCall(job);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ gate.leave();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f25e361..3f03380 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -30,9 +30,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSet;
+
+import org.apache.ignite.*;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -49,7 +48,9 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -83,12 +84,18 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
/** Queue header partition. */
private final int hdrPart;
+ /** Queue header key. */
+ protected final GridCacheSetHeaderKey setKey;
+
/** Removed flag. */
private volatile boolean rmvd;
/** */
private final boolean binaryMarsh;
+ /** access to affinityRun() and affinityCall() functions */
+ private final IgniteCompute compute;
+
/**
* @param ctx Cache context.
* @param name Set name.
@@ -101,9 +108,12 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
id = hdr.id();
collocated = hdr.collocated();
binaryMarsh = ctx.binaryMarshaller();
+ compute = ctx.kernalContext().grid().compute();
cache = ctx.cache();
+ setKey = new GridCacheSetHeaderKey(name);
+
log = ctx.logger(GridCacheSetImpl.class);
hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name));
@@ -537,6 +547,24 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
return S.toString(GridCacheSetImpl.class, this);
}
+ /** {@inheritDoc} */
+ public void affinityRun(IgniteRunnable job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityCall() for non-collocated set: " + name() +
+ ". This operation is supported only for collocated sets.");
+
+ compute.affinityRun(cache.name(),setKey,job);
+ }
+
+ /** {@inheritDoc} */
+ public <R> R affinityCall(IgniteCallable<R> job) {
+ if (!collocated)
+ throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() +
+ ". This operation is supported only for collocated queues.");
+
+ return compute.affinityCall(cache.name(),setKey,job);
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
index 9c47e1c..d40c286 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.NotNull;
/**
@@ -588,6 +590,52 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
t.set2(U.readString(in));
}
+ /** {@inheritDoc} */
+ @Override public void affinityRun(final IgniteRunnable job) {
+ gate.enter();
+
+ try {
+ if (cctx.transactional())
+ CU.outTx(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ delegate.affinityRun(job);
+ return null;
+ }
+ }, cctx);
+ else
+ delegate.affinityRun(job);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ gate.leave();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> R affinityCall(final IgniteCallable<R> job) {
+ gate.enter();
+
+ try {
+ if (cctx.transactional())
+ return CU.outTx(new Callable<R>() {
+ @Override public R call() throws Exception {
+ return delegate.affinityCall(job);
+ }
+ }, cctx);
+
+ return delegate.affinityCall(job);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ gate.leave();
+ }
+ }
+
/**
* Reconstructs object on unmarshalling.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc8d73ee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
index 5dea3f5..0c76595 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java
@@ -27,11 +27,15 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -604,6 +608,42 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
assertNotNull(((IgniteKernal)grid(0)).internalCache(ccfg.getName()));
}
+ public void testAffinityRun() throws Exception {
+ /** Test exception on non-collocated queue */
+ final CollectionConfiguration colCfg = collectionConfiguration();
+ colCfg.setCollocated(false);
+ colCfg.setCacheMode(CacheMode.PARTITIONED);
+
+ final IgniteQueue queue = grid(0).queue("Queue1", 0, colCfg);
+
+ GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ queue.affinityRun(new IgniteRunnable() {
+ @Override public void run() { ; }});
+ return null;
+ }
+ }, IgniteException.class,
+ "Failed to execute affinityRun() for non-collocated queue: " + queue.name() +
+ ". This operation is supported only for collocated queues.");
+
+ queue.close();
+
+ /** Test running a job on a collocated queue */
+ colCfg.setCollocated(true);
+ final IgniteQueue queue2 = grid(0).queue("Queue2", 0, colCfg);
+
+ /** add a number to the queue */
+ queue2.add(100);
+
+ /** read the number back using affinityRun() */
+ queue2.affinityRun(new IgniteRunnable() {
+ @Override public void run() {
+ assert((int)queue2.take() == 100);
+ }
+ });
+ queue2.close();
+ }
+
/**
* Test class with the same hash code.
*/
@@ -642,4 +682,5 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection
return S.toString(SameHashItem.class, this);
}
}
+
}
\ No newline at end of file