You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/01/13 14:55:32 UTC
[48/50] [abbrv] ignite git commit: Fixes: - allow 'committing' ->
'marked_rollback' tx state change only for thread committing transaction -
fixed 'full_sync' mode for case when tx primary nodes fail - fixed race
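between statically configured cache start and GridDhtAffinityAssignmentRequest - fixed 'prepareMarshal' methods to marshal only once (ignite-2219)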
Fixes:
- allow 'committing' -> 'marked_rollback' tx state change only for thread committing transaction
- fixed 'full_sync' mode for case when tx primary nodes fail
- fixed race between statically configured cache start and GridDhtAffinityAssignmentRequest
- fixed 'prepareMarshal' methods to marshal only once (ignite-2219)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/457a9ae4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/457a9ae4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/457a9ae4
Branch: refs/heads/ignite-gg-10837
Commit: 457a9ae4d3b0d6eef6e92a15f5ef79c15ccf1f95
Parents: 1d8c4e2
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jan 13 09:21:09 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jan 13 09:29:17 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/IgniteTransactions.java | 4 +-
.../apache/ignite/internal/IgniteKernal.java | 95 +++-
.../cache/CacheEntrySerializablePredicate.java | 3 +-
.../cache/CacheInvokeDirectResult.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 23 +
.../processors/cache/GridCacheProcessor.java | 52 ++-
.../processors/cache/GridCacheReturn.java | 2 +
.../processors/cache/IgniteCacheProxy.java | 2 +-
.../GridDistributedLockResponse.java | 2 +-
.../GridDistributedTxFinishRequest.java | 11 +-
.../GridDistributedTxPrepareRequest.java | 2 +-
.../GridDistributedTxPrepareResponse.java | 4 +-
.../dht/GridDhtAffinityAssignmentResponse.java | 4 +-
.../distributed/dht/GridDhtLockFuture.java | 44 +-
.../distributed/dht/GridDhtLockRequest.java | 2 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 90 ++--
.../dht/GridDhtTxFinishResponse.java | 4 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 1 +
.../dht/GridDhtTxPrepareRequest.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 21 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 6 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 22 +-
.../atomic/GridNearAtomicUpdateResponse.java | 4 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 6 +-
.../GridDhtPartitionDemandMessage.java | 6 +-
.../GridDhtPartitionSupplyMessageV2.java | 6 +-
.../preloader/GridDhtPartitionsFullMessage.java | 2 +-
.../GridDhtPartitionsSingleMessage.java | 4 +-
.../distributed/near/GridNearGetResponse.java | 4 +-
...ridNearOptimisticTxPrepareFutureAdapter.java | 6 +-
.../near/GridNearSingleGetResponse.java | 2 +-
.../near/GridNearTxFinishFuture.java | 432 ++++++++++++++-----
.../near/GridNearTxFinishRequest.java | 5 +
.../near/GridNearTxFinishResponse.java | 4 +-
.../cache/distributed/near/GridNearTxLocal.java | 2 +-
.../near/GridNearTxPrepareResponse.java | 4 +-
.../cache/query/GridCacheQueryRequest.java | 16 +-
.../cache/query/GridCacheQueryResponse.java | 18 +-
.../cache/transactions/IgniteInternalTx.java | 6 +
.../cache/transactions/IgniteTxAdapter.java | 23 +-
.../cache/transactions/IgniteTxEntry.java | 11 +-
.../cache/transactions/IgniteTxHandler.java | 26 +-
.../transactions/IgniteTxLocalAdapter.java | 6 +-
.../cache/transactions/IgniteTxManager.java | 20 +
.../datastreamer/DataStreamerRequest.java | 1 +
.../datastructures/DataStructuresProcessor.java | 11 +-
.../processors/igfs/IgfsAckMessage.java | 4 +-
.../handlers/cache/GridCacheCommandHandler.java | 6 +-
.../ignite/spi/discovery/DiscoverySpi.java | 2 +
.../ignite/stream/socket/SocketStreamer.java | 3 +-
...cheAbstractFullApiMultithreadedSelfTest.java | 13 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 2 +-
.../processors/cache/GridCacheStopSelfTest.java | 2 +-
.../cache/IgniteDynamicCacheStartSelfTest.java | 30 +-
.../IgniteClientDataStructuresAbstractTest.java | 3 +
.../dht/GridCacheTxNodeFailureSelfTest.java | 7 +-
.../IgniteCacheCommitDelayTxRecoveryTest.java | 376 ++++++++++++++++
.../IgniteCachePutRetryAbstractSelfTest.java | 36 +-
...gniteCachePutRetryTransactionalSelfTest.java | 21 +
.../continuous/GridEventConsumeSelfTest.java | 3 +
.../internal/util/nio/GridNioSelfTest.java | 11 +-
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 -
...CommunicationRecoveryAckClosureSelfTest.java | 19 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 2 +
.../junits/common/GridCommonAbstractTest.java | 34 +-
.../IgniteCacheTxRecoverySelfTestSuite.java | 3 +
.../tcp/ipfinder/zk/ZookeeperIpFinderTest.java | 69 ++-
67 files changed, 1314 insertions(+), 358 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
index 875b647..dfe6a1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
@@ -18,7 +18,7 @@
package org.apache.ignite;
import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -54,7 +54,7 @@ import org.apache.ignite.transactions.TransactionMetrics;
public interface IgniteTransactions {
/**
* Starts transaction with default isolation, concurrency, timeout, and invalidation policy.
- * All defaults are set in {@link CacheConfiguration} at startup.
+ * All defaults are set in {@link TransactionConfiguration} at startup.
*
* @return New transaction
* @throws IllegalStateException If transaction is already started by this thread.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 14b5816..3def718 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1424,8 +1424,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/** @throws IgniteCheckedException If registration failed. */
- private void registerExecutorMBeans(ExecutorService execSvc, ExecutorService sysExecSvc, ExecutorService p2pExecSvc,
- ExecutorService mgmtExecSvc, ExecutorService restExecSvc) throws IgniteCheckedException {
+ private void registerExecutorMBeans(ExecutorService execSvc,
+ ExecutorService sysExecSvc,
+ ExecutorService p2pExecSvc,
+ ExecutorService mgmtExecSvc,
+ ExecutorService restExecSvc) throws IgniteCheckedException {
pubExecSvcMBean = registerExecutorMBean(execSvc, "GridExecutionExecutor");
sysExecSvcMBean = registerExecutorMBean(sysExecSvc, "GridSystemExecutor");
mgmtExecSvcMBean = registerExecutorMBean(mgmtExecSvc, "GridManagementExecutor");
@@ -2414,7 +2417,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- return ctx.cache().publicJCache(name, false);
+ return ctx.cache().publicJCache(name, false, true);
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
@@ -2431,7 +2434,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), null, true, true).get();
+ ctx.cache().dynamicStartCache(cacheCfg,
+ cacheCfg.getName(),
+ null,
+ true,
+ true,
+ true).get();
return ctx.cache().publicJCache(cacheCfg.getName());
}
@@ -2467,8 +2475,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- if (ctx.cache().cache(cacheCfg.getName()) == null)
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), null, false, true).get();
+ if (ctx.cache().cache(cacheCfg.getName()) == null) {
+ ctx.cache().dynamicStartCache(cacheCfg,
+ cacheCfg.getName(),
+ null,
+ false,
+ true,
+ true).get();
+ }
return ctx.cache().publicJCache(cacheCfg.getName());
}
@@ -2491,7 +2505,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, true, true).get();
+ ctx.cache().dynamicStartCache(cacheCfg,
+ cacheCfg.getName(),
+ nearCfg,
+ true,
+ true,
+ true).get();
return ctx.cache().publicJCache(cacheCfg.getName());
}
@@ -2514,11 +2533,23 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
try {
IgniteInternalCache<Object, Object> cache = ctx.cache().cache(cacheCfg.getName());
- if (cache == null)
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, false, true).get();
+ if (cache == null) {
+ ctx.cache().dynamicStartCache(cacheCfg,
+ cacheCfg.getName(),
+ nearCfg,
+ false,
+ true,
+ true).get();
+ }
else {
- if (cache.configuration().getNearConfiguration() == null)
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, false, true).get();
+ if (cache.configuration().getNearConfiguration() == null) {
+ ctx.cache().dynamicStartCache(cacheCfg,
+ cacheCfg.getName(),
+ nearCfg,
+ false,
+ true,
+ true).get();
+ }
}
return ctx.cache().publicJCache(cacheCfg.getName());
@@ -2538,7 +2569,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- ctx.cache().dynamicStartCache(null, cacheName, nearCfg, true, true).get();
+ ctx.cache().dynamicStartCache(null,
+ cacheName,
+ nearCfg,
+ true,
+ true,
+ true).get();
IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
@@ -2564,11 +2600,23 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
try {
IgniteInternalCache<Object, Object> internalCache = ctx.cache().cache(cacheName);
- if (internalCache == null)
- ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false, true).get();
+ if (internalCache == null) {
+ ctx.cache().dynamicStartCache(null,
+ cacheName,
+ nearCfg,
+ false,
+ true,
+ true).get();
+ }
else {
- if (internalCache.configuration().getNearConfiguration() == null)
- ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false, true).get();
+ if (internalCache.configuration().getNearConfiguration() == null) {
+ ctx.cache().dynamicStartCache(null,
+ cacheName,
+ nearCfg,
+ false,
+ true,
+ true).get();
+ }
}
IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
@@ -2587,6 +2635,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/**
* @param cache Cache.
+ * @throws IgniteCheckedException If cache without near cache was already started.
*/
private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) throws IgniteCheckedException {
if (!cache.context().isNear())
@@ -2596,7 +2645,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/** {@inheritDoc} */
@Override public void destroyCache(String cacheName) {
- IgniteInternalFuture stopFut = destroyCacheAsync(cacheName);
+ IgniteInternalFuture stopFut = destroyCacheAsync(cacheName, true);
try {
stopFut.get();
@@ -2608,13 +2657,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/**
* @param cacheName Cache name.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Ignite future.
*/
- public IgniteInternalFuture<?> destroyCacheAsync(String cacheName) {
+ public IgniteInternalFuture<?> destroyCacheAsync(String cacheName, boolean checkThreadTx) {
guard();
try {
- return ctx.cache().dynamicDestroyCache(cacheName);
+ return ctx.cache().dynamicDestroyCache(cacheName, checkThreadTx);
}
finally {
unguard();
@@ -2627,7 +2677,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
try {
if (ctx.cache().cache(cacheName) == null)
- ctx.cache().getOrCreateFromTemplate(cacheName).get();
+ ctx.cache().getOrCreateFromTemplate(cacheName, true).get();
return ctx.cache().publicJCache(cacheName);
}
@@ -2641,14 +2691,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/**
* @param cacheName Cache name.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
- public IgniteInternalFuture<?> getOrCreateCacheAsync(String cacheName) {
+ public IgniteInternalFuture<?> getOrCreateCacheAsync(String cacheName, boolean checkThreadTx) {
guard();
try {
if (ctx.cache().cache(cacheName) == null)
- return ctx.cache().getOrCreateFromTemplate(cacheName);
+ return ctx.cache().getOrCreateFromTemplate(cacheName, checkThreadTx);
return new GridFinishedFuture<>();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
index a243c4e..20cc005 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
@@ -86,7 +86,8 @@ public class CacheEntrySerializablePredicate implements CacheEntryPredicate {
p.prepareMarshal(ctx);
- bytes = ctx.marshaller().marshal(p);
+ if (bytes == null)
+ bytes = ctx.marshaller().marshal(p);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
index bee1427..fefa422 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
@@ -104,7 +104,7 @@ public class CacheInvokeDirectResult implements Message {
public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException {
key.prepareMarshal(ctx.cacheObjectContext());
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
if (res != null)
@@ -119,7 +119,7 @@ public class CacheInvokeDirectResult implements Message {
public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException {
key.finishUnmarshal(ctx.cacheObjectContext(), ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
if (res != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0aa8b1b..b297827 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
@@ -122,6 +123,28 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
IgniteInternalFuture<?> fut = null;
if (cacheMsg.partitionExchangeMessage()) {
+ if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
+ assert cacheMsg.topologyVersion() != null : cacheMsg;
+
+ AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order());
+
+ assert cacheMsg.topologyVersion().compareTo(startTopVer) > 0 :
+ "Invalid affinity request [startTopVer=" + startTopVer + ", msg=" + cacheMsg + ']';
+
+ // Need to wait for initial exchange to avoid race between cache start and affinity request.
+ fut = cctx.exchange().affinityReadyFuture(startTopVer);
+
+ if (fut != null && !fut.isDone()) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ lsnr.onMessage(nodeId, cacheMsg);
+ }
+ });
+
+ return;
+ }
+ }
+
long locTopVer = cctx.discovery().topologyVersion();
long rmtTopVer = cacheMsg.topologyVersion().topologyVersion();
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ff02e70..eb6d98e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2030,7 +2030,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
CacheConfiguration cfg = createConfigFromTemplate(cacheName);
- return dynamicStartCache(cfg, cacheName, null, true, true);
+ return dynamicStartCache(cfg, cacheName, null, true, true, true);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -2041,16 +2041,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Dynamically starts cache using template configuration.
*
* @param cacheName Cache name.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
- public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName) {
+ public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName, boolean checkThreadTx) {
try {
- if (publicJCache(cacheName, false) != null) // Cache with given name already started.
+ if (publicJCache(cacheName, false, checkThreadTx) != null) // Cache with given name already started.
return new GridFinishedFuture<>();
CacheConfiguration cfg = createConfigFromTemplate(cacheName);
- return dynamicStartCache(cfg, cacheName, null, false, true);
+ return dynamicStartCache(cfg, cacheName, null, false, true, checkThreadTx);
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
@@ -2060,6 +2061,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param cacheName Cache name.
* @return Cache configuration.
+ * @throws IgniteCheckedException If failed.
*/
private CacheConfiguration createConfigFromTemplate(String cacheName) throws IgniteCheckedException {
CacheConfiguration cfgTemplate = null;
@@ -2138,6 +2140,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param cacheName Cache name.
* @param nearCfg Near cache configuration.
* @param failIfExists Fail if exists flag.
+ * @param failIfNotStarted If {@code true} fails if cache is not started.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
@SuppressWarnings("IfMayBeConditional")
@@ -2146,9 +2150,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
String cacheName,
@Nullable NearCacheConfiguration nearCfg,
boolean failIfExists,
- boolean failIfNotStarted
+ boolean failIfNotStarted,
+ boolean checkThreadTx
) {
- return dynamicStartCache(ccfg, cacheName, nearCfg, CacheType.USER, failIfExists, failIfNotStarted);
+ return dynamicStartCache(ccfg,
+ cacheName,
+ nearCfg,
+ CacheType.USER,
+ failIfExists,
+ failIfNotStarted,
+ checkThreadTx);
}
/**
@@ -2157,7 +2168,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param ccfg Cache configuration.
* @param cacheName Cache name.
* @param nearCfg Near cache configuration.
+ * @param cacheType Cache type.
* @param failIfExists Fail if exists flag.
+ * @param failIfNotStarted If {@code true} fails if cache is not started.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
@SuppressWarnings("IfMayBeConditional")
@@ -2167,9 +2181,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
@Nullable NearCacheConfiguration nearCfg,
CacheType cacheType,
boolean failIfExists,
- boolean failIfNotStarted
+ boolean failIfNotStarted,
+ boolean checkThreadTx
) {
- checkEmptyTransactions();
+ if (checkThreadTx)
+ checkEmptyTransactions();
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName));
@@ -2260,10 +2276,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param cacheName Cache name to destroy.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is destroyed.
*/
- public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName) {
- checkEmptyTransactions();
+ public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName, boolean checkThreadTx) {
+ if (checkThreadTx)
+ checkEmptyTransactions();
DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
@@ -2898,7 +2916,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked);
if (cache == null) {
- dynamicStartCache(null, name, null, false, true).get();
+ dynamicStartCache(null, name, null, false, true, true).get();
cache = jCacheProxies.get(masked);
}
@@ -3001,21 +3019,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException {
- return publicJCache(cacheName, true);
+ return publicJCache(cacheName, true, true);
}
/**
* @param cacheName Cache name.
* @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started,
* otherwise returns {@code null} in this case.
- * @param <K> type of keys.
- * @param <V> type of values.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Cache instance for given name.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"unchecked", "ConstantConditions"})
- @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted)
- throws IgniteCheckedException
+ @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName,
+ boolean failIfNotStarted,
+ boolean checkThreadTx) throws IgniteCheckedException
{
if (log.isDebugEnabled())
log.debug("Getting public cache for name: " + cacheName);
@@ -3030,7 +3048,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName);
if (cache == null) {
- dynamicStartCache(null, cacheName, null, false, failIfNotStarted).get();
+ dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get();
cache = jCacheProxies.get(masked);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
index 21154c9..a9edb95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
@@ -169,6 +169,7 @@ public class GridCacheReturn implements Externalizable, Message {
* @param cctx Cache context.
* @param cacheObj Value to set.
* @param success Success flag to set.
+ * @param keepBinary Keep binary flag.
* @return This instance for chaining.
*/
public GridCacheReturn set(
@@ -187,6 +188,7 @@ public class GridCacheReturn implements Externalizable, Message {
/**
* @param cctx Cache context.
* @param cacheObj Cache object.
+ * @param keepBinary Keep binary flag.
*/
private void initValue(GridCacheContext cctx, @Nullable CacheObject cacheObj, boolean keepBinary) {
if (loc)
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 27a7587..b64c69c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1626,7 +1626,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
IgniteInternalFuture<?> fut;
try {
- fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name());
+ fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name(), true);
}
finally {
onLeave(gate);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index bb3f9ff..f088e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -194,7 +194,7 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
prepareMarshalCacheObjects(vals, ctx.cacheContext(cacheId));
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 34b3112..a761fec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.lang.IgniteUuid;
@@ -85,6 +83,8 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
* @param invalidate Invalidate flag.
* @param sys System transaction flag.
* @param plc IO policy.
+ * @param syncCommit Sync commit flag.
+ * @param syncRollback Sync rollback flag.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -184,6 +184,13 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
}
/**
+ * @param syncCommit Sync commit flag.
+ */
+ public void syncCommit(boolean syncCommit) {
+ this.syncCommit = syncCommit;
+ }
+
+ /**
* @return Sync rollback flag.
*/
public boolean syncRollback() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index e595942..0d26c84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -317,7 +317,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
if (reads != null)
marshalTx(reads, ctx);
- if (dhtVers != null) {
+ if (dhtVers != null && dhtVerKeys == null) {
for (IgniteTxKey key : dhtVers.keySet()) {
GridCacheContext cctx = ctx.cacheContext(key.cacheId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index d2c5aa4..4d22213 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -93,7 +93,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
}
@@ -101,7 +101,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e731406..8e041c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -104,7 +104,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (affAssignment != null)
+ if (affAssignment != null && affAssignmentBytes == null)
affAssignmentBytes = ctx.marshaller().marshal(affAssignment);
}
@@ -113,7 +113,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (affAssignmentBytes != null) {
+ if (affAssignmentBytes != null && affAssignment == null) {
affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, ldr);
// TODO IGNITE-2110: setting 'local' for nodes not needed when IGNITE-2110 is implemented.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 98711b8..1c3e052 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -494,14 +494,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
boolean found = false;
for (IgniteInternalFuture<?> fut : futures()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ MiniFuture f = (MiniFuture)fut;
- if (f.node().id().equals(nodeId)) {
- f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will ignore): " + nodeId));
+ if (f.node().id().equals(nodeId)) {
+ f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will ignore): " + nodeId));
- found = true;
- }
+ found = true;
}
}
@@ -551,12 +549,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
synchronized (futs) {
// Avoid iterator creation.
for (int i = 0; i < futs.size(); i++) {
- IgniteInternalFuture<Boolean> fut = futs.get(i);
-
- if (!isMini(fut))
- continue;
-
- MiniFuture mini = (MiniFuture)fut;
+ MiniFuture mini = (MiniFuture)futs.get(i);
if (mini.futureId().equals(miniId)) {
if (!mini.isDone())
@@ -772,14 +765,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
}
/**
- * @param f Future.
- * @return {@code True} if mini-future.
- */
- private boolean isMini(IgniteInternalFuture<?> f) {
- return f.getClass().equals(MiniFuture.class);
- }
-
- /**
*
*/
public void map() {
@@ -1006,7 +991,24 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtLockFuture.class, this, super.toString());
+ Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
+ @Override public String apply(IgniteInternalFuture<?> f) {
+ MiniFuture m = (MiniFuture)f;
+
+ return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+ }
+ });
+
+ Collection<KeyCacheObject> locks;
+
+ synchronized (this) {
+ locks = new HashSet<>(pendingLocks);
+ }
+
+ return S.toString(GridDhtLockFuture.class, this,
+ "innerFuts", futs,
+ "pendingLocks", locks,
+ "super", super.toString());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 62cf69d..50167d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -311,7 +311,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
prepareMarshalCacheObjects(nearKeys, ctx.cacheContext(cacheId));
- if (owned != null) {
+ if (owned != null && ownedKeys == null) {
ownedKeys = new KeyCacheObject[owned.size()];
ownedValues = new GridCacheVersion[ownedKeys.length];
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 65f1cb4..2d98e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.Collections;
import java.util.UUID;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -44,6 +43,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** */
private static final long serialVersionUID = 0L;
+ /** Bit mask of the wait remote transactions flag in the flags byte. */
+ public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+
/** Near node ID. */
private UUID nearNodeId;
@@ -64,7 +66,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
@GridDirectCollection(GridCacheVersion.class)
private Collection<GridCacheVersion> pendingVers;
- /** Check comitted flag. */
+ /** Check committed flag. */
private boolean checkCommitted;
/** Partition update counter. */
@@ -81,6 +83,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** Task name hash. */
private int taskNameHash;
+ /** Bit flags, see {@link #WAIT_REMOTE_TX_FLAG_MASK}. */
+ private byte flags;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -100,6 +105,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param commit Commit flag.
* @param invalidate Invalidate flag.
* @param sys System flag.
+ * @param plc IO policy.
* @param sysInvalidate System invalidation flag.
* @param syncCommit Synchronous commit flag.
* @param syncRollback Synchronous rollback flag.
@@ -180,6 +186,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param commit Commit flag.
* @param invalidate Invalidate flag.
* @param sys System flag.
+ * @param plc IO policy.
* @param sysInvalidate System invalidation flag.
* @param syncCommit Synchronous commit flag.
* @param syncRollback Synchronous rollback flag.
@@ -302,16 +309,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
- * Gets versions of not acquired locks with version less then one of transaction being committed.
- *
- * @return Versions of locks for entries participating in transaction that have not been acquired yet
- * have version less then one of transaction being committed.
- */
- public Collection<GridCacheVersion> pendingVersions() {
- return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
- }
-
- /**
* @return Check committed flag.
*/
public boolean checkCommitted() {
@@ -325,6 +322,23 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
this.checkCommitted = checkCommitted;
}
+ /**
+ * @return {@code True} if the wait remote transactions flag is set.
+ */
+ public boolean waitRemoteTransactions() {
+ return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @param waitRemoteTxs Wait remote transactions flag.
+ */
+ public void waitRemoteTransactions(boolean waitRemoteTxs) {
+ if (waitRemoteTxs)
+ flags = (byte)(flags | WAIT_REMOTE_TX_FLAG_MASK);
+ else
+ flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
@@ -352,60 +366,66 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
writer.incrementState();
case 19:
- if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 20:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
return false;
writer.incrementState();
case 21:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 22:
- if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
case 23:
- if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
return false;
writer.incrementState();
case 24:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 25:
- if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 26:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
return false;
writer.incrementState();
case 27:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 28:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 29:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -436,6 +456,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 19:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
byte isolationOrd;
isolationOrd = reader.readByte("isolation");
@@ -447,7 +475,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 20:
+ case 21:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -455,7 +483,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 21:
+ case 22:
nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
@@ -463,7 +491,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 22:
+ case 23:
partUpdateCnt = reader.readMessage("partUpdateCnt");
if (!reader.isLastRead())
@@ -471,7 +499,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 23:
+ case 24:
pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -479,7 +507,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 24:
+ case 25:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -487,7 +515,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 25:
+ case 26:
sysInvalidate = reader.readBoolean("sysInvalidate");
if (!reader.isLastRead())
@@ -495,7 +523,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 26:
+ case 27:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -503,7 +531,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 27:
+ case 28:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -511,7 +539,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 28:
+ case 29:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -531,6 +559,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 29;
+ return 30;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index fb4d97d..626ad89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -109,7 +109,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (checkCommittedErr != null)
+ if (checkCommittedErr != null && checkCommittedErrBytes == null)
checkCommittedErrBytes = ctx.marshaller().marshal(checkCommittedErr);
}
@@ -118,7 +118,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (checkCommittedErrBytes != null)
+ if (checkCommittedErrBytes != null && checkCommittedErr == null)
checkCommittedErr = ctx.marshaller().unmarshal(checkCommittedErrBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index e026b4e..ebf1002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -421,6 +421,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (!state(PREPARING)) {
if (state() == PREPARED && isSystemInvalidate())
fut.complete();
+
if (setRollbackOnly()) {
if (timedOut())
fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 394ff89..d31ecba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -281,7 +281,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (owned != null) {
+ if (owned != null && ownedKeys == null) {
ownedKeys = owned.keySet();
ownedVals = owned.values();
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7bee5a3..7cc276f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -656,11 +656,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
if (!addDepInfo && ctx.deploymentEnabled())
addDepInfo = true;
- invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+ if (invokeArgsBytes == null)
+ invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
- entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+ if (entryProcessorsBytes == null)
+ entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
- nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
+ if (nearEntryProcessorsBytes == null)
+ nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
}
}
@@ -681,13 +684,15 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
finishUnmarshalCacheObjects(prevVals, cctx, ldr);
if (forceTransformBackups) {
- entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+ if (entryProcessors == null)
+ entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
- invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
- }
+ if (invokeArgs == null)
+ invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
- if (forceTransformBackups)
- nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
+ if (nearEntryProcessors == null)
+ nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index f1bb323..95fdeb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -165,7 +165,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
prepareMarshalCacheObjects(nearEvicted, cctx);
- errBytes = ctx.marshaller().marshal(err);
+ if (err != null && errBytes == null)
+ errBytes = ctx.marshaller().marshal(err);
}
/** {@inheritDoc} */
@@ -178,7 +179,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
finishUnmarshalCacheObjects(nearEvicted, cctx, ldr);
- err = ctx.marshaller().unmarshal(errBytes, ldr);
+ if (errBytes != null && err == null)
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 7c0aba5..9c4b486 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -184,6 +184,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
+ * @param keepBinary Keep binary flag.
* @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
*/
@@ -593,7 +594,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
filter = null;
}
- if (expiryPlc != null)
+ if (expiryPlc != null && expiryPlcBytes == null)
expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
if (op == TRANSFORM) {
@@ -601,9 +602,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
if (!addDepInfo && ctx.deploymentEnabled())
addDepInfo = true;
- entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+ if (entryProcessorsBytes == null)
+ entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
- invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+ if (invokeArgsBytes == null)
+ invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
}
else
prepareMarshalCacheObjects(vals, cctx);
@@ -617,8 +620,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
finishUnmarshalCacheObjects(keys, cctx, ldr);
- if (op == TRANSFORM)
- entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+ if (op == TRANSFORM) {
+ if (entryProcessors == null)
+ entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+ if (invokeArgs == null)
+ invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+ }
else
finishUnmarshalCacheObjects(vals, cctx, ldr);
@@ -629,9 +637,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
}
}
- invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
-
- if (expiryPlcBytes != null)
+ if (expiryPlcBytes != null && expiryPlc == null)
expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index b164e7e..3e3ac29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -394,7 +394,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
GridCacheContext cctx = ctx.cacheContext(cacheId);
@@ -413,7 +413,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
GridCacheContext cctx = ctx.cacheContext(cacheId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index 4cdecec..9c5238a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -170,7 +170,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
info.marshal(cctx);
}
- errBytes = ctx.marshaller().marshal(err);
+ if (err != null && errBytes == null)
+ errBytes = ctx.marshaller().marshal(err);
}
/** {@inheritDoc} */
@@ -187,7 +188,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
info.unmarshal(cctx, ldr);
}
- err = ctx.marshaller().unmarshal(errBytes, ldr);
+ if (errBytes != null && err == null)
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 53c3d90..5cb84dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -66,6 +66,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
/**
* @param updateSeq Update sequence for this node.
* @param topVer Topology version.
+ * @param cacheId Cache ID.
*/
GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) {
this.cacheId = cacheId;
@@ -75,6 +76,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
/**
* @param cp Message to copy from.
+ * @param parts Partitions.
*/
GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts) {
cacheId = cp.cacheId;
@@ -181,7 +183,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (topic != null)
+ if (topic != null && topicBytes == null)
topicBytes = ctx.marshaller().marshal(topic);
}
@@ -189,7 +191,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (topicBytes != null)
+ if (topicBytes != null && topic == null)
topic = ctx.marshaller().unmarshal(topicBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
index 41454f9..4451cbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -75,9 +75,13 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
/**
* @param updateSeq Update sequence for this node.
* @param cacheId Cache ID.
+ * @param topVer Topology version.
* @param addDepInfo Deployment info flag.
*/
- GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer, boolean addDepInfo) {
+ GridDhtPartitionSupplyMessageV2(long updateSeq,
+ int cacheId,
+ AffinityTopologyVersion topVer,
+ boolean addDepInfo) {
this.cacheId = cacheId;
this.updateSeq = updateSeq;
this.topVer = topVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 0cbdc91..6afb9b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -134,7 +134,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (parts != null && partsBytes == null)
partsBytes = ctx.marshaller().marshal(parts);
- if (partCntrs != null)
+ if (partCntrs != null && partCntrsBytes == null)
partCntrsBytes = ctx.marshaller().marshal(partCntrs);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index c07a508..1185913 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -138,7 +138,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (partsBytes == null && parts != null)
partsBytes = ctx.marshaller().marshal(parts);
- if (partCntrs != null)
+ if (partCntrsBytes == null && partCntrs != null)
partCntrsBytes = ctx.marshaller().marshal(partCntrs);
}
@@ -149,7 +149,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (partsBytes != null && parts == null)
parts = ctx.marshaller().unmarshal(partsBytes, ldr);
- if (partCntrsBytes != null)
+ if (partCntrsBytes != null && partCntrs == null)
partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 15a791f..6ac91cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -188,7 +188,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
info.marshal(cctx);
}
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
}
@@ -203,7 +203,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
info.unmarshal(cctx, ldr);
}
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index fe6180a..7132567 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -54,9 +54,13 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
AffinityTopologyVersion topVer = null;
- if (tx.system())
+ if (tx.system()) {
topVer = tx.topologyVersionSnapshot();
+ if (topVer == null)
+ topVer = cctx.exchange().readyAffinityVersion();
+ }
+
if (topVer == null)
topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
index 42ad7ed..314c35c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
@@ -167,7 +167,7 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC
((GridCacheEntryInfo)res).marshal(cctx);
}
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
}