You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/27 12:18:37 UTC
[5/5] ignite git commit: IGNITE-4827: Remove compatibility logic for
1.x versions. This closes #1654.
IGNITE-4827: Remove compatibility logic for 1.x versions. This closes #1654.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/488b25e1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/488b25e1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/488b25e1
Branch: refs/heads/master
Commit: 488b25e191d66eb970cba9339c41bc0d88479878
Parents: 12e240a
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Mon Mar 27 15:18:01 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 27 15:18:01 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/MarshallerContextImpl.java | 5 +-
.../ignite/internal/binary/BinaryContext.java | 12 +-
.../communication/GridIoMessageFactory.java | 5 +-
.../discovery/GridDiscoveryManager.java | 38 +-
.../cache/CacheAffinitySharedManager.java | 4 -
.../processors/cache/GridCacheAdapter.java | 184 ++-----
.../GridCachePartitionExchangeManager.java | 62 +--
.../binary/CacheObjectBinaryProcessorImpl.java | 39 --
.../dht/GridClientPartitionTopology.java | 34 +-
.../dht/GridDhtAffinityAssignmentResponse.java | 101 +---
.../dht/GridDhtPartitionTopology.java | 6 +-
.../dht/GridDhtPartitionTopologyImpl.java | 34 +-
.../dht/GridPartitionedGetFuture.java | 8 +-
.../dht/GridPartitionedSingleGetFuture.java | 53 +-
.../dht/preloader/GridDhtPartitionDemander.java | 505 ++-----------------
.../dht/preloader/GridDhtPartitionFullMap.java | 45 +-
.../dht/preloader/GridDhtPartitionMap.java | 210 +++++++-
.../dht/preloader/GridDhtPartitionMap2.java | 329 ------------
.../GridDhtPartitionsAbstractMessage.java | 4 -
.../GridDhtPartitionsExchangeFuture.java | 2 +-
.../preloader/GridDhtPartitionsFullMessage.java | 6 +-
.../GridDhtPartitionsSingleMessage.java | 10 +-
.../dht/preloader/GridDhtPreloader.java | 18 +-
.../CacheContinuousQueryBatchAck.java | 4 -
.../continuous/CacheContinuousQueryHandler.java | 2 +-
.../continuous/CacheContinuousQueryManager.java | 87 +---
.../store/GridCacheStoreManagerAdapter.java | 25 -
.../closure/GridClosureProcessor.java | 398 +--------------
.../continuous/GridContinuousProcessor.java | 3 -
.../h2/twostep/messages/GridQueryRequest.java | 368 --------------
.../service/GridServiceProcessor.java | 132 +----
.../ignite/internal/visor/cache/VisorCache.java | 84 ++-
.../cache/VisorCacheAggregatedMetrics.java | 2 +-
.../visor/cache/VisorCacheConfiguration.java | 20 +-
.../internal/visor/cache/VisorCacheMetrics.java | 26 +
.../cache/VisorCacheMetricsCollectorTask.java | 18 +-
.../visor/cache/VisorCacheMetricsV2.java | 66 ---
.../cache/VisorCacheQueryConfiguration.java | 11 +
.../cache/VisorCacheQueryConfigurationV2.java | 47 --
.../cache/VisorCacheStoreConfiguration.java | 12 +
.../cache/VisorCacheStoreConfigurationV2.java | 48 --
.../internal/visor/cache/VisorCacheV2.java | 73 ---
.../internal/visor/cache/VisorCacheV3.java | 52 --
.../internal/visor/cache/VisorCacheV4.java | 124 -----
.../visor/event/VisorGridDiscoveryEvent.java | 18 +-
.../visor/event/VisorGridDiscoveryEventV2.java | 80 ---
.../visor/node/VisorNodeDataCollectorJob.java | 52 +-
.../internal/visor/query/VisorQueryArg.java | 39 +-
.../internal/visor/query/VisorQueryArgV2.java | 49 --
.../internal/visor/query/VisorQueryArgV3.java | 51 --
.../internal/visor/query/VisorQueryJob.java | 8 +-
.../internal/visor/util/VisorEventMapper.java | 4 +-
.../internal/visor/util/VisorTaskUtils.java | 15 -
.../communication/tcp/TcpCommunicationSpi.java | 17 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 104 +---
.../spi/discovery/tcp/TcpDiscoverySpi.java | 9 -
.../messages/TcpDiscoveryClientAckResponse.java | 4 -
.../resources/META-INF/classnames.properties | 106 ++--
...CacheExchangeMessageDuplicatedStateTest.java | 12 +-
.../dht/GridCacheDhtPreloadDelayedSelfTest.java | 12 +-
.../dht/GridCacheDhtPreloadSelfTest.java | 4 +-
.../GridCacheRebalancingSyncSelfTest.java | 7 +-
.../TcpDiscoverySpiFailureTimeoutSelfTest.java | 60 ---
.../junits/common/GridCommonAbstractTest.java | 4 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 39 --
.../h2/twostep/GridReduceQueryExecutor.java | 62 +--
.../commands/cache/VisorCacheCommand.scala | 19 +-
.../commands/cache/VisorCacheScanCommand.scala | 2 +-
.../commands/disco/VisorDiscoveryCommand.scala | 2 +-
.../cache/WaitMapExchangeFinishCallable.java | 4 +-
.../IgniteFailoverAbstractBenchmark.java | 4 +-
71 files changed, 741 insertions(+), 3362 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 5416ff0..ce79b4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -34,12 +34,11 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.marshaller.MappedName;
import org.apache.ignite.internal.processors.marshaller.MappingExchangeResult;
@@ -121,7 +120,7 @@ public class MarshallerContextImpl implements MarshallerContext {
processResource(jdkClsNames);
checkHasClassName(GridDhtPartitionFullMap.class.getName(), ldr, CLS_NAMES_FILE);
- checkHasClassName(GridDhtPartitionMap2.class.getName(), ldr, CLS_NAMES_FILE);
+ checkHasClassName(GridDhtPartitionMap.class.getName(), ldr, CLS_NAMES_FILE);
checkHasClassName(HashMap.class.getName(), ldr, JDK_CLS_NAMES_FILE);
if (plugins != null && !plugins.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index febfb04..5f9e4ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -189,12 +189,12 @@ public class BinaryContext {
sysClss.add(IgfsClientUpdateCallable.class.getName());
// Closure processor classes.
- sysClss.add(GridClosureProcessor.C1V2.class.getName());
- sysClss.add(GridClosureProcessor.C1MLAV2.class.getName());
- sysClss.add(GridClosureProcessor.C2V2.class.getName());
- sysClss.add(GridClosureProcessor.C2MLAV2.class.getName());
- sysClss.add(GridClosureProcessor.C4V2.class.getName());
- sysClss.add(GridClosureProcessor.C4MLAV2.class.getName());
+ sysClss.add(GridClosureProcessor.C1.class.getName());
+ sysClss.add(GridClosureProcessor.C1MLA.class.getName());
+ sysClss.add(GridClosureProcessor.C2.class.getName());
+ sysClss.add(GridClosureProcessor.C2MLA.class.getName());
+ sysClss.add(GridClosureProcessor.C4.class.getName());
+ sysClss.add(GridClosureProcessor.C4MLA.class.getName());
sysClss.add(IgniteUuid.class.getName());
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 1d84ead..7bf3de2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -141,7 +141,6 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
-import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest;
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest;
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse;
import org.apache.ignite.internal.util.GridByteArrayList;
@@ -793,8 +792,8 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 110:
- msg = new GridQueryRequest();
-
+ // EMPTY type
+ // GridQueryRequest was removed
break;
case 111:
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index b2c4ced..b261a56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -69,7 +69,6 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
@@ -1111,12 +1110,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
", locNodeId=" + locNode.id() + ", rmtNodeId=" + n.id() + ']');
}
- boolean rmtLateAssign;
-
- if (n.version().compareToIgnoreTimestamp(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0)
- rmtLateAssign = n.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
- else
- rmtLateAssign = false;
+ boolean rmtLateAssign = n.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
if (locDelayAssign != rmtLateAssign) {
throw new IgniteCheckedException("Remote node has cache affinity assignment mode different from local " +
@@ -1127,26 +1121,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
", rmtAddrs=" + U.addressesAsString(n) + ']');
}
- if (n.version().compareToIgnoreTimestamp(GridServiceProcessor.LAZY_SERVICES_CFG_SINCE) >= 0) {
- Boolean rmtSrvcCompatibilityEnabled = n.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);
-
- if (!F.eq(locSrvcCompatibilityEnabled, rmtSrvcCompatibilityEnabled)) {
- throw new IgniteCheckedException("Local node's " + IGNITE_SERVICES_COMPATIBILITY_MODE +
- " property value differs from remote node's value " +
- "(to make sure all nodes in topology have identical IgniteServices compatibility mode enabled, " +
- "configure system property explicitly) " +
- "[locSrvcCompatibilityEnabled=" + locSrvcCompatibilityEnabled +
- ", rmtSrvcCompatibilityEnabled=" + rmtSrvcCompatibilityEnabled +
- ", locNodeAddrs=" + U.addressesAsString(locNode) +
- ", rmtNodeAddrs=" + U.addressesAsString(n) +
- ", locNodeId=" + locNode.id() + ", rmtNodeId=" + n.id() + ']');
- }
- }
- else if (Boolean.FALSE.equals(locSrvcCompatibilityEnabled)) {
- throw new IgniteCheckedException("Remote node doesn't support lazy services configuration and " +
- "local node cannot join node because local node's "
- + IGNITE_SERVICES_COMPATIBILITY_MODE + " property value explicitly set to 'false'" +
- "[locNodeAddrs=" + U.addressesAsString(locNode) +
+ Boolean rmtSrvcCompatibilityEnabled = n.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);
+
+ if (!F.eq(locSrvcCompatibilityEnabled, rmtSrvcCompatibilityEnabled)) {
+ throw new IgniteCheckedException("Local node's " + IGNITE_SERVICES_COMPATIBILITY_MODE +
+ " property value differs from remote node's value " +
+ "(to make sure all nodes in topology have identical IgniteServices compatibility mode enabled, " +
+ "configure system property explicitly) " +
+ "[locSrvcCompatibilityEnabled=" + locSrvcCompatibilityEnabled +
+ ", rmtSrvcCompatibilityEnabled=" + rmtSrvcCompatibilityEnabled +
+ ", locNodeAddrs=" + U.addressesAsString(locNode) +
", rmtNodeAddrs=" + U.addressesAsString(n) +
", locNodeId=" + locNode.id() + ", rmtNodeId=" + n.id() + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index d287188..35d68e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -69,9 +68,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
- /** */
- public static final IgniteProductVersion LATE_AFF_ASSIGN_SINCE = IgniteProductVersion.fromString("1.6.0");
-
/** Late affinity assignment flag. */
private boolean lateAffAssign;
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3bfd1f8..63c46c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -100,7 +100,6 @@ import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFi
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -129,7 +128,6 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.mxbean.CacheMetricsMXBean;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.IgniteInstanceResource;
@@ -165,12 +163,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** Maximum number of retries when topology changes. */
public static final int MAX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100);
- /** */
- public static final IgniteProductVersion LOAD_CACHE_JOB_SINCE = IgniteProductVersion.fromString("1.5.7");
-
- /** */
- public static final IgniteProductVersion LOAD_CACHE_JOB_V2_SINCE = IgniteProductVersion.fromString("1.5.19");
-
/** Deserialization stash. */
private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String,
String>>() {
@@ -3485,7 +3477,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*
* @param keys Keys.
* @param plc Expiry policy.
- * @param keepBinary Keep binary flag. Will be ignored for releases older than {@link #LOAD_CACHE_JOB_V2_SINCE}.
+ * @param keepBinary Keep binary flag.
* @return Operation future.
*/
private IgniteInternalFuture<?> runLoadKeysCallable(final Set<? extends K> keys, final ExpiryPolicy plc,
@@ -3495,27 +3487,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (nodes.isEmpty())
return new GridFinishedFuture<>();
- Collection<ClusterNode> oldNodes = ctx.grid().cluster().forDataNodes(name()).forPredicate(
- new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode node) {
- return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) < 0;
- }
- }).nodes();
-
- if (oldNodes.isEmpty()) {
return ctx.closures().callAsyncNoFailover(BROADCAST,
- new LoadKeysCallableV2<>(ctx.name(), keys, update, plc, keepBinary),
+ new LoadKeysCallable<>(ctx.name(), keys, update, plc, keepBinary),
nodes,
true,
0);
- }
- else {
- return ctx.closures().callAsyncNoFailover(BROADCAST,
- new LoadKeysCallable<>(ctx.name(), keys, update, plc),
- nodes,
- true,
- 0);
- }
}
/**
@@ -3617,27 +3593,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
throws IgniteCheckedException {
- ClusterGroup oldNodes = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name())
- .forPredicate(new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode node) {
- return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) < 0;
- }
- });
-
- ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name())
- .forPredicate(new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode node) {
- return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) >= 0 &&
- node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) < 0;
- }
- });
-
- ClusterGroup newNodesV2 = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name())
- .forPredicate(new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode node) {
- return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) >= 0;
- }
- });
ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true);
@@ -3645,37 +3600,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null;
- GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>();
-
- if (!F.isEmpty(oldNodes.nodes())) {
- ComputeTaskInternalFuture oldNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST,
- Collections.singletonList(new LoadCacheClosure<>(ctx.name(), p, args, plc)),
- oldNodes.nodes());
+ Collection<ClusterNode> nodes = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).nodes();
- fut.add(oldNodesFut);
- }
-
- if (!F.isEmpty(newNodes.nodes())) {
- ComputeTaskInternalFuture newNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST,
- Collections.singletonList(
- new LoadCacheJob<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc)),
- newNodes.nodes());
+ assert !F.isEmpty(nodes) : "There are not datanodes fo cache: " + ctx.name();
- fut.add(newNodesFut);
- }
-
- if (!F.isEmpty(newNodesV2.nodes())) {
- final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
-
- ComputeTaskInternalFuture newNodesV2Fut = ctx.kernalContext().closure().callAsync(BROADCAST,
- Collections.singletonList(
- new LoadCacheJobV2<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc, keepBinary)),
- newNodesV2.nodes());
+ final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
- fut.add(newNodesV2Fut);
- }
+ ComputeTaskInternalFuture fut = ctx.kernalContext().closure().callAsync(BROADCAST,
+ Collections.singletonList(
+ new LoadCacheJobV2<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc, keepBinary)),
+ nodes);
- fut.markInitialized();
return fut;
}
@@ -3786,8 +3721,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
Collection<ClusterNode> nodes = grp.forPredicate(new IgnitePredicate<ClusterNode>() {
/** {@inheritDoc} */
@Override public boolean apply(ClusterNode clusterNode) {
- return clusterNode.version().compareTo(PartitionSizeLongTask.SINCE_VER) >= 0 &&
- ((modes.primary && aff.primaryByPartition(clusterNode, part, topVer)) ||
+ return ((modes.primary && aff.primaryByPartition(clusterNode, part, topVer)) ||
(modes.backup && aff.backupByPartition(clusterNode, part, topVer)));
}
}).nodes();
@@ -3934,6 +3868,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * @param opCtx Cache operation context.
* @return JCache Iterator.
*/
private Iterator<Cache.Entry<K, V>> localIteratorHonorExpirePolicy(final CacheOperationContext opCtx) {
@@ -3978,7 +3913,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
- * @param keepBinary
+ * @param keepBinary Keep binary flag.
* @return Distributed ignite cache iterator.
* @throws IgniteCheckedException If failed.
*/
@@ -4339,6 +4274,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @param tx Transaction.
* @param op Cache operation.
+ * @param opCtx Cache operation context.
* @param <T> Return type.
* @return Future.
*/
@@ -5887,6 +5823,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** */
private ExpiryPolicy plc;
+ /** */
+ private boolean keepBinary;
+
/**
* Required by {@link Externalizable}.
*/
@@ -5900,30 +5839,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)}
* otherwise {@link #localLoad(Collection, ExpiryPolicy, boolean)}.
* @param plc Expiry policy.
+ * @param keepBinary Keep binary flag.
*/
- LoadKeysCallable(String cacheName,
- Collection<? extends K> keys,
- boolean update,
- ExpiryPolicy plc) {
+ LoadKeysCallable(final String cacheName, final Collection<? extends K> keys, final boolean update,
+ final ExpiryPolicy plc, final boolean keepBinary) {
this.cacheName = cacheName;
this.keys = keys;
this.update = update;
this.plc = plc;
+ this.keepBinary = keepBinary;
}
/** {@inheritDoc} */
@Override public Void call() throws Exception {
- return call0(false);
- }
-
- /**
- * Internal call routine.
- *
- * @param keepBinary Keep binary flag.
- * @return Result (always {@code null}).
- * @throws Exception If failed.
- */
- protected Void call0(boolean keepBinary) throws Exception {
GridCacheAdapter<K, V> cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
assert cache != null : cacheName;
@@ -5944,7 +5872,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
+ @Override public void writeExternal(final ObjectOutput out) throws IOException {
U.writeString(out, cacheName);
U.writeCollection(out, keys);
@@ -5952,10 +5880,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
out.writeBoolean(update);
out.writeObject(plc != null ? new IgniteExternalizableExpiryPolicy(plc) : null);
+
+ out.writeBoolean(keepBinary);
}
/** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
cacheName = U.readString(in);
keys = U.readCollection(in);
@@ -5963,56 +5893,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
update = in.readBoolean();
plc = (ExpiryPolicy)in.readObject();
- }
- }
-
- /**
- *
- */
- static class LoadKeysCallableV2<K, V> extends LoadKeysCallable<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private boolean keepBinary;
-
- /**
- * Required by {@link Externalizable}.
- */
- public LoadKeysCallableV2() {
- // No-op.
- }
-
- /**
- * @param cacheName Cache name.
- * @param keys Keys.
- * @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)}
- * otherwise {@link #localLoad(Collection, ExpiryPolicy, boolean)}.
- * @param plc Expiry policy.
- * @param keepBinary Keep binary flag.
- */
- LoadKeysCallableV2(final String cacheName, final Collection<? extends K> keys, final boolean update,
- final ExpiryPolicy plc, final boolean keepBinary) {
- super(cacheName, keys, update, plc);
-
- this.keepBinary = keepBinary;
- }
-
- /** {@inheritDoc} */
- @Override public Void call() throws Exception {
- return call0(keepBinary);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(final ObjectOutput out) throws IOException {
- super.writeExternal(out);
-
- out.writeBoolean(keepBinary);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
- super.readExternal(in);
keepBinary = in.readBoolean();
}
@@ -6531,9 +6411,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** */
private static final long serialVersionUID = 0L;
- /** */
- private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.30");
-
/** Partition */
private final int partition;
@@ -6616,9 +6493,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** */
private static final long serialVersionUID = 0L;
- /** */
- public static final IgniteProductVersion NEAR_JOB_SINCE = IgniteProductVersion.fromString("1.5.0");
-
/** Cache name. */
private final String cacheName;
@@ -6652,7 +6526,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
for (ClusterNode node : subgrid) {
ComputeJob job;
- if (near && node.version().compareTo(NEAR_JOB_SINCE) >= 0) {
+ if (near) {
job = keys == null ? new GlobalClearAllNearJob(cacheName, topVer) :
new GlobalClearKeySetNearJob<>(cacheName, topVer, keys);
}
@@ -6795,6 +6669,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* Constructor.
* @param internalIterator Internal iterator.
+ * @param keepBinary Keep binary.
*/
private EntryIterator(Iterator<GridCacheMapEntry> internalIterator, boolean keepBinary) {
this.internalIterator = internalIterator;
@@ -6819,7 +6694,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
throw new IllegalStateException();
try {
- GridCacheAdapter.this.getAndRemove((K)current.wrapLazyValue(keepBinary).getKey());
+ getAndRemove((K)current.wrapLazyValue(keepBinary).getKey());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -6840,7 +6715,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** Keep binary flag. */
private final boolean keepBinary;
- /** Constructor. */
+ /** Constructor.
+ * @param internalSet Internal set.
+ * @param keepBinary Keep binary flag.
+ */
private EntrySet(Set<GridCacheMapEntry> internalSet, boolean keepBinary) {
this.internalSet = internalSet;
this.keepBinary = keepBinary;
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 3668910..231dff8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -67,7 +67,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -853,21 +852,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
lastVer,
exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
- boolean useOldApi = false;
-
- if (nodes != null) {
- for (ClusterNode node : nodes) {
- if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
- useOldApi = true;
- compress = false;
-
- break;
- }
- else if (!canUsePartitionMapCompression(node))
- compress = false;
- }
- }
-
m.compress(compress);
Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
@@ -890,13 +874,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (affCache != null) {
GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
- if (useOldApi) {
- locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
- locMap.nodeOrder(),
- locMap.updateSequence(),
- locMap);
- }
-
addFullPartitionsMap(m,
dupData,
compress,
@@ -955,7 +932,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
map.nodeOrder(),
map.updateSequence());
- for (Map.Entry<UUID, GridDhtPartitionMap2> e : map.entrySet())
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : map.entrySet())
map0.put(e.getKey(), e.getValue().emptyCopy());
map = map0;
@@ -1007,25 +984,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean clientOnlyExchange,
boolean sndCounters)
{
- boolean compress = canUsePartitionMapCompression(targetNode);
-
GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
clientOnlyExchange,
cctx.versions().last(),
- compress);
+ true);
Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>();
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal()) {
- GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
-
- if (targetNode.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
- locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
+ GridDhtPartitionMap locMap = cacheCtx.topology().localPartitionMap();
addPartitionMap(m,
dupData,
- compress,
+ true,
cacheCtx.cacheId(),
locMap,
cacheCtx.affinity().affinityCache().similarAffinityKey());
@@ -1039,11 +1011,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (m.partitions() != null && m.partitions().containsKey(top.cacheId()))
continue;
- GridDhtPartitionMap2 locMap = top.localPartitionMap();
+ GridDhtPartitionMap locMap = top.localPartitionMap();
addPartitionMap(m,
dupData,
- compress,
+ true,
top.cacheId(),
locMap,
top.similarAffinityKey());
@@ -1067,7 +1039,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
Map<Object, T2<Integer, Map<Integer, GridDhtPartitionState>>> dupData,
boolean compress,
Integer cacheId,
- GridDhtPartitionMap2 map,
+ GridDhtPartitionMap map,
Object affKey) {
Integer dupDataCache = null;
@@ -1292,7 +1264,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
boolean updated = false;
- for (Map.Entry<Integer, GridDhtPartitionMap2> entry : msg.partitions().entrySet()) {
+ for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
Integer cacheId = entry.getKey();
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
@@ -1575,24 +1547,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param node Target node.
- * @return {@code True} if can use compression for partition map messages.
- */
- @SuppressWarnings("SimplifiableIfStatement")
- private boolean canUsePartitionMapCompression(ClusterNode node) {
- IgniteProductVersion ver = node.version();
-
- if (ver.compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) >= 0) {
- if (ver.minor() == 7 && ver.maintenance() < 4)
- return false;
-
- return true;
- }
-
- return false;
- }
-
- /**
* Exchange future thread. All exchanges happen only by one thread and next
* exchange will not start until previous one completes.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index c11f71f..656e70a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -39,7 +39,6 @@ import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.binary.BinaryBasicNameMapper;
import org.apache.ignite.binary.BinaryField;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
@@ -99,7 +98,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.jetbrains.annotations.Nullable;
@@ -115,9 +113,6 @@ import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorImpl implements
CacheObjectBinaryProcessor {
/** */
- public static final IgniteProductVersion BINARY_CFG_CHECK_SINCE = IgniteProductVersion.fromString("1.5.7");
-
- /** */
private final CountDownLatch startLatch = new CountDownLatch(1);
/** */
@@ -351,37 +346,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
startLatch.countDown();
}
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- super.onKernalStart();
-
- if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK) && marsh instanceof BinaryMarshaller) {
- BinaryConfiguration bcfg = ctx.config().getBinaryConfiguration();
-
- for (ClusterNode rmtNode : ctx.discovery().remoteNodes()) {
- if (rmtNode.version().compareTo(BINARY_CFG_CHECK_SINCE) < 0) {
- if (bcfg == null || bcfg.getNameMapper() == null) {
- throw new IgniteCheckedException("When BinaryMarshaller is used and topology contains old " +
- "nodes, then " + BinaryBasicNameMapper.class.getName() + " mapper have to be set " +
- "explicitely into binary configuration and 'simpleName' property of the mapper " +
- "have to be set to 'true'.");
- }
-
- if (!(bcfg.getNameMapper() instanceof BinaryBasicNameMapper)
- || !((BinaryBasicNameMapper)bcfg.getNameMapper()).isSimpleName()) {
- U.quietAndWarn(log, "When BinaryMarshaller is used and topology contains old" +
- " nodes, it's strongly recommended, to set " + BinaryBasicNameMapper.class.getName() +
- " mapper into binary configuration explicitely " +
- " and 'simpleName' property of the mapper set to 'true' (fix configuration or set " +
- "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property).");
- }
-
- break;
- }
- }
- }
- }
-
/**
* @param key Metadata key.
* @param newMeta Metadata.
@@ -926,9 +890,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
Object rmtBinaryCfg = rmtNode.attribute(IgniteNodeAttributes.ATTR_BINARY_CONFIGURATION);
- if (rmtNode.version().compareTo(BINARY_CFG_CHECK_SINCE) < 0)
- return null;
-
ClusterNode locNode = ctx.discovery().localNode();
Object locBinaryCfg = locNode.attribute(IgniteNodeAttributes.ATTR_BINARY_CONFIGURATION);
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 6ca15de..13a2f59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -37,7 +37,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -158,7 +158,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
* @return Full map string representation.
*/
@SuppressWarnings( {"ConstantConditions"})
- private String mapString(GridDhtPartitionMap2 map) {
+ private String mapString(GridDhtPartitionMap map) {
return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString();
}
@@ -377,11 +377,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public GridDhtPartitionMap2 localPartitionMap() {
+ @Override public GridDhtPartitionMap localPartitionMap() {
lock.readLock().lock();
try {
- return new GridDhtPartitionMap2(cctx.localNodeId(), updateSeq.get(), topVer,
+ return new GridDhtPartitionMap(cctx.localNodeId(), updateSeq.get(), topVer,
Collections.<Integer, GridDhtPartitionState>emptyMap(), true);
}
finally {
@@ -394,7 +394,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
lock.readLock().lock();
try {
- GridDhtPartitionMap2 partMap = node2part.get(nodeId);
+ GridDhtPartitionMap partMap = node2part.get(nodeId);
if (partMap != null) {
GridDhtPartitionState state = partMap.get(part);
@@ -587,8 +587,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
lastExchangeId = exchId;
if (node2part != null) {
- for (GridDhtPartitionMap2 part : node2part.values()) {
- GridDhtPartitionMap2 newPart = partMap.get(part.nodeId());
+ for (GridDhtPartitionMap part : node2part.values()) {
+ GridDhtPartitionMap newPart = partMap.get(part.nodeId());
// If for some nodes current partition has a newer map,
// then we keep the newer value.
@@ -618,7 +618,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
Map<Integer, Set<UUID>> p2n = new HashMap<>();
- for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) {
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
for (Integer p : e.getValue().keySet()) {
Set<UUID> ids = p2n.get(p);
@@ -650,7 +650,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap2 parts,
+ GridDhtPartitionMap parts,
Map<Integer, Long> cntrMap,
boolean checkEvictions) {
if (log.isDebugEnabled())
@@ -686,7 +686,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
node2part = new GridDhtPartitionFullMap();
}
- GridDhtPartitionMap2 cur = node2part.get(parts.nodeId());
+ GridDhtPartitionMap cur = node2part.get(parts.nodeId());
if (cur != null && cur.updateSequence() >= parts.updateSequence()) {
if (log.isDebugEnabled())
@@ -797,10 +797,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
}
- GridDhtPartitionMap2 map = node2part.get(nodeId);
+ GridDhtPartitionMap map = node2part.get(nodeId);
if (map == null)
- node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, updateSeq, topVer,
+ node2part.put(nodeId, map = new GridDhtPartitionMap(nodeId, updateSeq, topVer,
Collections.<Integer, GridDhtPartitionState>emptyMap(), false));
map.updateSequence(updateSeq, topVer);
@@ -838,7 +838,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
part2node = new HashMap<>(part2node);
- GridDhtPartitionMap2 parts = node2part.remove(nodeId);
+ GridDhtPartitionMap parts = node2part.remove(nodeId);
if (parts != null) {
for (Integer p : parts.keySet()) {
@@ -926,7 +926,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
", locNodeId=" + cctx.localNodeId() +
", igniteInstanceName=" + cctx.igniteInstanceName() + ']';
- for (GridDhtPartitionMap2 map : node2part.values()) {
+ for (GridDhtPartitionMap map : node2part.values()) {
if (map.hasMovingPartitions())
return true;
}
@@ -956,7 +956,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (nodeId == null)
return false;
- GridDhtPartitionMap2 parts = node2part.get(nodeId);
+ GridDhtPartitionMap parts = node2part.get(nodeId);
// Set can be null if node has been removed.
if (parts != null) {
@@ -984,7 +984,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (node2part == null)
return;
- for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) {
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
for (Integer p : e.getValue().keySet()) {
Set<UUID> nodeIds = part2node.get(p);
@@ -996,7 +996,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) {
for (UUID nodeId : e.getValue()) {
- GridDhtPartitionMap2 map = node2part.get(nodeId);
+ GridDhtPartitionMap map = node2part.get(nodeId);
assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']';
assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index 52dd190..e8094e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -47,14 +47,6 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
/** Topology version. */
private AffinityTopologyVersion topVer;
- /** Affinity assignment. */
- @GridDirectTransient
- @GridToStringInclude
- private List<List<ClusterNode>> affAssignment;
-
- /** Affinity assignment bytes. */
- private byte[] affAssignmentBytes;
-
/** */
@GridDirectTransient
private List<List<UUID>> affAssignmentIds;
@@ -80,19 +72,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
* @param cacheId Cache ID.
* @param topVer Topology version.
* @param affAssignment Affinity assignment.
- * @param sndNodeIds If {@code true} sends only node IDs instead of nodes.
*/
public GridDhtAffinityAssignmentResponse(int cacheId,
@NotNull AffinityTopologyVersion topVer,
- List<List<ClusterNode>> affAssignment,
- boolean sndNodeIds) {
+ List<List<ClusterNode>> affAssignment) {
this.cacheId = cacheId;
this.topVer = topVer;
- if (!sndNodeIds)
- this.affAssignment = affAssignment;
- else
- affAssignmentIds = ids(affAssignment);
+ affAssignmentIds = ids(affAssignment);
}
/** {@inheritDoc} */
@@ -112,16 +99,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
* @return Affinity assignment.
*/
public List<List<ClusterNode>> affinityAssignment(GridDiscoveryManager disco) {
- if (affAssignment != null)
- return affAssignment;
-
if (affAssignmentIds != null)
- affAssignment = nodes(disco, affAssignmentIds);
+ return nodes(disco, affAssignmentIds);
- return affAssignment;
+ return null;
}
/**
+ * @param disco Discovery manager.
* @return Ideal affinity assignment.
*/
public List<List<ClusterNode>> idealAffinityAssignment(GridDiscoveryManager disco) {
@@ -167,6 +152,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
/**
* @param assignments Assignment.
+ * @return Assignment where cluster nodes are converted to their ids.
*/
private List<List<UUID>> ids(List<List<ClusterNode>> assignments) {
if (assignments != null) {
@@ -195,7 +181,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 6;
}
/**
@@ -204,13 +190,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- assert affAssignment != null ^ affAssignmentIds != null;
-
- if (affAssignment != null && affAssignmentBytes == null)
- affAssignmentBytes = U.marshal(ctx, affAssignment);
+ assert affAssignmentIds != null;
- if (affAssignmentIds != null && affAssignmentIdsBytes == null)
- affAssignmentIdsBytes = U.marshal(ctx, affAssignmentIds);
+ affAssignmentIdsBytes = U.marshal(ctx, affAssignmentIds);
if (idealAffAssignment != null && idealAffAssignmentBytes == null)
idealAffAssignmentBytes = U.marshal(ctx, idealAffAssignment);
@@ -220,55 +202,16 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- assert affAssignmentBytes != null ^ affAssignmentIdsBytes != null;
+ assert affAssignmentIdsBytes != null;
ldr = U.resolveClassLoader(ldr, ctx.gridConfig());
- if (affAssignmentBytes != null && affAssignment == null)
- affAssignment = unmarshalNodes(affAssignmentBytes, ctx, ldr);
-
- if (affAssignmentIdsBytes != null && affAssignmentIds == null)
- affAssignmentIds = U.unmarshal(ctx, affAssignmentIdsBytes, ldr);
+ affAssignmentIds = U.unmarshal(ctx, affAssignmentIdsBytes, ldr);
if (idealAffAssignmentBytes != null && idealAffAssignment == null)
idealAffAssignment = U.unmarshal(ctx, idealAffAssignmentBytes, ldr);
}
- /**
- * @param bytes Assignment bytes.
- * @param ctx Context.
- * @param ldr Class loader.
- * @return Assignment.
- * @throws IgniteCheckedException If failed.
- */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- private List<List<ClusterNode>> unmarshalNodes(byte[] bytes,
- GridCacheSharedContext ctx,
- ClassLoader ldr)
- throws IgniteCheckedException
- {
- List<List<ClusterNode>> affAssignment = U.unmarshal(ctx, bytes,
- U.resolveClassLoader(ldr, ctx.gridConfig()));
-
- // TODO IGNITE-2110: setting 'local' for nodes not needed when IGNITE-2110 is implemented.
- int assignments = affAssignment.size();
-
- for (int n = 0; n < assignments; n++) {
- List<ClusterNode> nodes = affAssignment.get(n);
-
- int size = nodes.size();
-
- for (int i = 0; i < size; i++) {
- ClusterNode node = nodes.get(i);
-
- if (node instanceof TcpDiscoveryNode)
- ((TcpDiscoveryNode)node).local(node.id().equals(ctx.localNodeId()));
- }
- }
-
- return affAssignment;
- }
-
/** {@inheritDoc} */
@Override public boolean addDeploymentInfo() {
return false;
@@ -290,24 +233,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
switch (writer.state()) {
case 3:
- if (!writer.writeByteArray("affAssignmentBytes", affAssignmentBytes))
- return false;
-
- writer.incrementState();
-
- case 4:
if (!writer.writeByteArray("affAssignmentIdsBytes", affAssignmentIdsBytes))
return false;
writer.incrementState();
- case 5:
+ case 4:
if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
return false;
writer.incrementState();
- case 6:
+ case 5:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -330,14 +267,6 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
switch (reader.state()) {
case 3:
- affAssignmentBytes = reader.readByteArray("affAssignmentBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
affAssignmentIdsBytes = reader.readByteArray("affAssignmentIdsBytes");
if (!reader.isLastRead())
@@ -345,7 +274,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
reader.incrementState();
- case 5:
+ case 4:
idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
if (!reader.isLastRead())
@@ -353,7 +282,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
reader.incrementState();
- case 6:
+ case 5:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 605150a..aec3d7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.jetbrains.annotations.Nullable;
@@ -148,7 +148,7 @@ public interface GridDhtPartitionTopology {
/**
* @return Local IDs.
*/
- public GridDhtPartitionMap2 localPartitionMap();
+ public GridDhtPartitionMap localPartitionMap();
/**
* @param nodeId Node ID.
@@ -230,7 +230,7 @@ public interface GridDhtPartitionTopology {
* @return {@code True} if topology state changed.
*/
@Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap2 parts,
+ GridDhtPartitionMap parts,
@Nullable Map<Integer, Long> cntrMap,
boolean checkEvictions);
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index c476886..7a98366 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -43,7 +43,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
@@ -176,7 +176,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @return Full map string representation.
*/
@SuppressWarnings({"ConstantConditions"})
- private String mapString(GridDhtPartitionMap2 map) {
+ private String mapString(GridDhtPartitionMap map) {
return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString();
}
@@ -781,7 +781,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public GridDhtPartitionMap2 localPartitionMap() {
+ @Override public GridDhtPartitionMap localPartitionMap() {
Map<Integer, GridDhtPartitionState> map = new HashMap<>();
lock.readLock().lock();
@@ -796,7 +796,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
map.put(i, part.state());
}
- return new GridDhtPartitionMap2(cctx.nodeId(),
+ return new GridDhtPartitionMap(cctx.nodeId(),
updateSeq.get(),
topVer,
Collections.unmodifiableMap(map),
@@ -812,7 +812,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
lock.readLock().lock();
try {
- GridDhtPartitionMap2 partMap = node2part.get(nodeId);
+ GridDhtPartitionMap partMap = node2part.get(nodeId);
if (partMap != null) {
GridDhtPartitionState state = partMap.get(part);
@@ -1059,8 +1059,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
lastExchangeId = exchId;
if (node2part != null) {
- for (GridDhtPartitionMap2 part : node2part.values()) {
- GridDhtPartitionMap2 newPart = partMap.get(part.nodeId());
+ for (GridDhtPartitionMap part : node2part.values()) {
+ GridDhtPartitionMap newPart = partMap.get(part.nodeId());
// If for some nodes current partition has a newer map,
// then we keep the newer value.
@@ -1095,7 +1095,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
Map<Integer, Set<UUID>> p2n = U.newHashMap(cctx.affinity().partitions());
- for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) {
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
for (Integer p : e.getValue().keySet()) {
Set<UUID> ids = p2n.get(p);
@@ -1136,7 +1136,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap2 parts,
+ GridDhtPartitionMap parts,
@Nullable Map<Integer, Long> cntrMap,
boolean checkEvictions) {
if (log.isDebugEnabled())
@@ -1187,7 +1187,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
// Create invalid partition map.
node2part = new GridDhtPartitionFullMap();
- GridDhtPartitionMap2 cur = node2part.get(parts.nodeId());
+ GridDhtPartitionMap cur = node2part.get(parts.nodeId());
if (cur != null && cur.updateSequence() >= parts.updateSequence()) {
if (log.isDebugEnabled())
@@ -1400,10 +1400,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
UUID locNodeId = cctx.localNodeId();
- GridDhtPartitionMap2 map = node2part.get(locNodeId);
+ GridDhtPartitionMap map = node2part.get(locNodeId);
if (map == null) {
- map = new GridDhtPartitionMap2(locNodeId,
+ map = new GridDhtPartitionMap(locNodeId,
updateSeq,
topVer,
Collections.<Integer, GridDhtPartitionState>emptyMap(),
@@ -1448,7 +1448,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
else
node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence());
- GridDhtPartitionMap2 parts = node2part.remove(nodeId);
+ GridDhtPartitionMap parts = node2part.remove(nodeId);
if (parts != null) {
for (Integer p : parts.keySet()) {
@@ -1574,7 +1574,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
", locNodeId=" + cctx.localNode().id() +
", locName=" + cctx.igniteInstanceName() + ']';
- for (GridDhtPartitionMap2 map : node2part.values()) {
+ for (GridDhtPartitionMap map : node2part.values()) {
if (map.hasMovingPartitions())
return true;
}
@@ -1660,7 +1660,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (nodeId == null)
return false;
- GridDhtPartitionMap2 parts = node2part.get(nodeId);
+ GridDhtPartitionMap parts = node2part.get(nodeId);
// Set can be null if node has been removed.
if (parts != null) {
@@ -1688,7 +1688,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (node2part == null)
return;
- for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) {
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
for (Integer p : e.getValue().keySet()) {
Set<UUID> nodeIds = part2node.get(p);
@@ -1700,7 +1700,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) {
for (UUID nodeId : e.getValue()) {
- GridDhtPartitionMap2 map = node2part.get(nodeId);
+ GridDhtPartitionMap map = node2part.get(nodeId);
assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']';
assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 519239a..f555b84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -53,15 +53,12 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture.SINGLE_GET_MSG_SINCE;
-
/**
* Colocated get future.
*/
@@ -72,9 +69,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
- /** Dummy version sent to older nodes for backward compatibility, */
- private static final GridCacheVersion DUMMY_VER = new GridCacheVersion(0, 0, 0, 0);
-
/** Logger. */
private static IgniteLogger log;
@@ -335,7 +329,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
cctx.cacheId(),
futId,
fut.futureId(),
- n.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0 ? null : DUMMY_VER,
+ null,
mappedKeys,
readThrough,
topVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index a3f6b72..47f4066 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
@@ -53,7 +52,6 @@ import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
@@ -68,9 +66,6 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
/** */
private static final long serialVersionUID = 0L;
- /** */
- public static final IgniteProductVersion SINGLE_GET_MSG_SINCE = IgniteProductVersion.fromString("1.5.0");
-
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -270,41 +265,19 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
cctx.mvcc().addFuture(this, futId);
}
- GridCacheMessage req;
-
- if (node.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0) {
- req = new GridNearSingleGetRequest(cctx.cacheId(),
- futId.localId(),
- key,
- readThrough,
- topVer,
- subjId,
- taskName == null ? 0 : taskName.hashCode(),
- expiryPlc != null ? expiryPlc.forCreate() : -1L,
- expiryPlc != null ? expiryPlc.forAccess() : -1L,
- skipVals,
- /**add reader*/false,
- needVer,
- cctx.deploymentEnabled());
- }
- else {
- Map<KeyCacheObject, Boolean> map = Collections.singletonMap(key, false);
-
- req = new GridNearGetRequest(
- cctx.cacheId(),
- futId,
- futId,
- cctx.versions().next(),
- map,
- readThrough,
- topVer,
- subjId,
- taskName == null ? 0 : taskName.hashCode(),
- expiryPlc != null ? expiryPlc.forCreate() : -1L,
- expiryPlc != null ? expiryPlc.forAccess() : -1L,
- skipVals,
- cctx.deploymentEnabled());
- }
+ GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(),
+ futId.localId(),
+ key,
+ readThrough,
+ topVer,
+ subjId,
+ taskName == null ? 0 : taskName.hashCode(),
+ expiryPlc != null ? expiryPlc.forCreate() : -1L,
+ expiryPlc != null ? expiryPlc.forAccess() : -1L,
+ skipVals,
+ /**add reader*/false,
+ needVer,
+ cctx.deploymentEnabled());
try {
cctx.io().send(node, req, cctx.ioPolicy());