You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/01/18 09:35:27 UTC
ignite git commit: ignite-2893
Repository: ignite
Updated Branches:
refs/heads/ignite-2893 [created] 151c6aa0b
ignite-2893
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/151c6aa0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/151c6aa0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/151c6aa0
Branch: refs/heads/ignite-2893
Commit: 151c6aa0bfb8b05d177680b5c27cd4bef3787430
Parents: f136542
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jan 18 12:35:35 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jan 18 12:35:35 2017 +0300
----------------------------------------------------------------------
.../datastructures/GridCacheAtomicLongImpl.java | 604 +++++++++++--------
1 file changed, 343 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/151c6aa0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index dfd2122..79c5f07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -23,22 +23,19 @@ import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
/**
* Cache atomic long implementation.
*/
@@ -54,9 +51,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
}
};
- /** Logger. */
- private IgniteLogger log;
-
/** Atomic long name. */
private String name;
@@ -75,126 +69,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
/** Cache context. */
private GridCacheContext ctx;
- /** Callable for {@link #get()}. */
- private final Callable<Long> getCall = new Callable<Long>() {
- @Override public Long call() throws Exception {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- return val.get();
- }
- };
-
- /** Callable for {@link #incrementAndGet()}. */
- private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get() + 1;
-
- val.set(retVal);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to increment and get: " + this, e);
-
- throw e;
- }
- }
- });
-
- /** Callable for {@link #getAndIncrement()}. */
- private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get();
-
- val.set(retVal + 1);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and increment: " + this, e);
-
- throw e;
- }
- }
- });
-
- /** Callable for {@link #decrementAndGet()}. */
- private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get() - 1;
-
- val.set(retVal);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to decrement and get: " + this, e);
-
- throw e;
- }
- }
- });
-
- /** Callable for {@link #getAndDecrement()}. */
- private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get();
-
- val.set(retVal - 1);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and decrement and get: " + this, e);
-
- throw e;
- }
- }
- });
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -221,8 +95,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
this.key = key;
this.atomicView = atomicView;
this.name = name;
-
- log = ctx.logger(getClass());
}
/** {@inheritDoc} */
@@ -235,7 +107,12 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
checkRemoved();
try {
- return CU.outTx(getCall, ctx);
+ GridCacheAtomicLongValue val = atomicView.get(key);
+
+ if (val == null)
+ throw new IgniteException("Failed to find atomic long: " + name);
+
+ return val.get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -247,7 +124,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
checkRemoved();
try{
- return CU.outTx(incAndGetCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, IncrementAndGetProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -259,7 +143,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
checkRemoved();
try {
- return CU.outTx(getAndIncCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndIncrementProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -271,7 +162,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
checkRemoved();
try {
- return CU.outTx(internalAddAndGet(l), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new AddAndGetProcessor(l));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -283,7 +181,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
checkRemoved();
try {
- return CU.outTx(internalGetAndAdd(l), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndAddProcessor(l));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -295,7 +200,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
checkRemoved();
try {
- return CU.outTx(decAndGetCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, DecrementAndGetProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -307,7 +219,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
checkRemoved();
try {
- return CU.outTx(getAndDecCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndDecrementProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -319,7 +238,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
checkRemoved();
try {
- return CU.outTx(internalGetAndSet(l), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndSetProcessor(l));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -331,7 +257,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
checkRemoved();
try {
- return CU.outTx(internalCompareAndSetAndGet(expVal, newVal) , ctx) == expVal;
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get() == expVal;
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -347,7 +280,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
checkRemoved();
try {
- return CU.outTx(internalCompareAndSetAndGet(expVal, newVal), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -421,182 +361,324 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
}
}
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(ctx.kernalContext());
+ out.writeUTF(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+ t.set1((GridKernalContext)in.readObject());
+ t.set2(in.readUTF());
+ }
+
/**
- * Method returns callable for execution {@link #addAndGet(long)} operation in async and sync mode.
+ * Reconstructs object on unmarshalling.
*
- * @param l Value will be added to atomic long.
- * @return Callable for execution in async and sync mode.
+ * @return Reconstructed object.
+ * @throws ObjectStreamException Thrown in case of unmarshalling error.
*/
- private Callable<Long> internalAddAndGet(final long l) {
- return retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
+ private Object readResolve() throws ObjectStreamException {
+ try {
+ IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+ return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+ }
+ finally {
+ stash.remove();
+ }
+ }
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheAtomicLongImpl.class, this);
+ }
- long retVal = val.get() + l;
+ /**
+ *
+ */
+ static class GetAndSetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long newVal;
+
+ /**
+ * @param newVal New value.
+ */
+ GetAndSetProcessor(long newVal) {
+ this.newVal = newVal;
+ }
- val.set(retVal);
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- atomicView.put(key, val);
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- tx.commit();
+ long curVal = val.get();
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to add and get: " + this, e);
+ e.setValue(new GridCacheAtomicLongValue(newVal));
- throw e;
- }
- }
- });
+ return curVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndSetProcessor.class, this);
+ }
}
/**
- * Method returns callable for execution {@link #getAndAdd(long)} operation in async and sync mode.
*
- * @param l Value will be added to atomic long.
- * @return Callable for execution in async and sync mode.
*/
- private Callable<Long> internalGetAndAdd(final long l) {
- return retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ static class GetAndAddProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long delta;
+
+ /**
+ * @param delta Delta.
+ */
+ GetAndAddProcessor(long delta) {
+ this.delta = delta;
+ }
- long retVal = val.get();
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- val.set(retVal + l);
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- atomicView.put(key, val);
+ long curVal = val.get();
- tx.commit();
+ e.setValue(new GridCacheAtomicLongValue(curVal + delta));
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and add: " + this, e);
+ return curVal;
+ }
- throw e;
- }
- }
- });
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndAddProcessor.class, this);
+ }
}
/**
- * Method returns callable for execution {@link #getAndSet(long)} operation in async and sync mode.
*
- * @param l Value will be added to atomic long.
- * @return Callable for execution in async and sync mode.
*/
- private Callable<Long> internalGetAndSet(final long l) {
- return new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
+ static class AddAndGetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long delta;
+
+ /**
+ * @param delta Delta.
+ */
+ AddAndGetProcessor(long delta) {
+ this.delta = delta;
+ }
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- long retVal = val.get();
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- val.set(l);
+ long newVal = val.get() + delta;
- atomicView.put(key, val);
+ e.setValue(new GridCacheAtomicLongValue(newVal));
- tx.commit();
+ return newVal;
+ }
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and set: " + this, e);
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AddAndGetProcessor.class, this);
+ }
+ }
- throw e;
- }
- }
- };
+ /**
+ *
+ */
+ static class CompareAndSetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long expVal;
+
+ /** */
+ private final long newVal;
+
+ /**
+ * @param expVal Expected value.
+ * @param newVal New value.
+ */
+ CompareAndSetProcessor(long expVal, long newVal) {
+ this.expVal = expVal;
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+ long curVal = val.get();
+
+ if (curVal == expVal)
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return curVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CompareAndSetProcessor.class, this);
+ }
}
/**
- * Method returns callable for execution {@link #compareAndSetAndGet(long, long)}
- * operation in async and sync mode.
*
- * @param expVal Expected atomic long value.
- * @param newVal New atomic long value.
- * @return Callable for execution in async and sync mode.
*/
- private Callable<Long> internalCompareAndSetAndGet(final long expVal, final long newVal) {
- return new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
+ static class GetAndIncrementProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ /** */
+ private static final GetAndIncrementProcessor INSTANCE = new GetAndIncrementProcessor();
- long retVal = val.get();
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- if (retVal == expVal) {
- val.set(newVal);
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- atomicView.getAndPut(key, val);
+ long ret = val.get();
- tx.commit();
- }
+ e.setValue(new GridCacheAtomicLongValue(ret + 1));
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to compare and set: " + this, e);
+ return ret;
+ }
- throw e;
- }
- }
- };
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndIncrementProcessor.class, this);
+ }
}
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(ctx.kernalContext());
- out.writeUTF(name);
- }
+ /**
+ *
+ */
+ static class IncrementAndGetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- IgniteBiTuple<GridKernalContext, String> t = stash.get();
+ /** */
+ private static final IncrementAndGetProcessor INSTANCE = new IncrementAndGetProcessor();
- t.set1((GridKernalContext)in.readObject());
- t.set2(in.readUTF());
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+ long newVal = val.get() + 1;
+
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IncrementAndGetProcessor.class, this);
+ }
}
/**
- * Reconstructs object on unmarshalling.
*
- * @return Reconstructed object.
- * @throws ObjectStreamException Thrown in case of unmarshalling error.
*/
- private Object readResolve() throws ObjectStreamException {
- try {
- IgniteBiTuple<GridKernalContext, String> t = stash.get();
+ static class GetAndDecrementProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
- return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
- }
- catch (IgniteCheckedException e) {
- throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+ /** */
+ private static final GetAndDecrementProcessor INSTANCE = new GetAndDecrementProcessor();
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+ long ret = val.get();
+
+ e.setValue(new GridCacheAtomicLongValue(ret - 1));
+
+ return ret;
}
- finally {
- stash.remove();
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndDecrementProcessor.class, this);
}
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheAtomicLongImpl.class, this);
+ /**
+ *
+ */
+ static class DecrementAndGetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final DecrementAndGetProcessor INSTANCE = new DecrementAndGetProcessor();
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+ long newVal = val.get() - 1;
+
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DecrementAndGetProcessor.class, this);
+ }
}
}