You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/04 15:45:11 UTC
[02/50] [abbrv] ignite git commit: Fixed conflict resolver API.
Fixed conflict resolver API.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e61602e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e61602e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e61602e
Branch: refs/heads/sql-store-cmp
Commit: 4e61602eca679bf3689bb23f2bc1c9e58b4eb8dc
Parents: 1945b98
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Jan 25 19:16:47 2016 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Mon Feb 1 07:43:59 2016 +0300
----------------------------------------------------------------------
.../processors/cache/CacheOperationContext.java | 43 +++++--
.../processors/cache/GridCacheAdapter.java | 8 +-
.../processors/cache/GridCacheProxyImpl.java | 11 +-
.../processors/cache/IgniteCacheProxy.java | 43 ++++++-
.../dht/atomic/GridDhtAtomicCache.java | 104 +++++++++++++----
.../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +-
.../processors/cache/dr/GridCacheDrInfo.java | 49 +++++++-
.../transactions/IgniteTxLocalAdapter.java | 81 +++++++++-----
.../cache/version/GridCacheVersionManager.java | 23 ++--
.../testframework/junits/GridAbstractTest.java | 3 +
parent/pom.xml | 111 +++++++++++--------
pom.xml | 16 ---
12 files changed, 351 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
index 21934d0..f39a09d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
@@ -48,6 +48,9 @@ public class CacheOperationContext implements Serializable {
/** Expiry policy. */
private final ExpiryPolicy expiryPlc;
+ /** Data center Id. */
+ private final Byte dataCenterId;
+
/**
* Constructor with default values.
*/
@@ -61,6 +64,8 @@ public class CacheOperationContext implements Serializable {
expiryPlc = null;
noRetries = false;
+
+ dataCenterId = null;
}
/**
@@ -68,13 +73,15 @@ public class CacheOperationContext implements Serializable {
* @param subjId Subject ID.
* @param keepBinary Keep binary flag.
* @param expiryPlc Expiry policy.
+ * @param dataCenterId Data center id.
*/
public CacheOperationContext(
boolean skipStore,
@Nullable UUID subjId,
boolean keepBinary,
@Nullable ExpiryPolicy expiryPlc,
- boolean noRetries) {
+ boolean noRetries,
+ @Nullable Byte dataCenterId) {
this.skipStore = skipStore;
this.subjId = subjId;
@@ -84,6 +91,8 @@ public class CacheOperationContext implements Serializable {
this.expiryPlc = expiryPlc;
this.noRetries = noRetries;
+
+ this.dataCenterId = dataCenterId;
}
/**
@@ -94,6 +103,13 @@ public class CacheOperationContext implements Serializable {
}
/**
+ * @return {@code True} if data center id is set, otherwise {@code false}.
+ */
+ public boolean hasDataCenterId() {
+ return dataCenterId != null;
+ }
+
+ /**
* See {@link IgniteInternalCache#keepBinary()}.
*
* @return New instance of CacheOperationContext with keep binary flag.
@@ -104,7 +120,8 @@ public class CacheOperationContext implements Serializable {
subjId,
true,
expiryPlc,
- noRetries);
+ noRetries,
+ dataCenterId);
}
/**
@@ -117,6 +134,15 @@ public class CacheOperationContext implements Serializable {
}
/**
+ * Gets data center ID.
+ *
+ * @return Data center ID, or {@code null} if it is not set.
+ */
+ @Nullable public Byte dataCenterId() {
+ return dataCenterId;
+ }
+
+ /**
* See {@link IgniteInternalCache#forSubjectId(UUID)}.
*
* @param subjId Subject id.
@@ -128,7 +154,8 @@ public class CacheOperationContext implements Serializable {
subjId,
keepBinary,
expiryPlc,
- noRetries);
+ noRetries,
+ dataCenterId);
}
/**
@@ -150,7 +177,8 @@ public class CacheOperationContext implements Serializable {
subjId,
keepBinary,
expiryPlc,
- noRetries);
+ noRetries,
+ dataCenterId);
}
/**
@@ -172,7 +200,8 @@ public class CacheOperationContext implements Serializable {
subjId,
true,
plc,
- noRetries);
+ noRetries,
+ dataCenterId);
}
/**
@@ -185,8 +214,8 @@ public class CacheOperationContext implements Serializable {
subjId,
keepBinary,
expiryPlc,
- noRetries
- );
+ noRetries,
+ dataCenterId);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3081cfb..9fd65e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -447,7 +447,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
- CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false);
+ CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false, null);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
@@ -459,14 +459,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) {
- CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false);
+ CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false, null);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
/** {@inheritDoc} */
@Override public <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() {
- CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false);
+ CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false, null);
return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx);
}
@@ -483,7 +483,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
assert !CU.isAtomicsCache(ctx.name());
assert !CU.isMarshallerCache(ctx.name());
- CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false);
+ CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false, null);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 8ffd273..3a53942 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -209,7 +209,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
/** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
return new GridCacheProxyImpl<>(ctx, delegate,
- opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null, false));
+ opCtx != null ? opCtx.forSubjectId(subjId) :
+ new CacheOperationContext(false, subjId, false, null, false, null));
}
/** {@inheritDoc} */
@@ -221,7 +222,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
return this;
return new GridCacheProxyImpl<>(ctx, delegate,
- opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null, false));
+ opCtx != null ? opCtx.setSkipStore(skipStore) :
+ new CacheOperationContext(true, null, false, null, false, null));
}
finally {
gate.leave(prev);
@@ -236,7 +238,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx,
(GridCacheAdapter<K1, V1>)delegate,
- opCtx != null ? opCtx.keepBinary() : new CacheOperationContext(false, null, true, null, false));
+ opCtx != null ? opCtx.keepBinary() : new CacheOperationContext(false, null, true, null, false, null));
}
/** {@inheritDoc} */
@@ -1608,7 +1610,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
try {
return new GridCacheProxyImpl<>(ctx, delegate,
- opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc, false));
+ opCtx != null ? opCtx.withExpiryPolicy(plc) :
+ new CacheOperationContext(false, null, false, plc, false, null));
}
finally {
gate.leave(prev);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b64c69c..9e66d4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -307,7 +307,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
try {
CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) :
- new CacheOperationContext(false, null, false, plc, false);
+ new CacheOperationContext(false, null, false, plc, false, null);
return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock);
}
@@ -339,7 +339,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return this;
CacheOperationContext opCtx0 = opCtx != null ? opCtx.setNoRetries(true) :
- new CacheOperationContext(false, null, false, null, true);
+ new CacheOperationContext(false, null, false, null, true, null);
return new IgniteCacheProxy<>(ctx,
delegate,
@@ -1788,7 +1788,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
opCtx != null ? opCtx.subjectId() : null,
true,
opCtx != null ? opCtx.expiry() : null,
- opCtx != null && opCtx.noRetries());
+ opCtx != null && opCtx.noRetries(),
+ opCtx != null ? opCtx.dataCenterId() : null);
return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx,
(GridCacheAdapter<K1, V1>)delegate,
@@ -1802,6 +1803,39 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
+ * @return Projection for data center id.
+ */
+ @SuppressWarnings("unchecked")
+ public IgniteCache<K, V> withDataCenterId(byte dataCenterId) {
+ CacheOperationContext prev = onEnter(gate, opCtx);
+
+ try {
+ Byte prevDataCenterId = opCtx != null ? opCtx.dataCenterId() : null;
+
+ if (prevDataCenterId != null && dataCenterId == prevDataCenterId)
+ return this;
+
+ CacheOperationContext opCtx0 =
+ new CacheOperationContext(
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null ? opCtx.subjectId() : null,
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null ? opCtx.expiry() : null,
+ opCtx != null && opCtx.noRetries(),
+ dataCenterId);
+
+ return new IgniteCacheProxy<>(ctx,
+ delegate,
+ opCtx0,
+ isAsync(),
+ lock);
+ }
+ finally {
+ onLeave(gate, prev);
+ }
+ }
+
+ /**
* @return Cache with skip store enabled.
*/
public IgniteCache<K, V> skipStore() {
@@ -1820,7 +1854,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
opCtx != null ? opCtx.subjectId() : null,
opCtx != null && opCtx.isKeepBinary(),
opCtx != null ? opCtx.expiry() : null,
- opCtx != null && opCtx.noRetries());
+ opCtx != null && opCtx.noRetries(),
+ opCtx != null ? opCtx.dataCenterId() : null);
return new IgniteCacheProxy<>(ctx,
delegate,
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index aa79cfa..6b23550 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -77,7 +77,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -99,6 +98,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
@@ -448,7 +448,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
false,
filter,
- true);
+ true,
+ UPDATE);
}
/** {@inheritDoc} */
@@ -464,7 +465,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
filter,
- true);
+ true,
+ UPDATE);
}
/** {@inheritDoc} */
@@ -479,7 +481,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
false,
ctx.noValArray(),
- false).get();
+ false,
+ UPDATE).get();
}
/** {@inheritDoc} */
@@ -571,7 +574,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
true,
ctx.equalsValArray(oldVal),
- true);
+ true,
+ UPDATE);
}
/** {@inheritDoc} */
@@ -589,7 +593,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
CU.empty0(),
- true).chain(RET2NULL);
+ true,
+ UPDATE).chain(RET2NULL);
}
/** {@inheritDoc} */
@@ -610,7 +615,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
null,
- true);
+ true,
+ UPDATE);
}
/** {@inheritDoc} */
@@ -790,7 +796,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
null,
- true);
+ true,
+ TRANSFORM);
return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
@Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut)
@@ -846,7 +853,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
null,
- true);
+ true,
+ TRANSFORM);
return resFut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() {
@Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) throws IgniteCheckedException {
@@ -882,7 +890,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
null,
- true);
+ true,
+ TRANSFORM);
}
/**
@@ -901,15 +910,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@SuppressWarnings("ConstantConditions")
private IgniteInternalFuture updateAllAsync0(
- @Nullable final Map<? extends K, ? extends V> map,
- @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap,
+ @Nullable Map<? extends K, ? extends V> map,
+ @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap,
@Nullable Object[] invokeArgs,
- @Nullable final Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
- @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
+ @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
+ @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
final boolean retval,
final boolean rawRetval,
@Nullable final CacheEntryPredicate[] filter,
- final boolean waitTopFut
+ final boolean waitTopFut,
+ final GridCacheOperation op
) {
assert ctx.updatesAllowed();
@@ -918,7 +928,47 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.checkSecurity(SecurityPermission.CACHE_PUT);
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ final CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert conflictPutMap == null : conflictPutMap;
+ assert conflictRmvMap == null : conflictRmvMap;
+
+ if (op == GridCacheOperation.TRANSFORM) {
+ assert invokeMap != null : invokeMap;
+
+ conflictPutMap = F.viewReadOnly((Map)invokeMap,
+ new IgniteClosure<EntryProcessor, GridCacheDrInfo>() {
+ @Override public GridCacheDrInfo apply(EntryProcessor o) {
+ return new GridCacheDrInfo(o, ctx.versions().next(opCtx.dataCenterId()));
+ }
+ });
+
+ invokeMap = null;
+ }
+ else if (op == GridCacheOperation.DELETE) {
+ assert map != null : map;
+
+ conflictRmvMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheVersion>() {
+ @Override public GridCacheVersion apply(V o) {
+ return ctx.versions().next(opCtx.dataCenterId());
+ }
+ });
+
+ map = null;
+ }
+ else {
+ assert map != null : map;
+
+ conflictPutMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheDrInfo>() {
+ @Override public GridCacheDrInfo apply(V o) {
+ return new GridCacheDrInfo(ctx.toCacheObject(o), ctx.versions().next(opCtx.dataCenterId()));
+ }
+ });
+
+ map = null;
+ }
+ }
UUID subjId = ctx.subjectIdPerCall(null, opCtx);
@@ -928,7 +978,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx,
this,
ctx.config().getWriteSynchronizationMode(),
- invokeMap != null ? TRANSFORM : UPDATE,
+ op,
map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
conflictPutMap.keySet() : conflictRmvMap.keySet(),
map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
@@ -966,8 +1016,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @return Completion future.
*/
private IgniteInternalFuture removeAllAsync0(
- @Nullable final Collection<? extends K> keys,
- @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictMap,
+ @Nullable Collection<? extends K> keys,
+ @Nullable Map<KeyCacheObject, GridCacheVersion> conflictMap,
final boolean retval,
boolean rawRetval,
@Nullable final CacheEntryPredicate[] filter
@@ -985,12 +1035,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ final CacheOperationContext opCtx = ctx.operationContextPerCall();
UUID subjId = ctx.subjectIdPerCall(null, opCtx);
int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
+ Collection<GridCacheVersion> drVers = null;
+
+ if (opCtx != null && keys != null && opCtx.hasDataCenterId()) {
+ assert conflictMap == null : conflictMap;
+
+ drVers = F.transform(keys, new C1<K, GridCacheVersion>() {
+ @Override public GridCacheVersion apply(K k) {
+ return ctx.versions().next(opCtx.dataCenterId());
+ }
+ });
+ }
+
final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
ctx,
this,
@@ -1000,7 +1062,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
null,
null,
- keys != null ? null : conflictMap.values(),
+ drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
retval,
rawRetval,
(filter != null && opCtx != null) ? opCtx.expiry() : null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 3c86083..c9e1a11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -1034,7 +1034,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
else if (conflictPutVals != null) {
GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
- val = conflictPutVal.value();
+ val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
@@ -1142,7 +1142,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
// Conflict PUT.
GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
- val = conflictPutVal.value();
+ val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
index 8635fe2..02bc6b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -36,6 +37,9 @@ public class GridCacheDrInfo implements Externalizable {
/** Value. */
private CacheObject val;
+ /** Entry processor. */
+ private EntryProcessor proc;
+
/** DR version. */
private GridCacheVersion ver;
@@ -61,6 +65,29 @@ public class GridCacheDrInfo implements Externalizable {
}
/**
+ * Constructor.
+ *
+ * @param ver Version.
+ */
+ public GridCacheDrInfo(GridCacheVersion ver) {
+ this.ver = ver;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param proc Entry processor.
+ * @param ver Version.
+ */
+ public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) {
+ assert proc != null;
+ assert ver != null;
+
+ this.proc = proc;
+ this.ver = ver;
+ }
+
+ /**
* @return Value.
*/
public CacheObject value() {
@@ -68,6 +95,20 @@ public class GridCacheDrInfo implements Externalizable {
}
/**
+ * @return Entry processor.
+ */
+ public EntryProcessor entryProcessor() {
+ return proc;
+ }
+
+ /**
+ * @return Value (entry processor or cache object).
+ */
+ public Object valueEx() {
+ return val == null ? proc : val;
+ }
+
+ /**
* @return Version.
*/
public GridCacheVersion version() {
@@ -88,13 +129,13 @@ public class GridCacheDrInfo implements Externalizable {
return CU.EXPIRE_TIME_ETERNAL;
}
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
assert false;
}
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
assert false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 926eaf2..aad9841 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1977,8 +1977,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
GridCacheContext cacheCtx,
Map<KeyCacheObject, GridCacheDrInfo> drMap
) {
+ Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
+ @Override public Object apply(GridCacheDrInfo val) {
+ return val.value();
+ }
+ });
+
return this.<Object, Object>putAllAsync0(cacheCtx,
- null,
+ map,
null,
null,
drMap,
@@ -2055,7 +2061,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
final GridCacheReturn ret,
boolean skipStore,
final boolean singleRmv,
- boolean keepBinary) {
+ boolean keepBinary,
+ Byte dataCenterId) {
try {
addActiveCache(cacheCtx);
@@ -2066,6 +2073,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (entryProcessor != null)
transform = true;
+ GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
+
boolean loadMissed = enlistWriteEntry(cacheCtx,
cacheKey,
val,
@@ -2075,7 +2084,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
retval,
lockOnly,
filter,
- /*drVer*/null,
+ /*drVer*/drVer,
/*drTtl*/-1L,
/*drExpireTime*/-1L,
ret,
@@ -2125,6 +2134,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
* @param drRmvMap DR remove map (optional).
* @param skipStore Skip store flag.
* @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
+ * @param keepBinary Keep binary flag.
+ * @param dataCenterId Optional data center ID.
* @return Future for missing values loading.
*/
private <K, V> IgniteInternalFuture<Void> enlistWrite(
@@ -2143,7 +2154,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
boolean skipStore,
final boolean singleRmv,
- final boolean keepBinary
+ final boolean keepBinary,
+ Byte dataCenterId
) {
assert retval || invokeMap == null;
@@ -2197,6 +2209,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
drTtl = -1L;
drExpireTime = -1L;
}
+ else if (dataCenterId != null) {
+ drVer = cctx.versions().next(dataCenterId);
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
else {
drVer = null;
drTtl = -1L;
@@ -2938,6 +2955,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+ final Byte dataCenterId = opCtx != null ? opCtx.dataCenterId() : null;
+
KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
@@ -2955,7 +2974,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
ret,
opCtx != null && opCtx.skipStore(),
/*singleRmv*/false,
- keepBinary);
+ keepBinary,
+ dataCenterId);
if (pessimistic()) {
assert loadFut == null || loadFut.isDone() : loadFut;
@@ -3053,7 +3073,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@Nullable Map<? extends K, ? extends V> map,
@Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
@Nullable final Object[] invokeArgs,
- @Nullable final Map<KeyCacheObject, GridCacheDrInfo> drMap,
+ @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap,
final boolean retval,
@Nullable final CacheEntryPredicate[] filter
) {
@@ -3066,25 +3086,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
return new GridFinishedFuture(e);
}
- // Cached entry may be passed only from entry wrapper.
- final Map<?, ?> map0;
- final Map<?, EntryProcessor<K, V, Object>> invokeMap0;
+ final CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
- if (drMap != null) {
- assert map == null;
+ final Byte dataCenterId;
- map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
- @Override public Object apply(GridCacheDrInfo val) {
- return val.value();
- }
- });
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert drMap == null : drMap;
+ assert map != null || invokeMap != null;
- invokeMap0 = null;
- }
- else {
- map0 = map;
- invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+ dataCenterId = opCtx.dataCenterId();
}
+ else
+ dataCenterId = null;
+
+ // Cached entry may be passed only from entry wrapper.
+ final Map<?, ?> map0 = map;
+ final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
if (log.isDebugEnabled())
log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
@@ -3110,8 +3127,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size());
- CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
-
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
final IgniteInternalFuture<Void> loadFut = enlistWrite(
@@ -3130,7 +3145,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
null,
opCtx != null && opCtx.skipStore(),
false,
- keepBinary);
+ keepBinary,
+ dataCenterId);
if (pessimistic()) {
assert loadFut == null || loadFut.isDone() : loadFut;
@@ -3334,6 +3350,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
else
keys0 = keys;
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ final Byte dataCenterId;
+
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert drMap == null : drMap;
+
+ dataCenterId = opCtx.dataCenterId();
+ }
+ else
+ dataCenterId = null;
+
assert keys0 != null;
if (log.isDebugEnabled()) {
@@ -3367,8 +3395,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
final Collection<KeyCacheObject> enlisted = new ArrayList<>();
- CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
-
ExpiryPolicy plc;
if (!F.isEmpty(filter))
@@ -3394,7 +3420,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
drMap,
opCtx != null && opCtx.skipStore(),
singleRmv,
- keepBinary
+ keepBinary,
+ dataCenterId
);
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 68d03cd..166c713 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -176,7 +176,15 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on current topology.
*/
public GridCacheVersion next() {
- return next(cctx.kernalContext().discovery().topologyVersion(), true, false);
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
+ }
+
+ /**
+ * @param dataCenterId Data center id.
+ * @return Next version based on current topology with given data center id.
+ */
+ public GridCacheVersion next(byte dataCenterId) {
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
}
/**
@@ -188,7 +196,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on given topology version.
*/
public GridCacheVersion next(AffinityTopologyVersion topVer) {
- return next(topVer.topologyVersion(), true, false);
+ return next(topVer.topologyVersion(), true, false, dataCenterId);
}
/**
@@ -197,7 +205,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad() {
- return next(cctx.kernalContext().discovery().topologyVersion(), true, true);
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, true, dataCenterId);
}
/**
@@ -206,7 +214,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(AffinityTopologyVersion topVer) {
- return next(topVer.topologyVersion(), true, true);
+ return next(topVer.topologyVersion(), true, true, dataCenterId);
}
/**
@@ -215,7 +223,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(GridCacheVersion ver) {
- return next(ver.topologyVersion(), false, true);
+ return next(ver.topologyVersion(), false, true, dataCenterId);
}
/**
@@ -225,7 +233,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on given cache version.
*/
public GridCacheVersion next(GridCacheVersion ver) {
- return next(ver.topologyVersion(), false, false);
+ return next(ver.topologyVersion(), false, false, dataCenterId);
}
/**
@@ -237,9 +245,10 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @param topVer Topology version for which new version should be obtained.
* @param addTime If {@code true} then adds to the given topology version number of seconds
* from the start time of the first grid node.
+ * @param dataCenterId Data center id.
* @return New lock order.
*/
- private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad) {
+ private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad, byte dataCenterId) {
if (topVer == -1)
topVer = cctx.kernalContext().discovery().topologyVersion();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 99d1a42..8bf877a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1026,6 +1026,9 @@ public abstract class GridAbstractTest extends TestCase {
protected IgniteConfiguration loadConfiguration(String springCfgPath) throws IgniteCheckedException {
URL cfgLocation = U.resolveIgniteUrl(springCfgPath);
+ if (cfgLocation == null)
+ cfgLocation = U.resolveIgniteUrl(springCfgPath, false);
+
assert cfgLocation != null;
ApplicationContext springCtx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 21d8c69..c0f49c8 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -214,13 +214,6 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-apache-license-gen</artifactId>
- <version>${project.version}</version>
- <scope>test</scope><!-- hack to have ignite-apache-license-gen at first place at mvn reactor -->
- </dependency>
</dependencies>
<build>
@@ -715,48 +708,6 @@
</execution>
</executions>
</plugin>
-
- <plugin><!-- generates dependencies licenses -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-remote-resources-plugin</artifactId>
- <executions>
- <execution>
- <id>ignite-dependencies</id>
- <goals>
- <goal>process</goal>
- </goals>
- <configuration>
- <resourceBundles>
- <resourceBundle>org.apache.ignite:ignite-apache-license-gen:${project.version}</resourceBundle>
- </resourceBundles>
- <excludeTransitive>true</excludeTransitive>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <id>licenses-file-rename</id>
- <goals>
- <goal>run</goal>
- </goals>
- <phase>compile</phase>
- <configuration>
- <target>
- <!-- moving licenses generated by "ignite-dependencies" -->
- <move file="${basedir}/target/classes/META-INF/licenses.txt" tofile="${basedir}/target/licenses/${project.artifactId}-licenses.txt"/>
- </target>
- <failOnError>false</failOnError>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
</plugins>
</build>
@@ -998,5 +949,67 @@
</dependency>
</dependencies>
</profile>
+
+ <profile>
+ <id>release</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-apache-license-gen</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope><!-- hack to have ignite-apache-license-gen at first place at mvn reactor -->
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin><!-- generates dependencies licenses -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-remote-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>ignite-dependencies</id>
+ <goals>
+ <goal>process</goal>
+ </goals>
+ <configuration>
+ <resourceBundles>
+ <resourceBundle>org.apache.ignite:ignite-apache-license-gen:${project.version}
+ </resourceBundle>
+ </resourceBundles>
+ <excludeTransitive>true</excludeTransitive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>licenses-file-rename</id>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <phase>compile</phase>
+ <configuration>
+ <target>
+ <!-- moving licenses generated by "ignite-dependencies" -->
+ <move file="${basedir}/target/classes/META-INF/licenses.txt" tofile="${basedir}/target/licenses/${project.artifactId}-licenses.txt"/>
+ </target>
+ <failOnError>false</failOnError>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4f797e8..bead3ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -918,22 +918,6 @@
</execution>
</executions>
</plugin>
-
- <plugin><!-- skipping generates dependencies licenses -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-remote-resources-plugin</artifactId>
- <executions>
- <execution>
- <id>ignite-dependencies</id>
- <goals>
- <goal>process</goal>
- </goals>
- <configuration>
- <skip>true</skip>
- </configuration>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
</project>