You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/20 10:10:38 UTC
[2/2] ignite git commit: ignite-2893 For datastructures use invoke
instead of explicit txs, got rid of unnecessary outTx usage.
ignite-2893 For datastructures use invoke instead of explicit txs, got rid of unnecessary outTx usage.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee955df9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee955df9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee955df9
Branch: refs/heads/ignite-2.0
Commit: ee955df9fb80737292aac5f7ad3c82f8f0d8ea8e
Parents: f440480
Author: sboikov <sb...@gridgain.com>
Authored: Thu Apr 20 13:10:28 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Apr 20 13:10:28 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 4 +-
.../processors/cache/GridCacheUtils.java | 117 ++--
.../datastructures/DataStructuresProcessor.java | 61 +-
.../datastructures/GridCacheAtomicLongImpl.java | 626 +++++++++++--------
.../GridCacheAtomicReferenceImpl.java | 276 ++++----
.../GridCacheAtomicSequenceImpl.java | 88 +--
.../GridCacheAtomicStampedImpl.java | 293 ++++-----
.../GridCacheCountDownLatchImpl.java | 56 +-
.../datastructures/GridCacheLockImpl.java | 80 +--
.../datastructures/GridCacheQueueProxy.java | 292 +--------
.../datastructures/GridCacheSemaphoreImpl.java | 56 +-
.../datastructures/GridCacheSetProxy.java | 152 +----
.../GridTransactionalCacheQueueImpl.java | 8 +-
13 files changed, 812 insertions(+), 1297 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index a3d4c81..5438163 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2451,7 +2451,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
- @Nullable @Override public EntryProcessorResult<T> op(GridNearTxLocal tx)
+ @Override public EntryProcessorResult<T> op(GridNearTxLocal tx)
throws IgniteCheckedException {
assert topVer == null || tx.implicit();
@@ -2489,7 +2489,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
- @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
+ @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
new C1<K, EntryProcessor<K, V, Object>>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 5abb6de..df9c7c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -881,31 +881,6 @@ public class GridCacheUtils {
}
/**
- * Method executes any Callable out of scope of transaction.
- * If transaction started by this thread {@code cmd} will be executed in another thread.
- *
- * @param cmd Callable.
- * @param ctx Cache context.
- * @return T Callable result.
- * @throws IgniteCheckedException If execution failed.
- */
- public static <T> T outTx(Callable<T> cmd, GridCacheContext ctx) throws IgniteCheckedException {
- if (ctx.tm().inUserTx())
- return ctx.closures().callLocalSafe(cmd, false).get();
- else {
- try {
- return cmd.call();
- }
- catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
- throw e;
- }
- catch (Exception e) {
- throw new IgniteCheckedException(e);
- }
- }
- }
-
- /**
* @param val Value.
* @param skip Skip value flag.
* @return Value.
@@ -1604,56 +1579,58 @@ public class GridCacheUtils {
/**
* @param c Closure to retry.
- * @param <S> Closure type.
- * @return Wrapped closure.
- */
- public static <S> Callable<S> retryTopologySafe(final Callable<S> c ) {
- return new Callable<S>() {
- @Override public S call() throws Exception {
- IgniteCheckedException err = null;
-
- for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
- try {
- return c.call();
- }
- catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) {
+ * @throws IgniteCheckedException If failed.
+ * @return Closure result.
+ */
+ public static <S> S retryTopologySafe(final Callable<S> c) throws IgniteCheckedException {
+ IgniteCheckedException err = null;
+
+ for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
+ try {
+ return c.call();
+ }
+ catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) {
+ throw e;
+ }
+ catch (TransactionRollbackException e) {
+ if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+ throw e;
+
+ U.sleep(1);
+ }
+ catch (IgniteCheckedException e) {
+ if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+ throw e;
+
+ if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
+ ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+ if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof
+ ClusterTopologyServerNotFoundException)
throw e;
- }
- catch (TransactionRollbackException e) {
- if (i + 1 == GridCacheAdapter.MAX_RETRIES)
- throw e;
+ // IGNITE-1948: remove this check when the issue is fixed
+ if (topErr.retryReadyFuture() != null)
+ topErr.retryReadyFuture().get();
+ else
U.sleep(1);
- }
- catch (IgniteCheckedException e) {
- if (i + 1 == GridCacheAdapter.MAX_RETRIES)
- throw e;
-
- if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
- ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
-
- if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof
- ClusterTopologyServerNotFoundException)
- throw e;
-
- // IGNITE-1948: remove this check when the issue is fixed
- if (topErr.retryReadyFuture() != null)
- topErr.retryReadyFuture().get();
- else
- U.sleep(1);
- }
- else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
- CachePartialUpdateCheckedException.class))
- U.sleep(1);
- else
- throw e;
- }
}
-
- // Should never happen.
- throw err;
+ else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
+ CachePartialUpdateCheckedException.class))
+ U.sleep(1);
+ else
+ throw e;
}
- };
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ // Should never happen.
+ throw err;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 102db96..0a439dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -476,7 +476,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
* @param name Sequence name.
* @throws IgniteCheckedException If removing failed.
*/
- public final void removeSequence(final String name) throws IgniteCheckedException {
+ final void removeSequence(final String name) throws IgniteCheckedException {
assert name != null;
awaitInitialization();
@@ -488,9 +488,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
dsCacheCtx.gate().enter();
try {
- GridCacheInternal key = new GridCacheInternalKeyImpl(name);
-
- removeInternal(key, GridCacheAtomicSequenceValue.class);
+ dsView.remove(new GridCacheInternalKeyImpl(name));
}
finally {
dsCacheCtx.gate().leave();
@@ -631,7 +629,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
* @param name Atomic long name.
* @throws IgniteCheckedException If removing failed.
*/
- public final void removeAtomicLong(final String name) throws IgniteCheckedException {
+ final void removeAtomicLong(final String name) throws IgniteCheckedException {
assert name != null;
assert dsCacheCtx != null;
@@ -642,7 +640,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
dsCacheCtx.gate().enter();
try {
- removeInternal(new GridCacheInternalKeyImpl(name), GridCacheAtomicLongValue.class);
+ dsView.remove(new GridCacheInternalKeyImpl(name));
}
finally {
dsCacheCtx.gate().leave();
@@ -790,7 +788,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
* @param name Atomic reference name.
* @throws IgniteCheckedException If removing failed.
*/
- public final void removeAtomicReference(final String name) throws IgniteCheckedException {
+ final void removeAtomicReference(final String name) throws IgniteCheckedException {
assert name != null;
assert dsCacheCtx != null;
@@ -801,9 +799,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
dsCacheCtx.gate().enter();
try {
- GridCacheInternal key = new GridCacheInternalKeyImpl(name);
-
- removeInternal(key, GridCacheAtomicReferenceValue.class);
+ dsView.remove(new GridCacheInternalKeyImpl(name));
}
finally {
dsCacheCtx.gate().leave();
@@ -894,7 +890,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
* @param name Atomic stamped name.
* @throws IgniteCheckedException If removing failed.
*/
- public final void removeAtomicStamped(final String name) throws IgniteCheckedException {
+ final void removeAtomicStamped(final String name) throws IgniteCheckedException {
assert name != null;
assert dsCacheCtx != null;
@@ -905,9 +901,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
dsCacheCtx.gate().enter();
try {
- GridCacheInternal key = new GridCacheInternalKeyImpl(name);
-
- removeInternal(key, GridCacheAtomicStampedValue.class);
+ dsView.remove(new GridCacheInternalKeyImpl(name));
}
finally {
dsCacheCtx.gate().leave();
@@ -1516,43 +1510,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
}
/**
- * Remove internal entry by key from cache.
- *
- * @param key Internal entry key.
- * @param cls Class of object which will be removed. If cached object has different type exception will be thrown.
- * @return Method returns true if sequence has been removed and false if it's not cached.
- * @throws IgniteCheckedException If removing failed or class of object is different to expected class.
- */
- private <R> boolean removeInternal(final GridCacheInternal key, final Class<R> cls) throws IgniteCheckedException {
- return CU.outTx(
- new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
- // Check correctness type of removable object.
- R val = cast(dsView.get(key), cls);
-
- if (val != null) {
- dsView.remove(key);
-
- tx.commit();
- }
- else
- tx.setRollbackOnly();
-
- return val != null;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to remove data structure: " + key, e);
-
- throw e;
- }
- }
- },
- dsCacheCtx
- );
- }
-
- /**
*
*/
static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
@@ -1769,7 +1726,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
*/
public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException {
try {
- return GridCacheUtils.retryTopologySafe(call).call();
+ return GridCacheUtils.retryTopologySafe(call);
}
catch (IgniteCheckedException e) {
throw e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index be718cf..3f07151 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -23,23 +23,20 @@ import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
/**
* Cache atomic long implementation.
*/
@@ -55,9 +52,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
}
};
- /** Logger. */
- private IgniteLogger log;
-
/** Atomic long name. */
private String name;
@@ -76,126 +70,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
/** Cache context. */
private GridCacheContext ctx;
- /** Callable for {@link #get()}. */
- private final Callable<Long> getCall = new Callable<Long>() {
- @Override public Long call() throws Exception {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- return val.get();
- }
- };
-
- /** Callable for {@link #incrementAndGet()}. */
- private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get() + 1;
-
- val.set(retVal);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to increment and get: " + this, e);
-
- throw e;
- }
- }
- });
-
- /** Callable for {@link #getAndIncrement()}. */
- private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get();
-
- val.set(retVal + 1);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and increment: " + this, e);
-
- throw e;
- }
- }
- });
-
- /** Callable for {@link #decrementAndGet()}. */
- private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get() - 1;
-
- val.set(retVal);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to decrement and get: " + this, e);
-
- throw e;
- }
- }
- });
-
- /** Callable for {@link #getAndDecrement()}. */
- private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get();
-
- val.set(retVal - 1);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and decrement and get: " + this, e);
-
- throw e;
- }
- }
- });
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -211,8 +85,10 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
* @param atomicView Atomic projection.
* @param ctx CacheContext.
*/
- public GridCacheAtomicLongImpl(String name, GridCacheInternalKey key,
- IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView, GridCacheContext ctx) {
+ public GridCacheAtomicLongImpl(String name,
+ GridCacheInternalKey key,
+ IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView,
+ GridCacheContext ctx) {
assert key != null;
assert atomicView != null;
assert ctx != null;
@@ -222,8 +98,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
this.key = key;
this.atomicView = atomicView;
this.name = name;
-
- log = ctx.logger(getClass());
}
/** {@inheritDoc} */
@@ -236,7 +110,12 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(getCall, ctx);
+ GridCacheAtomicLongValue val = atomicView.get(key);
+
+ if (val == null)
+ throw new IgniteException("Failed to find atomic long: " + name);
+
+ return val.get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -248,7 +127,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try{
- return CU.outTx(incAndGetCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, IncrementAndGetProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -260,7 +146,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(getAndIncCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndIncrementProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -272,7 +165,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(internalAddAndGet(l), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new AddAndGetProcessor(l));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -284,7 +184,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(internalGetAndAdd(l), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndAddProcessor(l));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -296,7 +203,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(decAndGetCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, DecrementAndGetProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -308,7 +222,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(getAndDecCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndDecrementProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -320,7 +241,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(internalGetAndSet(l), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndSetProcessor(l));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -332,7 +260,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(internalCompareAndSetAndGet(expVal, newVal) , ctx) == expVal;
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get() == expVal;
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -348,7 +283,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(internalCompareAndSetAndGet(expVal, newVal), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -422,193 +364,335 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
}
}
+ /** {@inheritDoc} */
+ @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ this.atomicView = kctx.cache().atomicsCache();
+ this.ctx = atomicView.context();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(ctx.kernalContext());
+ out.writeUTF(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+ t.set1((GridKernalContext)in.readObject());
+ t.set2(in.readUTF());
+ }
+
/**
- * Method returns callable for execution {@link #addAndGet(long)} operation in async and sync mode.
+ * Reconstructs object on unmarshalling.
*
- * @param l Value will be added to atomic long.
- * @return Callable for execution in async and sync mode.
+ * @return Reconstructed object.
+ * @throws ObjectStreamException Thrown in case of unmarshalling error.
*/
- private Callable<Long> internalAddAndGet(final long l) {
- return retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
+ private Object readResolve() throws ObjectStreamException {
+ try {
+ IgniteBiTuple<GridKernalContext, String> t = stash.get();
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+ }
+ finally {
+ stash.remove();
+ }
+ }
- long retVal = val.get() + l;
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheAtomicLongImpl.class, this);
+ }
- val.set(retVal);
+ /**
+ *
+ */
+ static class GetAndSetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long newVal;
+
+ /**
+ * @param newVal New value.
+ */
+ GetAndSetProcessor(long newVal) {
+ this.newVal = newVal;
+ }
- atomicView.put(key, val);
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- tx.commit();
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to add and get: " + this, e);
+ long curVal = val.get();
- throw e;
- }
- }
- });
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return curVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndSetProcessor.class, this);
+ }
}
/**
- * Method returns callable for execution {@link #getAndAdd(long)} operation in async and sync mode.
*
- * @param l Value will be added to atomic long.
- * @return Callable for execution in async and sync mode.
*/
- private Callable<Long> internalGetAndAdd(final long l) {
- return retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ static class GetAndAddProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long delta;
+
+ /**
+ * @param delta Delta.
+ */
+ GetAndAddProcessor(long delta) {
+ this.delta = delta;
+ }
- long retVal = val.get();
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- val.set(retVal + l);
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- atomicView.put(key, val);
+ long curVal = val.get();
- tx.commit();
+ e.setValue(new GridCacheAtomicLongValue(curVal + delta));
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and add: " + this, e);
+ return curVal;
+ }
- throw e;
- }
- }
- });
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndAddProcessor.class, this);
+ }
}
/**
- * Method returns callable for execution {@link #getAndSet(long)} operation in async and sync mode.
*
- * @param l Value will be added to atomic long.
- * @return Callable for execution in async and sync mode.
*/
- private Callable<Long> internalGetAndSet(final long l) {
- return new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
+ static class AddAndGetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long delta;
+
+ /**
+ * @param delta Delta.
+ */
+ AddAndGetProcessor(long delta) {
+ this.delta = delta;
+ }
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- long retVal = val.get();
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- val.set(l);
+ long newVal = val.get() + delta;
- atomicView.put(key, val);
+ e.setValue(new GridCacheAtomicLongValue(newVal));
- tx.commit();
+ return newVal;
+ }
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and set: " + this, e);
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AddAndGetProcessor.class, this);
+ }
+ }
- throw e;
- }
- }
- };
+ /**
+ *
+ */
+ static class CompareAndSetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long expVal;
+
+ /** */
+ private final long newVal;
+
+ /**
+ * @param expVal Expected value.
+ * @param newVal New value.
+ */
+ CompareAndSetProcessor(long expVal, long newVal) {
+ this.expVal = expVal;
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+ long curVal = val.get();
+
+ if (curVal == expVal)
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return curVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CompareAndSetProcessor.class, this);
+ }
}
/**
- * Method returns callable for execution {@link #compareAndSetAndGet(long, long)}
- * operation in async and sync mode.
*
- * @param expVal Expected atomic long value.
- * @param newVal New atomic long value.
- * @return Callable for execution in async and sync mode.
*/
- private Callable<Long> internalCompareAndSetAndGet(final long expVal, final long newVal) {
- return new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
+ static class GetAndIncrementProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ /** */
+ private static final GetAndIncrementProcessor INSTANCE = new GetAndIncrementProcessor();
- long retVal = val.get();
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- if (retVal == expVal) {
- val.set(newVal);
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- atomicView.getAndPut(key, val);
+ long ret = val.get();
- tx.commit();
- }
+ e.setValue(new GridCacheAtomicLongValue(ret + 1));
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to compare and set: " + this, e);
+ return ret;
+ }
- throw e;
- }
- }
- };
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndIncrementProcessor.class, this);
+ }
}
- /** {@inheritDoc} */
- @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
- this.atomicView = kctx.cache().atomicsCache();
- this.ctx = atomicView.context();
- }
+ /**
+ *
+ */
+ static class IncrementAndGetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
- /** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ /** */
+ private static final IncrementAndGetProcessor INSTANCE = new IncrementAndGetProcessor();
- }
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(ctx.kernalContext());
- out.writeUTF(name);
- }
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- IgniteBiTuple<GridKernalContext, String> t = stash.get();
+ long newVal = val.get() + 1;
- t.set1((GridKernalContext)in.readObject());
- t.set2(in.readUTF());
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IncrementAndGetProcessor.class, this);
+ }
}
/**
- * Reconstructs object on unmarshalling.
*
- * @return Reconstructed object.
- * @throws ObjectStreamException Thrown in case of unmarshalling error.
*/
- private Object readResolve() throws ObjectStreamException {
- try {
- IgniteBiTuple<GridKernalContext, String> t = stash.get();
+ static class GetAndDecrementProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
- return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
- }
- catch (IgniteCheckedException e) {
- throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+ /** */
+ private static final GetAndDecrementProcessor INSTANCE = new GetAndDecrementProcessor();
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+ long ret = val.get();
+
+ e.setValue(new GridCacheAtomicLongValue(ret - 1));
+
+ return ret;
}
- finally {
- stash.remove();
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndDecrementProcessor.class, this);
}
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheAtomicLongImpl.class, this);
+ /**
+ *
+ */
+ static class DecrementAndGetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final DecrementAndGetProcessor INSTANCE = new DecrementAndGetProcessor();
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+ long newVal = val.get() - 1;
+
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DecrementAndGetProcessor.class, this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 4365468..b7dc007 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -23,24 +23,21 @@ import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
/**
* Cache atomic reference implementation.
*/
@@ -56,9 +53,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
}
};
- /** Logger. */
- private IgniteLogger log;
-
/** Atomic reference name. */
private String name;
@@ -77,18 +71,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
/** Cache context. */
private GridCacheContext ctx;
- /** Callable for {@link #get} operation */
- private final Callable<T> getCall = new Callable<T>() {
- @Override public T call() throws Exception {
- GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
-
- if (ref == null)
- throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
-
- return ref.get();
- }
- };
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -117,8 +99,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
this.key = key;
this.atomicView = atomicView;
this.name = name;
-
- log = ctx.logger(getClass());
}
/** {@inheritDoc} */
@@ -131,7 +111,12 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
checkRemoved();
try {
- return CU.outTx(getCall, ctx);
+ GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
+
+ if (ref == null)
+ throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
+
+ return ref.get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -143,7 +128,10 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
checkRemoved();
try {
- CU.outTx(internalSet(val), ctx);
+ atomicView.invoke(key, new ReferenceSetEntryProcessor<>(val));
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -152,20 +140,42 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
/** {@inheritDoc} */
@Override public boolean compareAndSet(T expVal, T newVal) {
- return compareAndSetAndGet(newVal, expVal) == expVal;
+ try {
+ EntryProcessorResult<Boolean> res =
+ atomicView.invoke(key, new ReferenceCompareAndSetEntryProcessor<>(expVal, newVal));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
/**
* Compares current value with specified value for equality and, if they are equal, replaces current value.
*
* @param newVal New value to set.
+ * @param expVal Expected value.
* @return Original value.
*/
public T compareAndSetAndGet(T newVal, T expVal) {
checkRemoved();
try {
- return CU.outTx(internalCompareAndSetAndGet(expVal, newVal), ctx);
+ EntryProcessorResult<T> res =
+ atomicView.invoke(key, new ReferenceCompareAndSetAndGetEntryProcessor<T>(expVal, newVal));
+
+ assert res != null;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -205,82 +215,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
}
}
- /**
- * Method returns callable for execution {@link #set(Object)} operation in async and sync mode.
- *
- * @param val Value will be set in reference .
- * @return Callable for execution in async and sync mode.
- */
- private Callable<Boolean> internalSet(final T val) {
- return retryTopologySafe(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
-
- if (ref == null)
- throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
-
- ref.set(val);
-
- atomicView.put(key, ref);
-
- tx.commit();
-
- return true;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to set value [val=" + val + ", atomicReference=" + this + ']', e);
-
- throw e;
- }
- }
- });
- }
-
- /**
- * Conditionally sets the new value. It will be set if {@code expValPred} is
- * evaluate to {@code true}.
- *
- * @param expVal Expected value.
- * @param newVal New value.
- * @return Callable for execution in async and sync mode.
- */
- private Callable<T> internalCompareAndSetAndGet(final T expVal, final T newVal) {
- return retryTopologySafe(new Callable<T>() {
- @Override public T call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
-
- if (ref == null)
- throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
-
- T origVal = ref.get();
-
- if (!F.eq(expVal, origVal)) {
- tx.setRollbackOnly();
-
- return origVal;
- }
- else {
- ref.set(newVal);
-
- atomicView.getAndPut(key, ref);
-
- tx.commit();
-
- return expVal;
- }
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to compare and value [expVal=" + expVal + ", newVal" +
- newVal + ", atomicReference" + this + ']', e);
-
- throw e;
- }
- }
- });
- }
-
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
this.atomicView = kctx.cache().atomicsCache();
@@ -289,7 +223,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
-
+ // No-op.
}
/**
@@ -363,6 +297,136 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
}
}
+ /**
+ *
+ */
+ static class ReferenceSetEntryProcessor<T> implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final T newVal;
+
+ /**
+ * @param newVal New value.
+ */
+ ReferenceSetEntryProcessor(T newVal) {
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
+ Object... args) {
+ GridCacheAtomicReferenceValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());
+
+ e.setValue(new GridCacheAtomicReferenceValue<>(newVal));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReferenceSetEntryProcessor.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ static class ReferenceCompareAndSetEntryProcessor<T> implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final T expVal;
+
+ /** */
+ private final T newVal;
+
+ /**
+ * @param expVal Expected value.
+ * @param newVal New value.
+ */
+ ReferenceCompareAndSetEntryProcessor(T expVal, T newVal) {
+ this.expVal = expVal;
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
+ Object... args) {
+ GridCacheAtomicReferenceValue<T> val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());
+
+ T curVal = val.get();
+
+ if (F.eq(expVal, curVal)) {
+ e.setValue(new GridCacheAtomicReferenceValue<T>(newVal));
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReferenceCompareAndSetEntryProcessor.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ static class ReferenceCompareAndSetAndGetEntryProcessor<T> implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, T> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final T expVal;
+
+ /** */
+ private final T newVal;
+
+ /**
+ * @param expVal Expected value.
+ * @param newVal New value.
+ */
+ ReferenceCompareAndSetAndGetEntryProcessor(T expVal, T newVal) {
+ this.expVal = expVal;
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public T process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
+ Object... args) {
+ GridCacheAtomicReferenceValue<T> val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());
+
+ T curVal = val.get();
+
+ if (F.eq(expVal, curVal))
+ e.setValue(new GridCacheAtomicReferenceValue<T>(newVal));
+
+ return curVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReferenceCompareAndSetAndGetEntryProcessor.class, this);
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheAtomicReferenceImpl.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 0661b11..d14bb47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -32,11 +32,9 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -256,7 +254,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
if (updateGuard.compareAndSet(false, true)) {
try {
try {
- return updateCall.call();
+ return retryTopologySafe(updateCall);
}
catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
throw e;
@@ -303,86 +301,6 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
}
}
- /**
- * Asynchronous sequence update operation. Will add given amount to the sequence value.
- *
- * @param l Increment amount.
- * @param updateCall Cache call that will update sequence reservation count in accordance with l.
- * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
- * prior to update.
- * @return Future indicating sequence value.
- * @throws IgniteCheckedException If update failed.
- */
- @SuppressWarnings("SignalWithoutCorrespondingAwait")
- private IgniteInternalFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated)
- throws IgniteCheckedException {
- checkRemoved();
-
- A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
-
- lock.lock();
-
- try {
- // If reserved range isn't exhausted.
- if (locVal + l <= upBound) {
- long curVal = locVal;
-
- locVal += l;
-
- return new GridFinishedFuture<>(updated ? locVal : curVal);
- }
- }
- finally {
- lock.unlock();
- }
-
- if (updateCall == null)
- updateCall = internalUpdate(l, updated);
-
- while (true) {
- if (updateGuard.compareAndSet(false, true)) {
- try {
- // This call must be outside lock.
- return ctx.closures().callLocalSafe(updateCall, true);
- }
- finally {
- lock.lock();
-
- try {
- updateGuard.set(false);
-
- cond.signalAll();
- }
- finally {
- lock.unlock();
- }
- }
- }
- else {
- lock.lock();
-
- try {
- while (locVal >= upBound && updateGuard.get())
- U.await(cond, 500, MILLISECONDS);
-
- checkRemoved();
-
- // If reserved range isn't exhausted.
- if (locVal + l <= upBound) {
- long curVal = locVal;
-
- locVal += l;
-
- return new GridFinishedFuture<>(updated ? locVal : curVal);
- }
- }
- finally {
- lock.unlock();
- }
- }
- }
- }
-
/** Get local batch size for this sequences.
*
* @return Sequence batch size.
@@ -485,7 +403,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
*/
@SuppressWarnings("TooBroadScope")
private Callable<Long> internalUpdate(final long l, final boolean updated) {
- return retryTopologySafe(new Callable<Long>() {
+ return new Callable<Long>() {
@Override public Long call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicSequenceValue seq = seqView.get(key);
@@ -556,7 +474,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
throw e;
}
}
- });
+ };
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index 09cea43..3f14942 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -23,25 +23,20 @@ import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
-import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Cache atomic stamped implementation.
@@ -58,9 +53,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
}
};
- /** Logger. */
- private IgniteLogger log;
-
/** Atomic stamped name. */
private String name;
@@ -79,42 +71,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
/** Cache context. */
private GridCacheContext ctx;
- /** Callable for {@link #get()} operation */
- private final Callable<IgniteBiTuple<T, S>> getCall = retryTopologySafe(new Callable<IgniteBiTuple<T, S>>() {
- @Override public IgniteBiTuple<T, S> call() throws Exception {
- GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
- if (stmp == null)
- throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
- return stmp.get();
- }
- });
-
- /** Callable for {@link #value()} operation */
- private final Callable<T> valCall = retryTopologySafe(new Callable<T>() {
- @Override public T call() throws Exception {
- GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
- if (stmp == null)
- throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
- return stmp.value();
- }
- });
-
- /** Callable for {@link #stamp()} operation */
- private final Callable<S> stampCall = retryTopologySafe(new Callable<S>() {
- @Override public S call() throws Exception {
- GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
- if (stmp == null)
- throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
- return stmp.stamp();
- }
- });
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -130,8 +86,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
* @param atomicView Atomic projection.
* @param ctx Cache context.
*/
- public GridCacheAtomicStampedImpl(String name, GridCacheInternalKey key, IgniteInternalCache<GridCacheInternalKey,
- GridCacheAtomicStampedValue<T, S>> atomicView, GridCacheContext ctx) {
+ public GridCacheAtomicStampedImpl(String name,
+ GridCacheInternalKey key,
+ IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> atomicView,
+ GridCacheContext ctx) {
assert key != null;
assert atomicView != null;
assert ctx != null;
@@ -141,8 +99,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
this.key = key;
this.atomicView = atomicView;
this.name = name;
-
- log = ctx.logger(getClass());
}
/** {@inheritDoc} */
@@ -155,7 +111,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
checkRemoved();
try {
- return CU.outTx(getCall, ctx);
+ GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
+
+ if (stmp == null)
+ throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
+
+ return stmp.get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -167,7 +128,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
checkRemoved();
try {
- CU.outTx(internalSet(val, stamp), ctx);
+ atomicView.invoke(key, new StampedSetEntryProcessor<>(val, stamp));
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -179,8 +143,15 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
checkRemoved();
try {
- return CU.outTx(internalCompareAndSet(F0.equalTo(expVal), wrapperClosure(newVal),
- F0.equalTo(expStamp), wrapperClosure(newStamp)), ctx);
+ EntryProcessorResult<Boolean> res =
+ atomicView.invoke(key, new StampedCompareAndSetEntryProcessor<>(expVal, expStamp, newVal, newStamp));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -192,7 +163,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
checkRemoved();
try {
- return CU.outTx(stampCall, ctx);
+ GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
+
+ if (stmp == null)
+ throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
+
+ return stmp.stamp();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -204,7 +180,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
checkRemoved();
try {
- return CU.outTx(valCall, ctx);
+ GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
+
+ if (stmp == null)
+ throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
+
+ return stmp.value();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -244,100 +225,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
}
}
- /**
- * Method make wrapper closure for existing value.
- *
- * @param val Value.
- * @return Closure.
- */
- private <N> IgniteClosure<N, N> wrapperClosure(final N val) {
- return new IgniteClosure<N, N>() {
- @Override public N apply(N e) {
- return val;
- }
- };
- }
-
- /**
- * Method returns callable for execution {@link #set(Object,Object)}} operation in async and sync mode.
- *
- * @param val Value will be set in the atomic stamped.
- * @param stamp Stamp will be set in the atomic stamped.
- * @return Callable for execution in async and sync mode.
- */
- private Callable<Boolean> internalSet(final T val, final S stamp) {
- return retryTopologySafe(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
- if (stmp == null)
- throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
- stmp.set(val, stamp);
-
- atomicView.put(key, stmp);
-
- tx.commit();
-
- return true;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to set [val=" + val + ", stamp=" + stamp + ", atomicStamped=" + this + ']', e);
-
- throw e;
- }
- }
- });
- }
-
- /**
- * Conditionally asynchronously sets the new value and new stamp. They will be set if
- * {@code expValPred} and {@code expStampPred} both evaluate to {@code true}.
- *
- * @param expValPred Predicate which should evaluate to {@code true} for value to be set
- * @param newValClos Closure generates new value.
- * @param expStampPred Predicate which should evaluate to {@code true} for value to be set
- * @param newStampClos Closure generates new stamp value.
- * @return Callable for execution in async and sync mode.
- */
- private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred,
- final IgniteClosure<T, T> newValClos, final IgnitePredicate<S> expStampPred,
- final IgniteClosure<S, S> newStampClos) {
- return retryTopologySafe(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
- if (stmp == null)
- throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
- if (!(expValPred.apply(stmp.value()) && expStampPred.apply(stmp.stamp()))) {
- tx.setRollbackOnly();
-
- return false;
- }
- else {
- stmp.set(newValClos.apply(stmp.value()), newStampClos.apply(stmp.stamp()));
-
- atomicView.getAndPut(key, stmp);
-
- tx.commit();
-
- return true;
- }
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to compare and set [expValPred=" + expValPred + ", newValClos=" +
- newValClos + ", expStampPred=" + expStampPred + ", newStampClos=" + newStampClos +
- ", atomicStamped=" + this + ']', e);
-
- throw e;
- }
- }
- });
- }
-
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(ctx.kernalContext());
@@ -418,6 +305,104 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
return new IllegalStateException("Atomic stamped was removed from cache: " + name);
}
+ /**
+ *
+ */
+ static class StampedSetEntryProcessor<T, S> implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>, Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final T newVal;
+
+ /** */
+ private final S newStamp;
+
+ /**
+ * @param newVal New value.
+ * @param newStamp New stamp value.
+ */
+ StampedSetEntryProcessor(T newVal, S newStamp) {
+ this.newVal = newVal;
+ this.newStamp = newStamp;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> e,
+ Object... args) {
+ GridCacheAtomicStampedValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic stamped with given name: " + e.getKey().name());
+
+ e.setValue(new GridCacheAtomicStampedValue<>(newVal, newStamp));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return GridToStringBuilder.toString(StampedSetEntryProcessor.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ static class StampedCompareAndSetEntryProcessor<T, S> implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>, Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final T expVal;
+
+ /** */
+ private final S expStamp;
+
+ /** */
+ private final T newVal;
+
+ /** */
+ private final S newStamp;
+
+ /**
+ * @param expVal Expected value.
+ * @param expStamp Expected stamp.
+ * @param newVal New value.
+ * @param newStamp New stamp value.
+ */
+ StampedCompareAndSetEntryProcessor(T expVal, S expStamp, T newVal, S newStamp) {
+ this.expVal = expVal;
+ this.expStamp = expStamp;
+ this.newVal = newVal;
+ this.newStamp = newStamp;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean process(MutableEntry<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> e,
+ Object... args) {
+ GridCacheAtomicStampedValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic stamped with given name: " + e.getKey().name());
+
+ if (F.eq(expVal, val.value()) && F.eq(expStamp, val.stamp())) {
+ e.setValue(new GridCacheAtomicStampedValue<>(newVal, newStamp));
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return GridToStringBuilder.toString(StampedCompareAndSetEntryProcessor.class, this);
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(GridCacheAtomicStampedImpl.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index ea80cc5..86e99a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -152,7 +152,9 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc} */
@Override public int count() {
try {
- return CU.outTx(new GetCountCallable(), ctx);
+ GridCacheCountDownLatchValue latchVal = latchView.get(key);
+
+ return latchVal == null ? 0 : latchVal.get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -208,7 +210,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
A.ensure(val > 0, "val should be positive");
try {
- return CU.outTx(retryTopologySafe(new CountDownCallable(val)), ctx);
+ return retryTopologySafe(new CountDownCallable(val));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -218,7 +220,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc}*/
@Override public void countDownAll() {
try {
- CU.outTx(retryTopologySafe(new CountDownCallable(0)), ctx);
+ retryTopologySafe(new CountDownCallable(0));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -255,23 +257,22 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
int state = initGuard.get();
if (state != READY_LATCH_STATE) {
- /** Internal latch is not fully initialized yet. Remember latest latch value. */
+ /* Internal latch is not fully initialized yet. Remember latest latch value. */
lastLatchVal = cnt;
return;
}
- /** 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */
+ /* 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */
latch0 = internalLatch;
}
- /** Internal latch is fully initialized and ready for the usage. */
+ /* Internal latch is fully initialized and ready for the usage. */
assert latch0 != null;
while (latch0.getCount() > cnt)
latch0.countDown();
-
}
/**
@@ -280,27 +281,24 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
private void initializeLatch() throws IgniteCheckedException {
if (initGuard.compareAndSet(UNINITIALIZED_LATCH_STATE, CREATING_LATCH_STATE)) {
try {
- internalLatch = CU.outTx(
- retryTopologySafe(new Callable<CountDownLatch>() {
- @Override public CountDownLatch call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheCountDownLatchValue val = latchView.get(key);
+ internalLatch = retryTopologySafe(new Callable<CountDownLatch>() {
+ @Override public CountDownLatch call() throws Exception {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheCountDownLatchValue val = latchView.get(key);
- if (val == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to find count down latch with given name: " + name);
+ if (val == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find count down latch with given name: " + name);
- return new CountDownLatch(0);
- }
+ return new CountDownLatch(0);
+ }
- tx.commit();
+ tx.commit();
- return new CountDownLatch(val.get());
- }
+ return new CountDownLatch(val.get());
}
- }),
- ctx
- );
+ }
+ });
synchronized (initGuard) {
if (lastLatchVal != null) {
@@ -392,18 +390,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/**
*
*/
- private class GetCountCallable implements Callable<Integer> {
- /** {@inheritDoc} */
- @Override public Integer call() throws Exception {
- GridCacheCountDownLatchValue latchVal = latchView.get(key);
-
- return latchVal == null ? 0 : latchVal.get();
- }
- }
-
- /**
- *
- */
private class CountDownCallable implements Callable<Integer> {
/** Value to count down on (if 0 then latch is counted down to 0). */
private final int val;