You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/08/29 12:17:46 UTC

[04/12] ignite git commit: IGNITE-1678 first implementation (fully not working)

IGNITE-1678 first implementation (fully not working)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/839f43ad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/839f43ad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/839f43ad

Branch: refs/heads/ignite-1678
Commit: 839f43adf80a04e4a2778983560031592a20a89e
Parents: 4a1d0c2
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Sun Aug 26 14:45:49 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Sun Aug 26 14:45:49 2018 +0300

----------------------------------------------------------------------
 .../configuration/AtomicConfiguration.java      |   2 +-
 .../GridCacheAtomicSequenceImpl.java            | 270 ++++++++++---------
 2 files changed, 147 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/839f43ad/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
index 8d0e0be..5dfd1e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
@@ -38,7 +38,7 @@ public class AtomicConfiguration {
     public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE = 1000;
 
     /** Default atomic sequence reservation size. */
-    public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_PERCENTAGE = 70;
+    public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_PERCENTAGE = 80;
 
     /** Default batch size for all cache's sequences. */
     private int seqReserveSize = DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE;

http://git-wip-us.apache.org/repos/asf/ignite/blob/839f43ad/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 61e9262..c69d483 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -27,21 +27,20 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -50,7 +49,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
  * Cache sequence implementation.
  */
 public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<GridCacheAtomicSequenceValue>
-    implements GridCacheAtomicSequenceEx, IgniteChangeGlobalStateSupport, Externalizable {
+    implements GridCacheAtomicSequenceEx, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -73,22 +72,25 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
     private volatile int batchSize;
 
     /** Reservation percentage. */
-    private int reservePercentage;
+    private volatile int reservePercentage;
 
-    /** Synchronization lock for local value updates. */
-    private final Lock localUpdate = new ReentrantLock();
+    /** Reserved bottom bound of local counter (included). */
+    private volatile long reservedBottomBound;
+
+    /** Reserved upper bound of local counter (not included). */
+    private volatile long reservedUpBound;
 
-    /** Synchronization for distributed sequence update. Acquired by threads with free topology (not in TX). */
-    private final ReentrantLock distUpdateFreeTop = new ReentrantLock();
+    /** A limit after which a new reservation should be done. */
+    private volatile long newReservationLine;
 
-    /** Synchronization for distributed sequence update. Acquired by threads with locked topology (inside TX). */
-    private final ReentrantLock distUpdateLockedTop = new ReentrantLock();
+    /** Reservation future. */
+    private volatile IgniteInternalFuture<?> reservationFut;
 
-    /** Callable for execution {@link #incrementAndGet} operation in async and sync mode.  */
-    private final Callable<Long> incAndGetCall = internalUpdate(1, true);
+    /** Reservation pool. */
+    private final byte poolPlc = GridIoPolicy.SYSTEM_POOL;
 
-    /** Callable for execution {@link #getAndIncrement} operation in async and sync mode.  */
-    private final Callable<Long> getAndIncCall = internalUpdate(1, false);
+    /** Synchronization lock for local value updates. */
+    private final Lock localUpdateLock = new ReentrantLock();
 
     /**
      * Empty constructor required by {@link Externalizable}.
@@ -125,6 +127,11 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
         this.reservePercentage = reservePercentage;
         this.upBound = upBound;
         this.locVal = locVal;
+
+        reservedBottomBound = locVal;
+        reservedUpBound = upBound;
+        // Calculate next reservation bound.
+        newReservationLine = calculateNewReservationLine(locVal);
     }
 
     /** {@inheritDoc} */
@@ -137,7 +144,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
     /** {@inheritDoc} */
     @Override public long incrementAndGet() {
         try {
-            return internalUpdate(1, incAndGetCall, true);
+            return internalUpdate(1, true);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -147,7 +154,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
     /** {@inheritDoc} */
     @Override public long getAndIncrement() {
         try {
-            return internalUpdate(1, getAndIncCall, false);
+            return internalUpdate(1, false);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -159,7 +166,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
         A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
 
         try {
-            return internalUpdate(l, null, true);
+            return internalUpdate(l, true);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -171,7 +178,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
         A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
 
         try {
-            return internalUpdate(l, null, false);
+            return internalUpdate(l, false);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -182,99 +189,121 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
      * Synchronous sequence update operation. Will add given amount to the sequence value.
      *
      * @param l Increment amount.
-     * @param updateCall Cache call that will update sequence reservation count in accordance with l.
      * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
      *      prior to update.
      * @return Sequence value.
      * @throws IgniteCheckedException If update failed.
      */
     @SuppressWarnings("SignalWithoutCorrespondingAwait")
-    private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteCheckedException {
-        checkRemoved();
-
+    private long internalUpdate(long l, boolean updated) throws IgniteCheckedException {
         assert l > 0;
 
-        localUpdate.lock();
+        while (true){
+            checkRemoved();
 
-        try {
-            // If reserved range isn't exhausted.
-            long locVal0 = locVal;
+            localUpdateLock.lock();
 
-            if (locVal0 + l <= upBound) {
-                locVal = locVal0 + l;
+            IgniteInternalFuture<?> reservation = reservationFut;
 
-                return updated ? locVal0 + l : locVal0;
-            }
-        }
-        finally {
-            localUpdate.unlock();
-        }
+            try {
+                boolean reservationInProgress = reservation != null;
 
-        AffinityTopologyVersion lockedVer = ctx.shared().lockedTopologyVersion(null);
+                long newLocalVal = locVal + l;
 
-        // We need two separate locks here because two independent thread may attempt to update the sequence
-        // simultaneously, one thread with locked topology and other with unlocked.
-        // We cannot use the same lock for both cases because it leads to a deadlock when free-topology thread
-        // waits for topology change, and locked topology thread waits to acquire the lock.
-        // If a thread has locked topology, it must bypass sync with non-locked threads, but at the same time
-        // we do not want multiple threads to attempt to run identical cache updates.
-        ReentrantLock distLock = lockedVer == null ? distUpdateFreeTop : distUpdateLockedTop;
+                // Reserve new interval if operation is not in progress.
+                if (newLocalVal >= newReservationLine && newLocalVal <= reservedUpBound && !reservationInProgress){
+                    reservationFut = runAsyncReservation(0);
+                }
 
-        distLock.lock();
+                long locVal0 = locVal;
 
-        try {
-            if (updateCall == null)
-                updateCall = internalUpdate(l, updated);
+                if (newLocalVal <= upBound) {
+                    locVal = newLocalVal;
 
-            try {
-                return CU.retryTopologySafe(updateCall);
-            }
-            catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
-                throw e;
+                    return updated ? newLocalVal : locVal0;
+                }
+
+                // Await complete previous reservation.
+                if (reservationInProgress){
+                    reservation.get();
+
+                    reservationFut = null;
+
+                    // Retry check bounds.
+                    continue;
+                }
+
+                // Still in reserved interval.
+                if (newLocalVal < reservedUpBound) {
+                    long curVal = locVal;
+
+                    if (newLocalVal < reservedBottomBound)
+                        locVal = reservedBottomBound;
+                    else
+                        locVal += l;
+
+                    upBound = reservedUpBound;
+
+                    return updated ? locVal : curVal;
+                }
+                // Switched to the next interval. New value more that upper reserved bound.
+                else if (!reservationInProgress) {
+                    long diff = newLocalVal - reservedUpBound;
+
+                    // Calculate how many batch size included in l.
+                    // It will our offset for global seq counter.
+                    long off = (diff / batchSize) * batchSize;
+
+                    reservationFut = runAsyncReservation(off);
+
+                    // Can not wait async, should wait under lock until new interval reserved.
+                    reservationFut.get();
+
+                    reservationFut = null;
+                }
             }
-            catch (Exception e) {
-                throw new IgniteCheckedException(e);
+            finally {
+                localUpdateLock.unlock();
             }
         }
-        finally {
-            distLock.unlock();
-        }
     }
 
-    /** Get local batch size for this sequences.
-     *
-     * @return Sequence batch size.
-     */
+    /** {@inheritDoc} */
     @Override public int batchSize() {
         return batchSize;
     }
 
-    /**
-     * Set local batch size for this sequences.
-     *
-     * @param size Sequence batch size. Must be more then 0.
-     */
+    /** {@inheritDoc} */
     @Override public void batchSize(int size) {
         A.ensure(size > 0, " Batch size can't be less then 0: " + size);
 
-        localUpdate.lock();
+        localUpdateLock.lock();
 
         try {
             batchSize = size;
         }
         finally {
-            localUpdate.unlock();
+            localUpdateLock.unlock();
         }
     }
 
     /** {@inheritDoc} */
     @Override public int reservePercentage() {
-        return 0;
+        return reservePercentage;
     }
 
     /** {@inheritDoc} */
     @Override public void reservePercentage(int percentage) {
+        A.ensure(percentage >= 0 && percentage <= 100, "Invalid reserve percentage: " + percentage);
 
+        localUpdateLock.lock();
+
+        try {
+            reservePercentage = percentage;
+        }
+        finally {
+            localUpdateLock.unlock();
+        }
     }
 
     /** {@inheritDoc} */
@@ -297,80 +326,64 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
     }
 
     /**
-     * Method returns callable for execution all update operations in async and sync mode.
+     * Runs async reservation of new range for current node.
      *
-     * @param l Value will be added to sequence.
-     * @param updated If {@code true}, will return updated value, if {@code false}, will return previous value.
-     * @return Callable for execution in async and sync mode.
+     * @param off Offset.
+     * @return Future.
      */
-    @SuppressWarnings("TooBroadScope")
-    private Callable<Long> internalUpdate(final long l, final boolean updated) {
-        return new Callable<Long>() {
-            @Override public Long call() throws Exception {
-                assert distUpdateFreeTop.isHeldByCurrentThread() || distUpdateLockedTop.isHeldByCurrentThread();
-
-                try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicSequenceValue seq = cacheView.get(key);
-
-                    checkRemoved();
-
-                    assert seq != null;
+    private IgniteInternalFuture<?> runAsyncReservation(final long off) {
+        assert off >= 0 : off;
 
-                    long curLocVal;
+        GridFutureAdapter<?> resFut = new GridFutureAdapter<>();
 
-                    long newUpBound;
+        resFut.listen(f -> {
+            if (f.error() == null)
+                reservationFut = null;
+        });
 
-                    // Even though we hold a transaction lock here, we must hold the local update lock here as well
-                    // because we mutate multipe variables (locVal and upBound).
-                    localUpdate.lock();
+        ctx.kernalContext().closure().runLocalSafe(() -> {
+            Callable<Void> reserveCall = reserveCallable(off);
 
-                    try {
-                        curLocVal = locVal;
-
-                        // If local range was already reserved in another thread.
-                        if (curLocVal + l <= upBound) {
-                            locVal = curLocVal + l;
+            try {
+                CU.retryTopologySafe(reserveCall);
 
-                            return updated ? curLocVal + l : curLocVal;
-                        }
+                resFut.onDone();
+            }
+            catch (Throwable h) {
+                resFut.onDone(h);
+            }
+        }, poolPlc);
 
-                        long curGlobalVal = seq.get();
+        return resFut;
+    }
 
-                        long newLocVal;
+    /**
+     * @param off Reservation offset.
+     * @return Callable for reserved new interval.
+     */
+    private Callable<Void> reserveCallable(long off){
+       return new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheAtomicSequenceValue seq = cacheView.get(key);
 
-                        /* We should use offset because we already reserved left side of range.*/
-                        long off = batchSize > 1 ? batchSize - 1 : 1;
+                    checkRemoved();
 
-                        // Calculate new values for local counter, global counter and upper bound.
-                        if (curLocVal + l >= curGlobalVal) {
-                            newLocVal = curLocVal + l;
+                    assert seq != null;
 
-                            newUpBound = newLocVal + off;
-                        }
-                        else {
-                            newLocVal = curGlobalVal;
+                    long curGlobalVal = seq.get();
 
-                            newUpBound = newLocVal + off;
-                        }
+                    reservedBottomBound = curGlobalVal + off;
 
-                        locVal = newLocVal;
-                        upBound = newUpBound;
+                    reservedUpBound = reservedBottomBound + batchSize;
 
-                        if (updated)
-                            curLocVal = newLocVal;
-                    }
-                    finally {
-                        localUpdate.unlock();
-                    }
+                    newReservationLine = calculateNewReservationLine(reservedBottomBound);
 
-                    // Global counter must be more than reserved upper bound.
-                    seq.set(newUpBound + 1);
+                    seq.set(reservedUpBound);
 
                     cacheView.put(key, seq);
 
                     tx.commit();
-
-                    return curLocVal;
                 }
                 catch (Error | Exception e) {
                     if(!X.hasCause(e, ClusterTopologyCheckedException.class))
@@ -378,10 +391,19 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
 
                     throw e;
                 }
+
+                return null;
             }
         };
     }
 
+    /**
+     * @return New reservation line.
+     */
+    private long calculateNewReservationLine(long initialValue) {
+        return initialValue + (batchSize * reservePercentage / 100);
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(ctx.kernalContext());