You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/01/18 09:35:27 UTC

ignite git commit: ignite-2893

Repository: ignite
Updated Branches:
  refs/heads/ignite-2893 [created] 151c6aa0b


ignite-2893


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/151c6aa0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/151c6aa0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/151c6aa0

Branch: refs/heads/ignite-2893
Commit: 151c6aa0bfb8b05d177680b5c27cd4bef3787430
Parents: f136542
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jan 18 12:35:35 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jan 18 12:35:35 2017 +0300

----------------------------------------------------------------------
 .../datastructures/GridCacheAtomicLongImpl.java | 604 +++++++++++--------
 1 file changed, 343 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/151c6aa0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index dfd2122..79c5f07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -23,22 +23,19 @@ import java.io.InvalidObjectException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
 /**
  * Cache atomic long implementation.
  */
@@ -54,9 +51,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
             }
         };
 
-    /** Logger. */
-    private IgniteLogger log;
-
     /** Atomic long name. */
     private String name;
 
@@ -75,126 +69,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
     /** Cache context. */
     private GridCacheContext ctx;
 
-    /** Callable for {@link #get()}. */
-    private final Callable<Long> getCall = new Callable<Long>() {
-        @Override public Long call() throws Exception {
-            GridCacheAtomicLongValue val = atomicView.get(key);
-
-            if (val == null)
-                throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
-            return val.get();
-        }
-    };
-
-    /** Callable for {@link #incrementAndGet()}. */
-    private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() {
-        @Override public Long call() throws Exception {
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                GridCacheAtomicLongValue val = atomicView.get(key);
-
-                if (val == null)
-                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
-                long retVal = val.get() + 1;
-
-                val.set(retVal);
-
-                atomicView.put(key, val);
-
-                tx.commit();
-
-                return retVal;
-            }
-            catch (Error | Exception e) {
-                U.error(log, "Failed to increment and get: " + this, e);
-
-                throw e;
-            }
-        }
-    });
-
-    /** Callable for {@link #getAndIncrement()}. */
-    private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() {
-        @Override public Long call() throws Exception {
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                GridCacheAtomicLongValue val = atomicView.get(key);
-
-                if (val == null)
-                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
-                long retVal = val.get();
-
-                val.set(retVal + 1);
-
-                atomicView.put(key, val);
-
-                tx.commit();
-
-                return retVal;
-            }
-            catch (Error | Exception e) {
-                U.error(log, "Failed to get and increment: " + this, e);
-
-                throw e;
-            }
-        }
-    });
-
-    /** Callable for {@link #decrementAndGet()}. */
-    private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() {
-        @Override public Long call() throws Exception {
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                GridCacheAtomicLongValue val = atomicView.get(key);
-
-                if (val == null)
-                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
-                long retVal = val.get() - 1;
-
-                val.set(retVal);
-
-                atomicView.put(key, val);
-
-                tx.commit();
-
-                return retVal;
-            }
-            catch (Error | Exception e) {
-                U.error(log, "Failed to decrement and get: " + this, e);
-
-                throw e;
-            }
-        }
-    });
-
-    /** Callable for {@link #getAndDecrement()}. */
-    private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() {
-        @Override public Long call() throws Exception {
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                GridCacheAtomicLongValue val = atomicView.get(key);
-
-                if (val == null)
-                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
-                long retVal = val.get();
-
-                val.set(retVal - 1);
-
-                atomicView.put(key, val);
-
-                tx.commit();
-
-                return retVal;
-            }
-            catch (Error | Exception e) {
-                U.error(log, "Failed to get and decrement and get: " + this, e);
-
-                throw e;
-            }
-        }
-    });
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -221,8 +95,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         this.key = key;
         this.atomicView = atomicView;
         this.name = name;
-
-        log = ctx.logger(getClass());
     }
 
     /** {@inheritDoc} */
@@ -235,7 +107,12 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         checkRemoved();
 
         try {
-            return CU.outTx(getCall, ctx);
+            GridCacheAtomicLongValue val = atomicView.get(key);
+
+            if (val == null)
+                throw new IgniteException("Failed to find atomic long: " + name);
+
+            return val.get();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -247,7 +124,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         checkRemoved();
 
         try{
-            return CU.outTx(incAndGetCall, ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, IncrementAndGetProcessor.INSTANCE);
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -259,7 +143,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         checkRemoved();
 
         try {
-            return CU.outTx(getAndIncCall, ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndIncrementProcessor.INSTANCE);
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -271,7 +162,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         checkRemoved();
 
         try {
-            return CU.outTx(internalAddAndGet(l), ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, new AddAndGetProcessor(l));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -283,7 +181,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         checkRemoved();
 
         try {
-            return CU.outTx(internalGetAndAdd(l), ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndAddProcessor(l));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -295,7 +200,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         checkRemoved();
 
         try {
-            return CU.outTx(decAndGetCall, ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, DecrementAndGetProcessor.INSTANCE);
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -307,7 +219,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         checkRemoved();
 
         try {
-            return CU.outTx(getAndDecCall, ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndDecrementProcessor.INSTANCE);
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -319,7 +238,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         checkRemoved();
 
         try {
-            return CU.outTx(internalGetAndSet(l), ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndSetProcessor(l));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -331,7 +257,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         checkRemoved();
 
         try {
-            return CU.outTx(internalCompareAndSetAndGet(expVal, newVal) , ctx) == expVal;
+            EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get() == expVal;
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -347,7 +280,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         checkRemoved();
 
         try {
-            return CU.outTx(internalCompareAndSetAndGet(expVal, newVal), ctx);
+            EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal));
+
+            assert res != null && res.get() != null : res;
+
+            return res.get();
+        }
+        catch (EntryProcessorException e) {
+            throw new IgniteException(e.getMessage(), e);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -421,182 +361,324 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ctx.kernalContext());
+        out.writeUTF(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+        t.set1((GridKernalContext)in.readObject());
+        t.set2(in.readUTF());
+    }
+
     /**
-     * Method returns callable for execution {@link #addAndGet(long)} operation in async and sync mode.
+     * Reconstructs object on unmarshalling.
      *
-     * @param l Value will be added to atomic long.
-     * @return Callable for execution in async and sync mode.
+     * @return Reconstructed object.
+     * @throws ObjectStreamException Thrown in case of unmarshalling error.
      */
-    private Callable<Long> internalAddAndGet(final long l) {
-        return retryTopologySafe(new Callable<Long>() {
-            @Override public Long call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicLongValue val = atomicView.get(key);
+    private Object readResolve() throws ObjectStreamException {
+        try {
+            IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+            return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+        }
+        finally {
+            stash.remove();
+        }
+    }
 
-                    if (val == null)
-                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheAtomicLongImpl.class, this);
+    }
 
-                    long retVal = val.get() + l;
+    /**
+     *
+     */
+    static class GetAndSetProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final long newVal;
+
+        /**
+         * @param newVal New value.
+         */
+        GetAndSetProcessor(long newVal) {
+            this.newVal = newVal;
+        }
 
-                    val.set(retVal);
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
 
-                    atomicView.put(key, val);
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
 
-                    tx.commit();
+            long curVal = val.get();
 
-                    return retVal;
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to add and get: " + this, e);
+            e.setValue(new GridCacheAtomicLongValue(newVal));
 
-                    throw e;
-                }
-            }
-        });
+            return curVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(GetAndSetProcessor.class, this);
+        }
     }
 
     /**
-     * Method returns callable for execution {@link #getAndAdd(long)} operation in async and sync mode.
      *
-     * @param l Value will be added to atomic long.
-     * @return Callable for execution in async and sync mode.
      */
-    private Callable<Long> internalGetAndAdd(final long l) {
-        return retryTopologySafe(new Callable<Long>() {
-            @Override public Long call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicLongValue val = atomicView.get(key);
-
-                    if (val == null)
-                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+    static class GetAndAddProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final long delta;
+
+        /**
+         * @param delta Delta.
+         */
+        GetAndAddProcessor(long delta) {
+            this.delta = delta;
+        }
 
-                    long retVal = val.get();
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
 
-                    val.set(retVal + l);
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
 
-                    atomicView.put(key, val);
+            long curVal = val.get();
 
-                    tx.commit();
+            e.setValue(new GridCacheAtomicLongValue(curVal + delta));
 
-                    return retVal;
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to get and add: " + this, e);
+            return curVal;
+        }
 
-                    throw e;
-                }
-            }
-        });
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(GetAndAddProcessor.class, this);
+        }
     }
 
     /**
-     * Method returns callable for execution {@link #getAndSet(long)} operation in async and sync mode.
      *
-     * @param l Value will be added to atomic long.
-     * @return Callable for execution in async and sync mode.
      */
-    private Callable<Long> internalGetAndSet(final long l) {
-        return new Callable<Long>() {
-            @Override public Long call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicLongValue val = atomicView.get(key);
+    static class AddAndGetProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final long delta;
+
+        /**
+         * @param delta Delta.
+         */
+        AddAndGetProcessor(long delta) {
+            this.delta = delta;
+        }
 
-                    if (val == null)
-                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
 
-                    long retVal = val.get();
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
 
-                    val.set(l);
+            long newVal = val.get() + delta;
 
-                    atomicView.put(key, val);
+            e.setValue(new GridCacheAtomicLongValue(newVal));
 
-                    tx.commit();
+            return newVal;
+        }
 
-                    return retVal;
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to get and set: " + this, e);
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AddAndGetProcessor.class, this);
+        }
+    }
 
-                    throw e;
-                }
-            }
-        };
+    /**
+     *
+     */
+    static class CompareAndSetProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final long expVal;
+
+        /** */
+        private final long newVal;
+
+        /**
+         * @param expVal Expected value.
+         * @param newVal New value.
+         */
+        CompareAndSetProcessor(long expVal, long newVal) {
+            this.expVal = expVal;
+            this.newVal = newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+            long curVal = val.get();
+
+            if (curVal == expVal)
+                e.setValue(new GridCacheAtomicLongValue(newVal));
+
+            return curVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CompareAndSetProcessor.class, this);
+        }
     }
 
     /**
-     * Method returns callable for execution {@link #compareAndSetAndGet(long, long)}
-     * operation in async and sync mode.
      *
-     * @param expVal Expected atomic long value.
-     * @param newVal New atomic long value.
-     * @return Callable for execution in async and sync mode.
      */
-    private Callable<Long> internalCompareAndSetAndGet(final long expVal, final long newVal) {
-        return new Callable<Long>() {
-            @Override public Long call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicLongValue val = atomicView.get(key);
+    static class GetAndIncrementProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-                    if (val == null)
-                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+        /** */
+        private static final GetAndIncrementProcessor INSTANCE = new GetAndIncrementProcessor();
 
-                    long retVal = val.get();
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
 
-                    if (retVal == expVal) {
-                        val.set(newVal);
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
 
-                        atomicView.getAndPut(key, val);
+            long ret = val.get();
 
-                        tx.commit();
-                    }
+            e.setValue(new GridCacheAtomicLongValue(ret + 1));
 
-                    return retVal;
-                }
-                catch (Error | Exception e) {
-                    U.error(log, "Failed to compare and set: " + this, e);
+            return ret;
+        }
 
-                    throw e;
-                }
-            }
-        };
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(GetAndIncrementProcessor.class, this);
+        }
     }
 
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(ctx.kernalContext());
-        out.writeUTF(name);
-    }
+    /**
+     *
+     */
+    static class IncrementAndGetProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        IgniteBiTuple<GridKernalContext, String> t = stash.get();
+        /** */
+        private static final IncrementAndGetProcessor INSTANCE = new IncrementAndGetProcessor();
 
-        t.set1((GridKernalContext)in.readObject());
-        t.set2(in.readUTF());
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+            long newVal = val.get() + 1;
+
+            e.setValue(new GridCacheAtomicLongValue(newVal));
+
+            return newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(IncrementAndGetProcessor.class, this);
+        }
     }
 
     /**
-     * Reconstructs object on unmarshalling.
      *
-     * @return Reconstructed object.
-     * @throws ObjectStreamException Thrown in case of unmarshalling error.
      */
-    private Object readResolve() throws ObjectStreamException {
-        try {
-            IgniteBiTuple<GridKernalContext, String> t = stash.get();
+    static class GetAndDecrementProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-            return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+        /** */
+        private static final GetAndDecrementProcessor INSTANCE = new GetAndDecrementProcessor();
+
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+            long ret = val.get();
+
+            e.setValue(new GridCacheAtomicLongValue(ret - 1));
+
+            return ret;
         }
-        finally {
-            stash.remove();
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(GetAndDecrementProcessor.class, this);
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheAtomicLongImpl.class, this);
+    /**
+     *
+     */
+    static class DecrementAndGetProcessor implements
+        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private static final DecrementAndGetProcessor INSTANCE = new DecrementAndGetProcessor();
+
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+            GridCacheAtomicLongValue val = e.getValue();
+
+            if (val == null)
+                throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+            long newVal = val.get() - 1;
+
+            e.setValue(new GridCacheAtomicLongValue(newVal));
+
+            return newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DecrementAndGetProcessor.class, this);
+        }
     }
 }