You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/13 13:28:54 UTC
ignite git commit: Ignite-perftest - Fixed compilation and backward compatibility.
Repository: ignite
Updated Branches:
refs/heads/ignite-perftest-merge e58604a4a -> 54f943462
Ignite-perftest - Fixed compilation and backward compatibility.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/54f94346
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/54f94346
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/54f94346
Branch: refs/heads/ignite-perftest-merge
Commit: 54f943462bf714787e229c0f8663164c560902cf
Parents: e58604a
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Nov 13 15:28:45 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Nov 13 15:28:45 2015 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 49 ++++++++++++++++++++
.../GridDistributedTxPrepareRequest.java | 10 ++--
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../dht/colocated/GridDhtColocatedCache.java | 2 +-
4 files changed, 58 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 81ff028..4c4074e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridListSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -77,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -133,6 +136,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>();
+ /** */
+ private final ConcurrentSkipListMap<AffinityTopologyVersion, IgnitePair<IgniteProductVersion>> nodeVers =
+ new ConcurrentSkipListMap<>();
+
/** */
private final AtomicReference<AffinityTopologyVersion> readyTopVer =
new AtomicReference<>(AffinityTopologyVersion.NONE);
@@ -572,6 +579,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * Gets minimum node version for the given topology version.
+ *
+ * @param topVer Topology version to get minimum node version for.
+ * @return Minimum node version.
+ */
+ public IgniteProductVersion minimumNodeVersion(AffinityTopologyVersion topVer) {
+ IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer);
+
+ return vers == null ? cctx.localNode().version() : vers.get1();
+ }
+
+ /**
+ * Gets maximum node version for the given topology version.
+ *
+ * @param topVer Topology version to get maximum node version for.
+ * @return Maximum node version.
+ */
+ public IgniteProductVersion maximumNodeVersion(AffinityTopologyVersion topVer) {
+ IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer);
+
+ return vers == null ? cctx.localNode().version() : vers.get2();
+ }
+
+ /**
* @return {@code true} if entered to busy state.
*/
private boolean enterBusy() {
@@ -832,6 +863,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (log.isDebugEnabled())
log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
+ IgniteProductVersion minVer = cctx.localNode().version();
+ IgniteProductVersion maxVer = cctx.localNode().version();
+
+ for (ClusterNode node : exchFut.discoveryEvent().topologyNodes()) {
+ IgniteProductVersion ver = node.version();
+
+ if (ver.compareTo(minVer) < 0)
+ minVer = ver;
+
+ if (ver.compareTo(maxVer) > 0)
+ maxVer = ver;
+ }
+
+ nodeVers.put(topVer, new IgnitePair<>(minVer, maxVer));
+
+ for (AffinityTopologyVersion oldVer : nodeVers.headMap(new AffinityTopologyVersion(topVer.topologyVersion() - 10, 0)).keySet())
+ nodeVers.remove(oldVer);
+
if (err == null) {
while (true) {
AffinityTopologyVersion readyVer = readyTopVer.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index ba251e4..abd6818 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -56,6 +57,9 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
/** */
private static final long serialVersionUID = 0L;
+ /** Version in which direct marshalling of tx nodes was introduced. */
+ public static final IgniteProductVersion TX_NODES_DIRECT_MARSHALLABLE_SINCE = IgniteProductVersion.fromString("1.5.0");
+
/** Collection to message converter. */
public static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() {
@Override public UUIDCollectionMessage apply(Collection<UUID> uuids) {
@@ -327,9 +331,9 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
if (txNodesMsg == null)
txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG);
- // TODO backward compatibility.
-// if (txNodes != null)
-// txNodesBytes = ctx.marshaller().marshal(txNodes);
+ // Marshal txNodes only if there is a node in topology with an older version.
+ if (txNodes != null && ctx.exchange().minimumNodeVersion(topologyVersion()).compareTo(TX_NODES_DIRECT_MARSHALLABLE_SINCE) < 0)
+ txNodesBytes = ctx.marshaller().marshal(txNodes);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 3069afd..1ba1c0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -672,7 +672,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (log.isDebugEnabled())
log.debug("Got removed entry when adding lock (will retry): " + entry);
}
- catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
+ catch (GridDistributedLockCancelledException e) {
if (log.isDebugEnabled())
log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 83c220d..7131aa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -873,7 +873,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (log.isDebugEnabled())
log.debug("Got removed entry when adding lock (will retry): " + entry);
}
- catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
+ catch (GridDistributedLockCancelledException e) {
if (log.isDebugEnabled())
log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');