You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/24 10:18:13 UTC
[02/50] [abbrv] ignite git commit: Merge branch ignite-1.5 into ignite-1282
Merge branch ignite-1.5 into ignite-1282
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3d4ce809
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3d4ce809
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3d4ce809
Branch: refs/heads/master
Commit: 3d4ce809fc96d93936a69a6076e7141da41d739c
Parents: c505f48 900788b
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Nov 20 14:03:43 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Nov 20 14:03:43 2015 +0300
----------------------------------------------------------------------
modules/camel/README.txt | 34 +
modules/camel/licenses/apache-2.0.txt | 202 ++
modules/camel/pom.xml | 102 +
.../ignite/stream/camel/CamelStreamer.java | 237 ++
.../stream/camel/IgniteCamelStreamerTest.java | 420 ++++
.../camel/IgniteCamelStreamerTestSuite.java | 48 +
.../src/test/resources/camel.test.properties | 18 +
.../ignite/codegen/MessageCodeGenerator.java | 1 +
.../java/org/apache/ignite/IgniteCache.java | 3 +-
.../java/org/apache/ignite/IgniteCompute.java | 3 +-
.../org/apache/ignite/compute/ComputeJob.java | 2 +-
.../internal/GridEventConsumeHandler.java | 22 +-
.../internal/GridMessageListenHandler.java | 18 +
.../ignite/internal/GridUpdateNotifier.java | 2 +-
.../apache/ignite/internal/IgniteKernal.java | 9 +-
.../communication/GridIoMessageFactory.java | 26 +-
.../discovery/GridDiscoveryManager.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 151 +-
.../processors/cache/GridCacheAtomicFuture.java | 6 +
.../cache/GridCacheDeploymentManager.java | 2 +-
.../processors/cache/GridCacheEntryEx.java | 12 +-
.../processors/cache/GridCacheFuture.java | 13 -
.../processors/cache/GridCacheGateway.java | 1 -
.../processors/cache/GridCacheIoManager.java | 50 +-
.../processors/cache/GridCacheMapEntry.java | 158 +-
.../processors/cache/GridCacheMessage.java | 20 +-
.../processors/cache/GridCacheMvcc.java | 7 -
.../processors/cache/GridCacheMvccFuture.java | 7 +
.../processors/cache/GridCacheMvccManager.java | 150 +-
.../GridCachePartitionExchangeManager.java | 59 +-
.../cache/GridCacheSharedContext.java | 38 +-
.../cache/GridCacheUpdateAtomicResult.java | 15 +-
.../cache/GridCacheUpdateTxResult.java | 24 +-
.../processors/cache/IgniteCacheProxy.java | 3 +
.../distributed/GridCacheTxRecoveryFuture.java | 54 +-
.../distributed/GridDistributedBaseMessage.java | 56 -
.../distributed/GridDistributedLockRequest.java | 6 -
.../GridDistributedLockResponse.java | 32 +-
.../distributed/GridDistributedTxMapping.java | 78 -
.../GridDistributedTxPrepareRequest.java | 67 +-
.../GridDistributedTxRemoteAdapter.java | 158 +-
.../dht/CacheDistributedGetFutureAdapter.java | 27 +-
.../cache/distributed/dht/CacheGetFuture.java | 32 +
.../dht/GridClientPartitionTopology.java | 38 +-
.../distributed/dht/GridDhtCacheAdapter.java | 141 ++
.../distributed/dht/GridDhtLocalPartition.java | 35 +
.../distributed/dht/GridDhtLockFuture.java | 79 +-
.../distributed/dht/GridDhtLockRequest.java | 2 +-
.../dht/GridDhtPartitionTopology.java | 26 +-
.../dht/GridDhtPartitionTopologyImpl.java | 112 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 14 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 38 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 112 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 28 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 89 +-
.../cache/distributed/dht/GridDhtTxMapping.java | 134 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 136 +-
.../dht/GridDhtTxPrepareRequest.java | 54 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 29 +-
.../dht/GridPartitionedGetFuture.java | 69 +-
.../dht/GridPartitionedSingleGetFuture.java | 697 ++++++
.../dht/atomic/GridDhtAtomicCache.java | 206 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 159 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 121 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 5 -
.../dht/colocated/GridDhtColocatedCache.java | 162 +-
.../colocated/GridDhtColocatedLockFuture.java | 81 +-
.../GridDhtPartitionsExchangeFuture.java | 35 +-
.../preloader/GridDhtPartitionsFullMessage.java | 64 +-
.../GridDhtPartitionsSingleMessage.java | 56 +-
.../distributed/near/CacheVersionedValue.java | 2 +-
.../distributed/near/GridNearAtomicCache.java | 10 +-
.../distributed/near/GridNearCacheAdapter.java | 4 +-
.../distributed/near/GridNearGetFuture.java | 49 +-
.../distributed/near/GridNearGetRequest.java | 1 -
.../distributed/near/GridNearGetResponse.java | 2 -
.../distributed/near/GridNearLockFuture.java | 72 +-
.../distributed/near/GridNearLockRequest.java | 4 +-
...arOptimisticSerializableTxPrepareFuture.java | 124 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 170 +-
...ridNearOptimisticTxPrepareFutureAdapter.java | 72 +-
.../GridNearPessimisticTxPrepareFuture.java | 59 +-
.../near/GridNearSingleGetRequest.java | 396 ++++
.../near/GridNearSingleGetResponse.java | 321 +++
.../near/GridNearTransactionalCache.java | 10 +-
.../near/GridNearTxFinishFuture.java | 103 +-
.../cache/distributed/near/GridNearTxLocal.java | 273 ++-
.../near/GridNearTxPrepareFutureAdapter.java | 20 +-
.../near/GridNearTxPrepareRequest.java | 61 +-
.../distributed/near/GridNearTxRemote.java | 33 +-
.../distributed/near/IgniteTxMappings.java | 75 +
.../distributed/near/IgniteTxMappingsImpl.java | 92 +
.../near/IgniteTxMappingsSingleImpl.java | 101 +
.../processors/cache/local/GridLocalCache.java | 4 +-
.../cache/local/GridLocalLockFuture.java | 5 -
.../CacheContinuousQueryBatchAck.java | 163 ++
.../continuous/CacheContinuousQueryEntry.java | 196 +-
.../continuous/CacheContinuousQueryHandler.java | 811 ++++++-
.../CacheContinuousQueryListener.java | 35 +
.../continuous/CacheContinuousQueryManager.java | 151 +-
.../cache/transactions/IgniteInternalTx.java | 13 +-
.../cache/transactions/IgniteTxAdapter.java | 68 +-
.../cache/transactions/IgniteTxEntry.java | 29 +-
.../cache/transactions/IgniteTxHandler.java | 38 +-
.../IgniteTxImplicitSingleStateImpl.java | 266 +++
.../transactions/IgniteTxLocalAdapter.java | 1424 ++++++-----
.../cache/transactions/IgniteTxLocalEx.java | 30 +-
.../cache/transactions/IgniteTxLocalState.java | 44 +
.../transactions/IgniteTxLocalStateAdapter.java | 41 +
.../cache/transactions/IgniteTxManager.java | 21 +-
.../cache/transactions/IgniteTxMap.java | 3 +-
.../cache/transactions/IgniteTxRemoteEx.java | 18 +-
.../IgniteTxRemoteSingleStateImpl.java | 108 +
.../cache/transactions/IgniteTxRemoteState.java | 34 +
.../IgniteTxRemoteStateAdapter.java | 115 +
.../transactions/IgniteTxRemoteStateImpl.java | 124 +
.../cache/transactions/IgniteTxState.java | 177 ++
.../cache/transactions/IgniteTxStateImpl.java | 414 ++++
.../clock/GridClockSyncProcessor.java | 28 +-
.../continuous/GridContinuousBatch.java | 44 +
.../continuous/GridContinuousBatchAdapter.java | 46 +
.../continuous/GridContinuousHandler.java | 22 +
.../continuous/GridContinuousProcessor.java | 221 +-
.../StartRoutineAckDiscoveryMessage.java | 14 +-
.../StartRoutineDiscoveryMessage.java | 21 +-
.../internal/util/UUIDCollectionMessage.java | 114 +
.../util/future/GridCompoundFuture.java | 15 +-
.../ignite/internal/util/lang/GridFunc.java | 8 +-
.../ignite/internal/util/nio/GridNioServer.java | 13 +-
.../ignite/marshaller/MarshallerExclusions.java | 4 +-
.../org/apache/ignite/mxbean/IgniteMXBean.java | 8 +-
.../org/apache/ignite/stream/StreamAdapter.java | 18 +-
.../IgniteClientReconnectCacheTest.java | 11 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 75 +
.../GridCacheConcurrentTxMultiNodeTest.java | 15 -
.../cache/GridCachePartitionedGetSelfTest.java | 3 +-
.../processors/cache/GridCacheTestEntryEx.java | 10 +-
.../IgniteCacheAbstractStopBusySelfTest.java | 27 +-
.../IgniteCacheP2pUnmarshallingErrorTest.java | 184 +-
.../CacheGetFutureHangsSelfTest.java | 6 +
.../GridCacheAbstractNodeRestartSelfTest.java | 2 +
.../IgniteCacheSingleGetMessageTest.java | 357 +++
.../GridCacheReplicatedMetricsSelfTest.java | 9 -
.../IgniteCacheTxStoreSessionTest.java | 2 +-
...ContinuousQueryFailoverAbstractSelfTest.java | 2235 ++++++++++++++++++
...ryFailoverAtomicNearEnabledSelfSelfTest.java | 46 +
...FailoverAtomicPrimaryWriteOrderSelfTest.java | 44 +
...usQueryFailoverAtomicReplicatedSelfTest.java | 40 +
...inuousQueryFailoverTxReplicatedSelfTest.java | 32 +
.../CacheContinuousQueryFailoverTxSelfTest.java | 39 +
...ridCacheContinuousQueryAbstractSelfTest.java | 153 +-
.../GridCacheContinuousQueryTxSelfTest.java | 49 +
...CacheContinuousQueryClientReconnectTest.java | 187 ++
.../IgniteCacheContinuousQueryClientTest.java | 157 +-
...cheContinuousQueryClientTxReconnectTest.java | 32 +
.../p2p/GridP2PSameClassLoaderSelfTest.java | 16 +-
.../testframework/junits/GridAbstractTest.java | 2 +-
.../junits/common/GridCommonAbstractTest.java | 3 +
.../testsuites/IgniteCacheTestSuite3.java | 2 +
.../testsuites/IgniteCacheTestSuite4.java | 3 +
.../ignite/util/mbeans/GridMBeanSelfTest.java | 33 +-
modules/flume/README.txt | 72 +
modules/flume/licenses/apache-2.0.txt | 202 ++
modules/flume/pom.xml | 77 +
.../ignite/stream/flume/EventTransformer.java | 36 +
.../apache/ignite/stream/flume/IgniteSink.java | 186 ++
.../stream/flume/IgniteSinkConstants.java | 35 +
.../ignite/stream/flume/IgniteSinkTest.java | 142 ++
.../stream/flume/IgniteSinkTestSuite.java | 37 +
.../stream/flume/TestEventTransformer.java | 66 +
.../flume/src/test/resources/example-ignite.xml | 71 +
.../IgniteCacheQuerySelfTestSuite.java | 16 +-
.../GridSpringResourceInjectionSelfTest.java | 143 ++
.../processors/resource/spring-resource.xml | 27 +
.../testsuites/IgniteResourceSelfTestSuite.java | 2 +
modules/twitter/README.txt | 32 +
modules/twitter/licenses/apache-2.0.txt | 202 ++
modules/twitter/pom.xml | 122 +
.../ignite/stream/twitter/OAuthSettings.java | 86 +
.../ignite/stream/twitter/TwitterStreamer.java | 295 +++
.../twitter/IgniteTwitterStreamerTest.java | 234 ++
.../twitter/IgniteTwitterStreamerTestSuite.java | 32 +
.../stream/twitter/TwitterStreamerImpl.java | 79 +
.../config/benchmark-multicast.properties | 6 +-
.../benchmark-query-put-separated.properties | 87 +
.../yardstick/cache/CacheEntryEventProbe.java | 156 ++
.../cache/IgniteSqlQueryPutBenchmark.java | 31 +-
.../IgniteSqlQueryPutSeparatedBenchmark.java | 84 +
parent/pom.xml | 1 +
pom.xml | 3 +
190 files changed, 15606 insertions(+), 2789 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --cc modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index b8ccc03,74c71c4..5a31415
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@@ -43,7 -43,10 +43,8 @@@ import org.apache.ignite.internal.GridD
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
-import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+ import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 5ced545,8d363ad..512a801
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -1205,14 -1186,15 +1215,16 @@@ public abstract class GridCacheMapEntr
val != null,
evtOld,
evtOld != null || hasValueUnlocked(),
- subjId, null, taskName);
+ subjId, null, taskName,
+ keepPortable);
}
- if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
- cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
+ if (cctx.isLocal() || cctx.isReplicated() ||
+ (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
+ cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
+ partition(), tx.local(), false, updateCntr0, topVer);
- cctx.dataStructures().onEntryUpdated(key, false);
+ cctx.dataStructures().onEntryUpdated(key, false, keepPortable);
}
if (log.isDebugEnabled())
@@@ -1224,9 -1206,9 +1236,9 @@@
cctx.store().put(tx, keyValue(false), CU.value(val, cctx, false), newVer);
if (intercept)
- cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0));
+ cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepPortable));
- return valid ? new GridCacheUpdateTxResult(true, retval ? old : null) :
+ return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateCntr0) :
new GridCacheUpdateTxResult(false, null);
}
@@@ -1377,14 -1366,15 +1397,16 @@@
evtOld != null || hasValueUnlocked(),
subjId,
null,
- taskName);
+ taskName,
+ keepPortable);
}
- if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
- cctx.continuousQueries().onEntryUpdated(this, key, null, old, false);
+ if (cctx.isLocal() || cctx.isReplicated() ||
+ (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
+ cctx.continuousQueries().onEntryUpdated(key, null, old, isInternal()
+ || !context().userCache(),partition(), tx.local(), false, updateCntr0, topVer);
- cctx.dataStructures().onEntryUpdated(key, true);
+ cctx.dataStructures().onEntryUpdated(key, true, keepPortable);
deferred = cctx.deferredDelete() && !detached() && !isInternal();
@@@ -1718,9 -1707,14 +1740,14 @@@
if (res)
updateMetrics(op, metrics);
- cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
+ if (!isNear()) {
+ long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE);
+
+ cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
+ partition(), true, false, updateCntr, AffinityTopologyVersion.NONE);
+ }
- cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
+ cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
if (intercept) {
if (op == GridCacheOperation.UPDATE)
@@@ -2336,10 -2377,7 +2415,7 @@@
if (res)
updateMetrics(op, metrics);
- if (cctx.isReplicated() || primary)
- cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false);
-
- cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
+ cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepPortable);
if (intercept) {
if (op == GridCacheOperation.UPDATE)
@@@ -3186,10 -3230,10 +3268,10 @@@
drReplicate(drType, val, ver);
if (!skipQryNtf) {
- if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer))
- cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload);
+ cctx.continuousQueries().onEntryUpdated(key, val, null, this.isInternal()
+ || !this.context().userCache(), this.partition(), true, preload, updateCntr, topVer);
- cctx.dataStructures().onEntryUpdated(key, false);
+ cctx.dataStructures().onEntryUpdated(key, false, true);
}
if (cctx.store().isLocal()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index db19c67,55ca12d..2330a95
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@@ -707,11 -649,9 +651,10 @@@ public abstract class GridDhtTxLocalAda
passedKeys,
read,
needRetVal,
- skipped,
accessTtl,
null,
- skipStore);
+ skipStore,
+ keepBinary);
}
catch (IgniteCheckedException e) {
setRollbackOnly();
@@@ -738,14 -677,11 +680,12 @@@
final Collection<KeyCacheObject> passedKeys,
final boolean read,
final boolean needRetVal,
- final Set<KeyCacheObject> skipped,
final long accessTtl,
@Nullable final CacheEntryPredicate[] filter,
- boolean skipStore) {
+ boolean skipStore,
+ boolean keepBinary) {
if (log.isDebugEnabled())
- log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" +
- skipped + ']');
+ log.debug("Before acquiring transaction lock on keys [keys=" + passedKeys + ']');
if (passedKeys.isEmpty())
return new GridFinishedFuture<>(ret);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 255640f,3ee1048..cd76a56
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@@ -1765,8 -1875,7 +1881,8 @@@ public class GridDhtAtomicCache<K, V> e
req.invokeArguments(),
primary && writeThrough() && !req.skipStore(),
!req.skipStore(),
- req.returnValue(),
+ sndPrevVal || req.returnValue(),
+ req.keepBinary(),
expiry,
true,
true,
@@@ -2036,8 -2156,7 +2164,8 @@@
null,
/*write-through*/false,
/*read-through*/false,
- /*retval*/false,
+ /*retval*/sndPrevVal,
+ req.keepBinary(),
expiry,
/*event*/true,
/*metrics*/true,
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7b95042,72a60d2..a8807e1
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@@ -139,9 -144,19 +144,22 @@@ public class GridDhtAtomicUpdateReques
/** Task name hash. */
private int taskNameHash;
+ /** Partition. */
+ private GridLongList updateCntrs;
+
+ /** On response flag. Access should be synced on future. */
+ @GridDirectTransient
+ private boolean onRes;
+
+ @GridDirectTransient
+ private List<Integer> partIds;
+
+ @GridDirectTransient
+ private List<CacheObject> localPrevVals;
+
+ /** Keep portable flag. */
+ private boolean keepBinary;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@@ -191,9 -205,10 +209,11 @@@
this.taskNameHash = taskNameHash;
this.invokeArgs = invokeArgs;
this.addDepInfo = addDepInfo;
+ this.keepBinary = keepBinary;
keys = new ArrayList<>();
+ partIds = new ArrayList<>();
+ localPrevVals = new ArrayList<>();
if (forceTransformBackups) {
entryProcessors = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 648a248,706655b..49a267a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@@ -251,8 -249,7 +251,8 @@@ public class GridNearAtomicCache<K, V>
/*write-through*/false,
/*read-through*/false,
/*retval*/false,
+ keepPortable,
- /**expiry policy*/null,
+ /*expiry policy*/null,
/*event*/true,
/*metrics*/true,
/*primary*/false,
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 685b998,dfaa44e..00f0a75
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@@ -562,8 -520,17 +545,10 @@@ public final class GridNearGetFuture<K
add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
}
else {
- K key0 = key.value(cctx.cacheObjectContext(), true);
- key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable);
-
- V val0;
-
- if (!skipVals) {
- val0 = v.value(cctx.cacheObjectContext(), true);
- val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable);
- }
- else
- val0 = (V)Boolean.TRUE;
+ K key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable);
- V val0 = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable);
++ V val0 = !skipVals ?
++ (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable) :
++ (V)Boolean.TRUE;
add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 4b5b204,1c01e4e..2eb4c68
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@@ -347,9 -332,8 +332,9 @@@ public class GridNearTxLocal extends Gr
boolean readThrough,
boolean async,
final Collection<KeyCacheObject> keys,
- boolean skipVals,
+ final boolean skipVals,
final boolean needVer,
+ boolean keepBinary,
final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
) {
if (cacheCtx.isNear()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 58ee0c6,ba58f57..cef8371
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@@ -390,10 -392,9 +393,10 @@@ public class GridNearTxRemote extends G
-1L,
cached,
drVer,
- skipStore);
+ skipStore,
+ keepBinary);
- writeMap.put(key, txEntry);
+ txState.addWriteEntry(key, txEntry);
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 9f52699,cff62d9..fae7d8c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -2133,265 -2183,363 +2201,363 @@@ public abstract class IgniteTxLocalAdap
KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
- IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
-
- IgniteTxEntry txEntry = entry(txKey);
-
- // First time access.
- if (txEntry == null) {
- while (true) {
- GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion());
-
- try {
- entry.unswap(false);
-
- // Check if lock is being explicitly acquired by the same thread.
- if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
- entry.lockedByThread(threadId, xidVer))
- throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
- "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer +
- ", threadId=" + threadId +
- ", locNodeId=" + cctx.localNodeId() + ']');
-
- CacheObject old = null;
- GridCacheVersion readVer = null;
+ boolean loadMissed = enlistWriteEntry(cacheCtx,
+ cacheKey,
+ val,
+ entryProcessor,
+ invokeArgs,
+ expiryPlc,
+ retval,
+ lockOnly,
+ filter,
+ drVer,
+ drTtl,
+ drExpireTime,
+ ret,
+ enlisted,
+ skipStore,
+ singleRmv,
+ hasFilters,
+ needVal,
+ needReadVer);
+
+ if (loadMissed) {
+ if (missedForLoad == null)
+ missedForLoad = new HashSet<>();
+
+ missedForLoad.add(cacheKey);
+ }
+ }
- if (optimistic() && !implicit()) {
- try {
- if (needReadVer) {
- T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
- entry.innerGetVersioned(this,
- /*swap*/false,
- /*unmarshal*/retval,
- /*metrics*/retval,
- /*events*/retval,
- CU.subjectId(this, cctx),
- entryProcessor,
- resolveTaskName(),
- null,
- keepBinary) : null;
+ if (missedForLoad != null) {
+ return loadMissing(cacheCtx,
+ missedForLoad,
+ filter,
+ ret,
+ needReadVer,
+ singleRmv,
+ hasFilters,
+ skipStore,
+ retval);
+ }
- if (res != null) {
- old = res.get1();
- readVer = res.get2();
- }
- }
- else {
- old = entry.innerGet(this,
- /*swap*/false,
- /*read-through*/false,
- /*fail-fast*/false,
- /*unmarshal*/retval,
- /*metrics*/retval,
- /*events*/retval,
- /*temporary*/false,
- CU.subjectId(this, cctx),
- entryProcessor,
- resolveTaskName(),
- null,
- keepBinary);
- }
- }
- catch (ClusterTopologyCheckedException e) {
- entry.context().evicts().touch(entry, topologyVersion());
+ return new GridFinishedFuture<>();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
- throw e;
- }
- }
- else
- old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
+ /**
+ * @param cacheCtx Cache context.
+ * @param keys Keys to load.
+ * @param filter Filter.
+ * @param ret Return value.
+ * @param needReadVer Read version flag.
+ * @param singleRmv {@code True} for single remove operation.
+ * @param hasFilters {@code True} if filters not empty.
+ * @param skipStore Skip store flag.
+ * @param retval Return value flag.
+ * @return Load future.
+ */
+ private IgniteInternalFuture<Void> loadMissing(
+ final GridCacheContext cacheCtx,
+ final Set<KeyCacheObject> keys,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ final boolean needReadVer,
+ final boolean singleRmv,
+ final boolean hasFilters,
+ final boolean skipStore,
+ final boolean retval) {
+ GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
+ new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+ @Override public void apply(KeyCacheObject key,
+ @Nullable Object val,
+ @Nullable GridCacheVersion loadVer) {
+ if (log.isDebugEnabled())
+ log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
- if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
- skipped = skip(skipped, cacheKey);
+ IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
- ret.set(cacheCtx, old, false, keepBinary);
+ assert e != null;
- if (!readCommitted()) {
- // Enlist failed filters as reads for non-read-committed mode,
- // so future ops will get the same values.
- txEntry = addEntry(READ,
- old,
- null,
- null,
- entry,
- null,
- CU.empty0(),
- false,
- -1L,
- -1L,
- null,
- skipStore,
- keepBinary);
+ if (needReadVer) {
+ assert loadVer != null;
- txEntry.markValid();
+ e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+ }
- if (needReadVer) {
- assert readVer != null;
+ if (singleRmv) {
+ assert !hasFilters && !retval;
+ assert val == null || Boolean.TRUE.equals(val) : val;
- txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
- }
- }
- ret.set(cacheCtx, null, val != null);
++ ret.set(cacheCtx, null, val != null, keepBinary);
+ }
+ else {
+ CacheObject cacheVal = cacheCtx.toCacheObject(val);
- if (readCommitted())
- cacheCtx.evicts().touch(entry, topologyVersion());
+ if (e.op() == TRANSFORM) {
+ GridCacheVersion ver;
- break; // While.
+ try {
+ ver = e.cached().version();
}
+ catch (GridCacheEntryRemovedException ex) {
+ assert optimistic() : e;
- final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
- entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
- txEntry = addEntry(op,
- cacheCtx.toCacheObject(val),
- entryProcessor,
- invokeArgs,
- entry,
- expiryPlc,
- filter,
- true,
- drTtl,
- drExpireTime,
- drVer,
- skipStore,
- keepBinary);
+ ver = null;
+ }
- if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
- cacheCtx.evicts().touch(entry, topologyVersion());
+ addInvokeResult(e, cacheVal, ret, ver);
+ }
+ else {
+ boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter);
- enlisted.add(cacheKey);
+ ret.set(cacheCtx, cacheVal, success);
+ }
+ }
+ }
+ };
- if (!pessimistic() && !implicit()) {
- txEntry.markValid();
+ return loadMissing(
+ cacheCtx,
+ /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
+ /*async*/true,
+ keys,
+ /*skipVals*/singleRmv,
+ needReadVer,
+ c);
+ }
- if (old == null) {
- if (needVal) {
- if (missedForLoad == null)
- missedForLoad = new HashSet<>();
+ /**
+ * @param cacheCtx Cache context.
+ * @param cacheKey Key.
+ * @param val Value.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param retval Return value flag.
+ * @param lockOnly
+ * @param filter Filter.
+ * @param drVer DR version.
+ * @param drTtl DR ttl.
+ * @param drExpireTime DR expire time.
+ * @param ret Return value.
+ * @param enlisted Enlisted keys collection.
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single remove operation.
+ * @param hasFilters {@code True} if filters not empty.
+ * @param needVal {@code True} if value is needed.
+ * @param needReadVer {@code True} if need read entry version.
+ * @return {@code True} if entry value should be loaded.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean enlistWriteEntry(GridCacheContext cacheCtx,
+ final KeyCacheObject cacheKey,
+ final @Nullable Object val,
+ final @Nullable EntryProcessor<?, ?, ?> entryProcessor,
+ final @Nullable Object[] invokeArgs,
+ final @Nullable ExpiryPolicy expiryPlc,
+ final boolean retval,
+ final boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheVersion drVer,
+ final long drTtl,
+ long drExpireTime,
+ final GridCacheReturn ret,
+ @Nullable final Collection<KeyCacheObject> enlisted,
+ boolean skipStore,
+ boolean singleRmv,
+ boolean hasFilters,
+ final boolean needVal,
+ boolean needReadVer
+ ) throws IgniteCheckedException {
+ boolean loadMissed = false;
- missedForLoad.add(cacheKey);
- }
- else {
- assert !implicit() || !transform : this;
- assert txEntry.op() != TRANSFORM : txEntry;
+ final boolean rmv = val == null && entryProcessor == null;
- if (retval)
- ret.set(cacheCtx, null, true, keepBinary);
- else
- ret.success(true);
- }
- }
- else {
- if (needReadVer) {
- assert readVer != null;
+ IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
- txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
- }
+ IgniteTxEntry txEntry = entry(txKey);
- if (retval && !transform)
- ret.set(cacheCtx, old, true, keepBinary);
- else {
- if (txEntry.op() == TRANSFORM) {
- GridCacheVersion ver;
+ // First time access.
+ if (txEntry == null) {
+ while (true) {
+ GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion());
- try {
- ver = entry.version();
- }
- catch (GridCacheEntryRemovedException ex) {
- assert optimistic() : txEntry;
+ try {
+ entry.unswap(false);
+
+ // Check if lock is being explicitly acquired by the same thread.
+ if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
+ entry.lockedByThread(threadId, xidVer)) {
+ throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
+ "externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
+ ", entry=" + entry +
+ ", xidVer=" + xidVer +
+ ", threadId=" + threadId +
+ ", locNodeId=" + cctx.localNodeId() + ']');
+ }
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version " +
- "[err=" + ex.getMessage() + ']');
+ CacheObject old = null;
+ GridCacheVersion readVer = null;
- ver = null;
- }
+ if (optimistic() && !implicit()) {
+ try {
+ if (needReadVer) {
+ T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
+ entry.innerGetVersioned(this,
+ /*swap*/false,
+ /*unmarshal*/retval,
+ /*metrics*/retval,
+ /*events*/retval,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null) : null;
- addInvokeResult(txEntry, old, ret, ver);
- }
- else
- ret.success(true);
- }
+ if (res != null) {
+ old = res.get1();
+ readVer = res.get2();
}
}
- // Pessimistic.
else {
- if (retval && !transform)
- ret.set(cacheCtx, old, true, keepBinary);
- else
- ret.success(true);
+ old = entry.innerGet(this,
+ /*swap*/false,
+ /*read-through*/false,
+ /*fail-fast*/false,
+ /*unmarshal*/retval,
+ /*metrics*/retval,
+ /*events*/retval,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null);
}
-
- break; // While.
}
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in transaction putAll0 method: " + entry);
+ catch (ClusterTopologyCheckedException e) {
+ entry.context().evicts().touch(entry, topologyVersion());
+
+ throw e;
}
}
- }
- else {
- if (entryProcessor == null && txEntry.op() == TRANSFORM)
- throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
- "transaction after EntryProcessor is applied): " + key);
-
- GridCacheEntryEx entry = txEntry.cached();
+ else
+ old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
- CacheObject v = txEntry.value();
+ if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
+ ret.set(cacheCtx, old, false);
- boolean del = txEntry.op() == DELETE && rmv;
+ if (!readCommitted()) {
+ // Enlist failed filters as reads for non-read-committed mode,
+ // so future ops will get the same values.
+ txEntry = addEntry(READ,
+ old,
+ null,
+ null,
+ entry,
+ null,
+ CU.empty0(),
+ false,
+ -1L,
+ -1L,
+ null,
+ skipStore);
- if (!del) {
- if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
- skipped = skip(skipped, cacheKey);
+ txEntry.markValid();
- ret.set(cacheCtx, v, false, keepBinary);
+ if (needReadVer) {
+ assert readVer != null;
- continue;
+ txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ }
}
- GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
- v != null ? UPDATE : CREATE;
+ if (readCommitted())
+ cacheCtx.evicts().touch(entry, topologyVersion());
- txEntry = addEntry(op,
- cacheCtx.toCacheObject(val),
- entryProcessor,
- invokeArgs,
- entry,
- expiryPlc,
- filter,
- true,
- drTtl,
- drExpireTime,
- drVer,
- skipStore,
- keepBinary);
+ break; // While.
+ }
+
+ final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+ entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+
+ txEntry = addEntry(op,
+ cacheCtx.toCacheObject(val),
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore);
+ if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
+ cacheCtx.evicts().touch(entry, topologyVersion());
+
+ if (enlisted != null)
enlisted.add(cacheKey);
- if (txEntry.op() == TRANSFORM) {
- GridCacheVersion ver;
+ if (!pessimistic() && !implicit()) {
+ txEntry.markValid();
- try {
- ver = entry.version();
- }
- catch (GridCacheEntryRemovedException e) {
- assert optimistic() : txEntry;
+ if (old == null) {
+ if (needVal)
+ loadMissed = true;
+ else {
+ assert !implicit() || !transform : this;
+ assert txEntry.op() != TRANSFORM : txEntry;
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+ if (retval)
+ ret.set(cacheCtx, null, true);
+ else
+ ret.success(true);
+ }
+ }
+ else {
+ if (needReadVer) {
+ assert readVer != null;
- ver = null;
+ txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
}
- addInvokeResult(txEntry, txEntry.value(), ret, ver);
- }
- }
+ if (retval && !transform)
+ ret.set(cacheCtx, old, true);
+ else {
+ if (txEntry.op() == TRANSFORM) {
+ GridCacheVersion ver;
- if (!pessimistic()) {
- txEntry.markValid();
+ try {
+ ver = entry.version();
+ }
+ catch (GridCacheEntryRemovedException ex) {
+ assert optimistic() : txEntry;
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version " +
+ "[err=" + ex.getMessage() + ']');
+
+ ver = null;
+ }
+
+ addInvokeResult(txEntry, old, ret, ver);
+ }
+ else
+ ret.success(true);
+ }
+ }
+ }
+ // Pessimistic.
+ else {
if (retval && !transform)
- ret.set(cacheCtx, v, true, keepBinary);
+ ret.set(cacheCtx, old, true);
else
ret.success(true);
}
@@@ -2829,19 -3081,15 +3101,16 @@@
drMap,
null,
opCtx != null && opCtx.skipStore(),
- false);
+ false,
+ opCtx != null && opCtx.isKeepBinary());
if (pessimistic()) {
- // Loose all skipped.
- final Set<KeyCacheObject> loaded = loadFut.get();
-
- final Collection<KeyCacheObject> keys = F.view(enlisted, F0.notIn(loaded));
+ assert loadFut == null || loadFut.isDone() : loadFut;
if (log.isDebugEnabled())
- log.debug("Before acquiring transaction lock for put on keys: " + keys);
+ log.debug("Before acquiring transaction lock for put on keys: " + enlisted);
- IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys,
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
lockTimeout(),
this,
false,
@@@ -3029,141 -3292,131 +3313,132 @@@
init();
- try {
- Collection<KeyCacheObject> enlisted = new ArrayList<>();
-
- CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+ final Collection<KeyCacheObject> enlisted = new ArrayList<>();
- ExpiryPolicy plc;
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
- if (!F.isEmpty(filter))
- plc = opCtx != null ? opCtx.expiry() : null;
- else
- plc = null;
+ ExpiryPolicy plc;
- final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite(
- cacheCtx,
- keys0,
- plc,
- implicit,
- /** lookup map */null,
- /** invoke map */null,
- /** invoke arguments */null,
- retval,
- /** lock only */false,
- filter,
- ret,
- enlisted,
- null,
- drMap,
- opCtx != null && opCtx.skipStore(),
- singleRmv,
- opCtx != null && opCtx.isKeepBinary()
- );
+ if (!F.isEmpty(filter))
+ plc = opCtx != null ? opCtx.expiry() : null;
+ else
+ plc = null;
- if (log.isDebugEnabled())
- log.debug("Remove keys: " + enlisted);
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ keys0,
+ plc,
+ /** lookup map */null,
+ /** invoke map */null,
+ /** invoke arguments */null,
+ retval,
+ /** lock only */false,
+ filter,
+ ret,
+ enlisted,
+ null,
+ drMap,
+ opCtx != null && opCtx.skipStore(),
- singleRmv
++ singleRmv,
++ opCtx != null && opCtx.isKeepBinary()
+ );
- // Acquire locks only after having added operation to the write set.
- // Otherwise, during rollback we will not know whether locks need
- // to be rolled back.
- if (pessimistic()) {
- // Loose all skipped.
- final Collection<KeyCacheObject> passedKeys = F.view(enlisted, F0.notIn(loadFut.get()));
+ if (log.isDebugEnabled())
+ log.debug("Remove keys: " + enlisted);
- if (log.isDebugEnabled())
- log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys);
+ // Acquire locks only after having added operation to the write set.
+ // Otherwise, during rollback we will not know whether locks need
+ // to be rolled back.
+ if (pessimistic()) {
+ assert loadFut.isDone() : loadFut;
- IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys,
- lockTimeout(),
- this,
- false,
- retval,
- isolation,
- isInvalidate(),
- -1L);
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for remove on keys: " + enlisted);
- PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
- @Override protected GridCacheReturn postLock(GridCacheReturn ret)
- throws IgniteCheckedException
- {
- if (log.isDebugEnabled())
- log.debug("Acquired transaction lock for remove on keys: " + passedKeys);
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ lockTimeout(),
+ this,
+ false,
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L);
+
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override protected GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for remove on keys: " + enlisted);
- postLockWrite(cacheCtx,
- passedKeys,
- loadFut.get(),
- ret,
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
/*remove*/true,
- retval,
+ retval,
/*read*/false,
- -1L,
- filter,
+ -1L,
+ filter,
/*computeInvoke*/false);
- return ret;
- }
- };
+ return ret;
+ }
+ };
- if (fut.isDone()) {
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
try {
- return nonInterruptable(plc1.apply(fut.get(), null));
+ return nonInterruptable(plc1.apply(false, e));
}
- catch (GridClosureException e) {
- return new GridFinishedFuture<>(e.unwrap());
- }
- catch (IgniteCheckedException e) {
- try {
- return nonInterruptable(plc1.apply(false, e));
- }
- catch (Exception e1) {
- return new GridFinishedFuture<>(e1);
- }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
}
}
- else
- return nonInterruptable(new GridEmbeddedFuture<>(
- fut,
- plc1
- ));
}
- else {
- if (implicit()) {
- // Should never load missing values for implicit transaction as values will be returned
- // with prepare response, if required.
- assert loadFut.isDone();
-
- return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
- @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
- throws IgniteCheckedException {
- try {
- txFut.get();
+ else
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ else {
+ if (implicit()) {
+ // Should never load missing values for implicit transaction as values will be returned
+ // with prepare response, if required.
+ assert loadFut.isDone();
- return implicitRes;
- }
- catch (IgniteCheckedException | RuntimeException e) {
- rollbackAsync();
+ return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+ throws IgniteCheckedException {
+ try {
+ txFut.get();
- throw e;
- }
+ return implicitRes;
}
- }));
- }
- else
- return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
- @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f)
- throws IgniteCheckedException {
- f.get();
+ catch (IgniteCheckedException | RuntimeException e) {
+ rollbackAsync();
- return ret;
+ throw e;
}
- }));
+ }
+ }));
}
- }
- catch (IgniteCheckedException e) {
- setRollbackOnly();
+ else {
+ return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f)
+ throws IgniteCheckedException {
+ f.get();
- return new GridFinishedFuture<>(e);
+ return ret;
+ }
+ }));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
----------------------------------------------------------------------
diff --cc modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
index db44fac,99b2423..dfa4cbc
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
@@@ -74,21 -76,6 +76,11 @@@ public class IgniteSqlQueryPutBenchmar
return true;
}
+ /** {@inheritDoc} */
+ @Override public void onWarmupFinished() {
+ super.onWarmupFinished();
-
- resCnt.reset();
- cnt.reset();
- }
-
- /** {@inheritDoc} */
- @Override public void tearDown() throws Exception {
- ignite().log().info("Average number of entries per query: " + ((double)resCnt.longValue() / cnt.longValue()));
-
- super.tearDown();
+ }
+
/**
* @param minSalary Min salary.
* @param maxSalary Max salary.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/parent/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 68e57c6,1008981..115df88
--- a/pom.xml
+++ b/pom.xml
@@@ -73,10 -73,14 +73,13 @@@
<module>modules/cloud</module>
<module>modules/mesos</module>
<module>modules/kafka</module>
+ <module>modules/flume</module>
<module>modules/yarn</module>
<module>modules/jms11</module>
+ <module>modules/twitter</module>
<module>modules/mqtt</module>
<module>modules/zookeeper</module>
+ <module>modules/camel</module>
- <module>modules/platform</module>
</modules>
<profiles>