You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/13 13:28:54 UTC

ignite git commit: Ignite-perftest - Fixed compilation and backward compatibility.

Repository: ignite
Updated Branches:
  refs/heads/ignite-perftest-merge e58604a4a -> 54f943462


Ignite-perftest - Fixed compilation and backward compatibility.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/54f94346
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/54f94346
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/54f94346

Branch: refs/heads/ignite-perftest-merge
Commit: 54f943462bf714787e229c0f8663164c560902cf
Parents: e58604a
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Nov 13 15:28:45 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Nov 13 15:28:45 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 49 ++++++++++++++++++++
 .../GridDistributedTxPrepareRequest.java        | 10 ++--
 .../dht/GridDhtTransactionalCacheAdapter.java   |  2 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  2 +-
 4 files changed, 58 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 81ff028..4c4074e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridListSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -77,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
@@ -133,6 +136,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** */
     private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>();
 
+    /**  */
+    private final ConcurrentSkipListMap<AffinityTopologyVersion, IgnitePair<IgniteProductVersion>> nodeVers =
+        new ConcurrentSkipListMap<>();
+
     /** */
     private final AtomicReference<AffinityTopologyVersion> readyTopVer =
         new AtomicReference<>(AffinityTopologyVersion.NONE);
@@ -572,6 +579,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * Gets minimum node version for the given topology version.
+     *
+     * @param topVer Topology version to get minimum node version for.
+     * @return Minimum node version.
+     */
+    public IgniteProductVersion minimumNodeVersion(AffinityTopologyVersion topVer) {
+        IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer);
+
+        return vers == null ? cctx.localNode().version() : vers.get1();
+    }
+
+    /**
+     * Gets maximum node version for the given topology version.
+     *
+     * @param topVer Topology version to get maximum node version for.
+     * @return Maximum node version.
+     */
+    public IgniteProductVersion maximumNodeVersion(AffinityTopologyVersion topVer) {
+        IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer);
+
+        return vers == null ? cctx.localNode().version() : vers.get2();
+    }
+
+    /**
      * @return {@code true} if entered to busy state.
      */
     private boolean enterBusy() {
@@ -832,6 +863,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (log.isDebugEnabled())
             log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
 
+        IgniteProductVersion minVer = cctx.localNode().version();
+        IgniteProductVersion maxVer = cctx.localNode().version();
+
+        for (ClusterNode node : exchFut.discoveryEvent().topologyNodes()) {
+            IgniteProductVersion ver = node.version();
+
+            if (ver.compareTo(minVer) < 0)
+                minVer = ver;
+
+            if (ver.compareTo(maxVer) > 0)
+                maxVer = ver;
+        }
+
+        nodeVers.put(topVer, new IgnitePair<>(minVer, maxVer));
+
+        for (AffinityTopologyVersion oldVer : nodeVers.headMap(new AffinityTopologyVersion(topVer.topologyVersion() - 10, 0)).keySet())
+            nodeVers.remove(oldVer);
+
         if (err == null) {
             while (true) {
                 AffinityTopologyVersion readyVer = readyTopVer.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index ba251e4..abd6818 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -56,6 +57,9 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Version in which direct marshalling of tx nodes was introduced. */
+    public static final IgniteProductVersion TX_NODES_DIRECT_MARSHALLABLE_SINCE = IgniteProductVersion.fromString("1.5.0");
+
     /** Collection to message converter. */
     public static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() {
         @Override public UUIDCollectionMessage apply(Collection<UUID> uuids) {
@@ -327,9 +331,9 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
         if (txNodesMsg == null)
             txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG);
 
-        // TODO backward compatibility.
-//        if (txNodes != null)
-//            txNodesBytes = ctx.marshaller().marshal(txNodes);
+        // Marshal txNodes only if there is a node in topology with an older version.
+        if (txNodes != null && ctx.exchange().minimumNodeVersion(topologyVersion()).compareTo(TX_NODES_DIRECT_MARSHALLABLE_SINCE) < 0)
+            txNodesBytes = ctx.marshaller().marshal(txNodes);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 3069afd..1ba1c0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -672,7 +672,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                         if (log.isDebugEnabled())
                             log.debug("Got removed entry when adding lock (will retry): " + entry);
                     }
-                    catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
+                    catch (GridDistributedLockCancelledException e) {
                         if (log.isDebugEnabled())
                             log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/54f94346/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 83c220d..7131aa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -873,7 +873,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                         if (log.isDebugEnabled())
                             log.debug("Got removed entry when adding lock (will retry): " + entry);
                     }
-                    catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
+                    catch (GridDistributedLockCancelledException e) {
                         if (log.isDebugEnabled())
                             log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');