You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/20 07:49:41 UTC
[68/70] [abbrv] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-2.0' into ignite-2893
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index f7f61c6,163ed99..c9c4f34
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@@ -68,26 -68,26 +68,35 @@@ public class IgniteTxEntry implements G
private static final long serialVersionUID = 0L;
/** Dummy version for non-existing entry read in SERIALIZABLE transaction. */
- public static final GridCacheVersion SER_READ_EMPTY_ENTRY_VER = new GridCacheVersion(0, 0, 0, 0);
+ public static final GridCacheVersion SER_READ_EMPTY_ENTRY_VER = new GridCacheVersion(0, 0, 0);
/** Dummy version for any existing entry read in SERIALIZABLE transaction. */
- public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1);
+ public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 1);
/** */
- public static final GridCacheVersion GET_ENTRY_INVALID_VER_UPDATED = new GridCacheVersion(0, 0, 0, 2);
+ public static final GridCacheVersion GET_ENTRY_INVALID_VER_UPDATED = new GridCacheVersion(0, 0, 2);
/** */
- public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 0, 3);
+ public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 3);
+
+ /** Skip store flag bit mask. */
+ private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 0x01;
+
+ /** Keep binary flag. */
+ private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 0x02;
+
+ /** Flag indicating that old value for 'invoke' operation was non null on primary node. */
+ private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04;
+ /** Skip store flag bit mask. */
+ private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 0x01;
+
+ /** Keep binary flag. */
+ private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 0x02;
+
+ /** Flag indicating that old value for 'invoke' operation was non null on primary node. */
+ private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04;
+
/** Prepared flag updater. */
private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD =
AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared");
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index f74d8a4,be718cf..3f07151
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@@ -32,10 -29,17 +32,11 @@@ import org.apache.ignite.cache.CacheEnt
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
/**
* Cache atomic long implementation.
*/
@@@ -363,81 -422,38 +364,92 @@@ public final class GridCacheAtomicLongI
}
}
+ /** {@inheritDoc} */
++ @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
++ this.atomicView = kctx.cache().atomicsCache();
++ this.ctx = atomicView.context();
++ }
++
++ /** {@inheritDoc} */
++ @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
++ // No-op.
++ }
++
++ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(ctx.kernalContext());
+ out.writeUTF(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+ t.set1((GridKernalContext)in.readObject());
+ t.set2(in.readUTF());
+ }
+
/**
- * Method returns callable for execution {@link #addAndGet(long)} operation in async and sync mode.
+ * Reconstructs object on unmarshalling.
*
- * @param l Value will be added to atomic long.
- * @return Callable for execution in async and sync mode.
+ * @return Reconstructed object.
+ * @throws ObjectStreamException Thrown in case of unmarshalling error.
*/
- private Callable<Long> internalAddAndGet(final long l) {
- return retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
+ private Object readResolve() throws ObjectStreamException {
+ try {
+ IgniteBiTuple<GridKernalContext, String> t = stash.get();
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+ }
+ finally {
+ stash.remove();
+ }
+ }
- long retVal = val.get() + l;
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheAtomicLongImpl.class, this);
+ }
- val.set(retVal);
+ /**
+ *
+ */
+ static class GetAndSetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long newVal;
+
+ /**
+ * @param newVal New value.
+ */
+ GetAndSetProcessor(long newVal) {
+ this.newVal = newVal;
+ }
- atomicView.put(key, val);
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- tx.commit();
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to add and get: " + this, e);
+ long curVal = val.get();
- throw e;
- }
- }
- });
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return curVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndSetProcessor.class, this);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index a026bf3,4365468..667fd15
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@@ -33,11 -29,18 +33,12 @@@ import org.apache.ignite.cache.CacheEnt
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
/**
* Cache atomic reference implementation.
*/
@@@ -220,6 -205,93 +221,17 @@@ public final class GridCacheAtomicRefer
}
}
- /**
- * Method returns callable for execution {@link #set(Object)} operation in async and sync mode.
- *
- * @param val Value will be set in reference .
- * @return Callable for execution in async and sync mode.
- */
- private Callable<Boolean> internalSet(final T val) {
- return retryTopologySafe(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
-
- if (ref == null)
- throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
-
- ref.set(val);
-
- atomicView.put(key, ref);
-
- tx.commit();
-
- return true;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to set value [val=" + val + ", atomicReference=" + this + ']', e);
-
- throw e;
- }
- }
- });
- }
-
- /**
- * Conditionally sets the new value. It will be set if {@code expValPred} is
- * evaluate to {@code true}.
- *
- * @param expVal Expected value.
- * @param newVal New value.
- * @return Callable for execution in async and sync mode.
- */
- private Callable<T> internalCompareAndSetAndGet(final T expVal, final T newVal) {
- return retryTopologySafe(new Callable<T>() {
- @Override public T call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
-
- if (ref == null)
- throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
-
- T origVal = ref.get();
-
- if (!F.eq(expVal, origVal)) {
- tx.setRollbackOnly();
-
- return origVal;
- }
- else {
- ref.set(newVal);
-
- atomicView.getAndPut(key, ref);
-
- tx.commit();
-
- return expVal;
- }
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to compare and value [expVal=" + expVal + ", newVal" +
- newVal + ", atomicReference" + this + ']', e);
-
- throw e;
- }
- }
- });
- }
-
+ /** {@inheritDoc} */
+ @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ this.atomicView = kctx.cache().atomicsCache();
+ this.ctx = atomicView.context();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
-
++ // No-op.
+ }
+
/**
* Check removed status.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index 1e4da30,09cea43..877b158
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@@ -33,10 -29,19 +33,11 @@@ import org.apache.ignite.cache.CacheEnt
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
-import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Cache atomic stamped implementation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/81835e48/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 6ebd655,0039fa2..903423d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@@ -346,13 -361,16 +358,15 @@@ public final class GridCacheSemaphoreIm
/**
* This method is used for releasing the permits acquired by failing node.
+ * In case the semaphore is broken, no permits are released and semaphore is set (globally) to broken state.
*
* @param nodeId ID of the failing node.
+ * @param broken Flag indicating that this semaphore is broken.
* @return True if this is the call that succeeded to change the global state.
*/
- boolean releaseFailedNode(final UUID nodeId) {
- protected boolean releaseFailedNode(final UUID nodeId, final boolean broken) {
++ boolean releaseFailedNode(final UUID nodeId, final boolean broken) {
try {
- return CU.outTx(
- retryTopologySafe(new Callable<Boolean>() {
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (
GridNearTxLocal tx = CU.txStartInternal(ctx,
@@@ -466,10 -506,16 +499,14 @@@
tx.commit();
- return new Sync(cnt, waiters, failoverSafe);
+ Sync sync = new Sync(cnt, waiters, failoverSafe);
+
+ sync.setBroken(val.isBroken());
+
+ return sync;
}
}
- }),
- ctx
- );
+ });
if (log.isDebugEnabled())
log.debug("Initialized internal sync structure: " + sync);
@@@ -717,12 -787,15 +775,15 @@@
try {
initializeSemaphore();
- boolean result = sync.nonfairTryAcquireShared(1) >= 0;
+ boolean res = sync.nonfairTryAcquireShared(1) >= 0;
- if (isBroken())
+ if (isBroken()) {
+ Thread.interrupted(); // Clear interrupt flag.
+
throw new InterruptedException();
+ }
- return result;
+ return res;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);