You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/08/29 12:17:46 UTC
[04/12] ignite git commit: IGNITE-1678 first implementation (fully not working)
IGNITE-1678 first implementation (fully not working)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/839f43ad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/839f43ad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/839f43ad
Branch: refs/heads/ignite-1678
Commit: 839f43adf80a04e4a2778983560031592a20a89e
Parents: 4a1d0c2
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Sun Aug 26 14:45:49 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Sun Aug 26 14:45:49 2018 +0300
----------------------------------------------------------------------
.../configuration/AtomicConfiguration.java | 2 +-
.../GridCacheAtomicSequenceImpl.java | 270 ++++++++++---------
2 files changed, 147 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/839f43ad/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
index 8d0e0be..5dfd1e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/AtomicConfiguration.java
@@ -38,7 +38,7 @@ public class AtomicConfiguration {
public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE = 1000;
/** Default atomic sequence reservation size. */
- public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_PERCENTAGE = 70;
+ public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_PERCENTAGE = 80;
/** Default batch size for all cache's sequences. */
private int seqReserveSize = DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE;
http://git-wip-us.apache.org/repos/asf/ignite/blob/839f43ad/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 61e9262..c69d483 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -27,21 +27,20 @@ import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -50,7 +49,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
* Cache sequence implementation.
*/
public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<GridCacheAtomicSequenceValue>
- implements GridCacheAtomicSequenceEx, IgniteChangeGlobalStateSupport, Externalizable {
+ implements GridCacheAtomicSequenceEx, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -73,22 +72,25 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
private volatile int batchSize;
/** Reservation percentage. */
- private int reservePercentage;
+ private volatile int reservePercentage;
- /** Synchronization lock for local value updates. */
- private final Lock localUpdate = new ReentrantLock();
+ /** Reserved bottom bound of local counter (included). */
+ private volatile long reservedBottomBound;
+
+ /** Reserved upper bound of local counter (not included). */
+ private volatile long reservedUpBound;
- /** Synchronization for distributed sequence update. Acquired by threads with free topology (not in TX). */
- private final ReentrantLock distUpdateFreeTop = new ReentrantLock();
+ /** A limit after which a new reservation should be done. */
+ private volatile long newReservationLine;
- /** Synchronization for distributed sequence update. Acquired by threads with locked topology (inside TX). */
- private final ReentrantLock distUpdateLockedTop = new ReentrantLock();
+ /** Reservation future. */
+ private volatile IgniteInternalFuture<?> reservationFut;
- /** Callable for execution {@link #incrementAndGet} operation in async and sync mode. */
- private final Callable<Long> incAndGetCall = internalUpdate(1, true);
+ /** Reservation pool. */
+ private final byte poolPlc = GridIoPolicy.SYSTEM_POOL;
- /** Callable for execution {@link #getAndIncrement} operation in async and sync mode. */
- private final Callable<Long> getAndIncCall = internalUpdate(1, false);
+ /** Synchronization lock for local value updates. */
+ private final Lock localUpdateLock = new ReentrantLock();
/**
* Empty constructor required by {@link Externalizable}.
@@ -125,6 +127,11 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
this.reservePercentage = reservePercentage;
this.upBound = upBound;
this.locVal = locVal;
+
+ reservedBottomBound = locVal;
+ reservedUpBound = upBound;
+ // Calculate next reservation bound.
+ newReservationLine = calculateNewReservationLine(locVal);
}
/** {@inheritDoc} */
@@ -137,7 +144,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
/** {@inheritDoc} */
@Override public long incrementAndGet() {
try {
- return internalUpdate(1, incAndGetCall, true);
+ return internalUpdate(1, true);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -147,7 +154,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
/** {@inheritDoc} */
@Override public long getAndIncrement() {
try {
- return internalUpdate(1, getAndIncCall, false);
+ return internalUpdate(1, false);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -159,7 +166,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
try {
- return internalUpdate(l, null, true);
+ return internalUpdate(l, true);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -171,7 +178,7 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
try {
- return internalUpdate(l, null, false);
+ return internalUpdate(l, false);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -182,99 +189,121 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
* Synchronous sequence update operation. Will add given amount to the sequence value.
*
* @param l Increment amount.
- * @param updateCall Cache call that will update sequence reservation count in accordance with l.
* @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
* prior to update.
* @return Sequence value.
* @throws IgniteCheckedException If update failed.
*/
@SuppressWarnings("SignalWithoutCorrespondingAwait")
- private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteCheckedException {
- checkRemoved();
-
+ private long internalUpdate(long l, boolean updated) throws IgniteCheckedException {
assert l > 0;
- localUpdate.lock();
+ while (true){
+ checkRemoved();
- try {
- // If reserved range isn't exhausted.
- long locVal0 = locVal;
+ localUpdateLock.lock();
- if (locVal0 + l <= upBound) {
- locVal = locVal0 + l;
+ IgniteInternalFuture<?> reservation = reservationFut;
- return updated ? locVal0 + l : locVal0;
- }
- }
- finally {
- localUpdate.unlock();
- }
+ try {
+ boolean reservationInProgress = reservation != null;
- AffinityTopologyVersion lockedVer = ctx.shared().lockedTopologyVersion(null);
+ long newLocalVal = locVal + l;
- // We need two separate locks here because two independent thread may attempt to update the sequence
- // simultaneously, one thread with locked topology and other with unlocked.
- // We cannot use the same lock for both cases because it leads to a deadlock when free-topology thread
- // waits for topology change, and locked topology thread waits to acquire the lock.
- // If a thread has locked topology, it must bypass sync with non-locked threads, but at the same time
- // we do not want multiple threads to attempt to run identical cache updates.
- ReentrantLock distLock = lockedVer == null ? distUpdateFreeTop : distUpdateLockedTop;
+ // Reserve new interval if operation is not in progress.
+ if (newLocalVal >= newReservationLine && newLocalVal <= reservedUpBound && !reservationInProgress){
+ reservationFut = runAsyncReservation(0);
+ }
- distLock.lock();
+ long locVal0 = locVal;
- try {
- if (updateCall == null)
- updateCall = internalUpdate(l, updated);
+ if (newLocalVal <= upBound) {
+ locVal = newLocalVal;
- try {
- return CU.retryTopologySafe(updateCall);
- }
- catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
- throw e;
+ return updated ? newLocalVal : locVal0;
+ }
+
+ // Await complete previous reservation.
+ if (reservationInProgress){
+ reservation.get();
+
+ reservationFut = null;
+
+ // Retry check bounds.
+ continue;
+ }
+
+ // Still in reserved interval.
+ if (newLocalVal < reservedUpBound) {
+ long curVal = locVal;
+
+ if (newLocalVal < reservedBottomBound)
+ locVal = reservedBottomBound;
+ else
+ locVal += l;
+
+ upBound = reservedUpBound;
+
+ return updated ? locVal : curVal;
+ }
+ // Switched to the next interval. New value more that upper reserved bound.
+ else if (!reservationInProgress) {
+ long diff = newLocalVal - reservedUpBound;
+
+ // Calculate how many batch size included in l.
+ // It will our offset for global seq counter.
+ long off = (diff / batchSize) * batchSize;
+
+ reservationFut = runAsyncReservation(off);
+
+ // Can not wait async, should wait under lock until new interval reserved.
+ reservationFut.get();
+
+ reservationFut = null;
+ }
}
- catch (Exception e) {
- throw new IgniteCheckedException(e);
+ finally {
+ localUpdateLock.unlock();
}
}
- finally {
- distLock.unlock();
- }
}
- /** Get local batch size for this sequences.
- *
- * @return Sequence batch size.
- */
+ /** {@inheritDoc} */
@Override public int batchSize() {
return batchSize;
}
- /**
- * Set local batch size for this sequences.
- *
- * @param size Sequence batch size. Must be more then 0.
- */
+ /** {@inheritDoc} */
@Override public void batchSize(int size) {
A.ensure(size > 0, " Batch size can't be less then 0: " + size);
- localUpdate.lock();
+ localUpdateLock.lock();
try {
batchSize = size;
}
finally {
- localUpdate.unlock();
+ localUpdateLock.unlock();
}
}
/** {@inheritDoc} */
@Override public int reservePercentage() {
- return 0;
+ return reservePercentage;
}
/** {@inheritDoc} */
@Override public void reservePercentage(int percentage) {
+ A.ensure(percentage >= 0 && percentage <= 100, "Invalid reserve percentage: " + percentage);
+ localUpdateLock.lock();
+
+ try {
+ reservePercentage = percentage;
+ }
+ finally {
+ localUpdateLock.unlock();
+ }
}
/** {@inheritDoc} */
@@ -297,80 +326,64 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
}
/**
- * Method returns callable for execution all update operations in async and sync mode.
+ * Runs async reservation of new range for current node.
*
- * @param l Value will be added to sequence.
- * @param updated If {@code true}, will return updated value, if {@code false}, will return previous value.
- * @return Callable for execution in async and sync mode.
+ * @param off Offset.
+ * @return Future.
*/
- @SuppressWarnings("TooBroadScope")
- private Callable<Long> internalUpdate(final long l, final boolean updated) {
- return new Callable<Long>() {
- @Override public Long call() throws Exception {
- assert distUpdateFreeTop.isHeldByCurrentThread() || distUpdateLockedTop.isHeldByCurrentThread();
-
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicSequenceValue seq = cacheView.get(key);
-
- checkRemoved();
-
- assert seq != null;
+ private IgniteInternalFuture<?> runAsyncReservation(final long off) {
+ assert off >= 0 : off;
- long curLocVal;
+ GridFutureAdapter<?> resFut = new GridFutureAdapter<>();
- long newUpBound;
+ resFut.listen(f -> {
+ if (f.error() == null)
+ reservationFut = null;
+ });
- // Even though we hold a transaction lock here, we must hold the local update lock here as well
- // because we mutate multipe variables (locVal and upBound).
- localUpdate.lock();
+ ctx.kernalContext().closure().runLocalSafe(() -> {
+ Callable<Void> reserveCall = reserveCallable(off);
- try {
- curLocVal = locVal;
-
- // If local range was already reserved in another thread.
- if (curLocVal + l <= upBound) {
- locVal = curLocVal + l;
+ try {
+ CU.retryTopologySafe(reserveCall);
- return updated ? curLocVal + l : curLocVal;
- }
+ resFut.onDone();
+ }
+ catch (Throwable h) {
+ resFut.onDone(h);
+ }
+ }, poolPlc);
- long curGlobalVal = seq.get();
+ return resFut;
+ }
- long newLocVal;
+ /**
+ * @param off Reservation offset.
+ * @return Callable for reserved new interval.
+ */
+ private Callable<Void> reserveCallable(long off){
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheAtomicSequenceValue seq = cacheView.get(key);
- /* We should use offset because we already reserved left side of range.*/
- long off = batchSize > 1 ? batchSize - 1 : 1;
+ checkRemoved();
- // Calculate new values for local counter, global counter and upper bound.
- if (curLocVal + l >= curGlobalVal) {
- newLocVal = curLocVal + l;
+ assert seq != null;
- newUpBound = newLocVal + off;
- }
- else {
- newLocVal = curGlobalVal;
+ long curGlobalVal = seq.get();
- newUpBound = newLocVal + off;
- }
+ reservedBottomBound = curGlobalVal + off;
- locVal = newLocVal;
- upBound = newUpBound;
+ reservedUpBound = reservedBottomBound + batchSize;
- if (updated)
- curLocVal = newLocVal;
- }
- finally {
- localUpdate.unlock();
- }
+ newReservationLine = calculateNewReservationLine(reservedBottomBound);
- // Global counter must be more than reserved upper bound.
- seq.set(newUpBound + 1);
+ seq.set(reservedUpBound);
cacheView.put(key, seq);
tx.commit();
-
- return curLocVal;
}
catch (Error | Exception e) {
if(!X.hasCause(e, ClusterTopologyCheckedException.class))
@@ -378,10 +391,19 @@ public final class GridCacheAtomicSequenceImpl extends AtomicDataStructureProxy<
throw e;
}
+
+ return null;
}
};
}
+ /**
+ * @return New reservation line.
+ */
+ private long calculateNewReservationLine(long initialValue) {
+ return initialValue + (batchSize * reservePercentage / 100);
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(ctx.kernalContext());