You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/05 14:41:56 UTC

[34/50] [abbrv] ignite git commit: fixed partition striping for cache messages

fixed partition striping for cache messages


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dfd26bd4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dfd26bd4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dfd26bd4

Branch: refs/heads/ignite-comm-balance-master
Commit: dfd26bd46c6269eba1dc6812fc8de02ff1b05436
Parents: dd9dd4b
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Nov 25 11:50:01 2016 +0700
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Nov 25 11:50:01 2016 +0700

----------------------------------------------------------------------
 .../internal/managers/communication/GridIoManager.java |  2 +-
 .../internal/managers/communication/GridIoMessage.java | 13 +++++--------
 .../internal/processors/cache/GridCacheMessage.java    |  7 +++++++
 .../cache/distributed/GridDistributedLockRequest.java  |  5 +++++
 .../distributed/GridDistributedUnlockRequest.java      |  5 +++++
 .../cache/distributed/dht/GridDhtLockResponse.java     |  9 ++++++---
 .../cache/distributed/near/GridNearGetRequest.java     |  5 +++++
 .../cache/query/GridCacheDistributedQueryManager.java  |  2 +-
 .../processors/cache/query/GridCacheQueryRequest.java  |  4 ++--
 9 files changed, 37 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dfd26bd4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6cb83a3..e74b417 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -802,7 +802,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
         }
 
-        if (ctx.config().isUseStripedPool() && msg.partition() != -1) {
+        if (ctx.config().isUseStripedPool() && msg.partition() != Integer.MIN_VALUE) {
             ctx.getStripedExecutorService().execute(msg.partition(), c);
 
             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfd26bd4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index ea7204c..b1a26e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -20,8 +20,7 @@ package org.apache.ignite.internal.managers.communication;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -326,15 +325,13 @@ public class GridIoMessage implements Message {
     /**
      * Get single partition for this message (if applicable).
      *
-     * @return Partition.
+     * @return Partition ID, or {@code Integer.MIN_VALUE} if the wrapped message is not a cache message.
      */
     public int partition() {
-        if (msg instanceof GridNearAtomicUpdateRequest)
-            return ((GridNearAtomicUpdateRequest)msg).partition();
-        else if (msg instanceof GridDhtAtomicUpdateRequest)
-            return ((GridDhtAtomicUpdateRequest)msg).partition();
+        if (msg instanceof GridCacheMessage)
+            return ((GridCacheMessage)msg).partition();
         else
-            return -1;
+            return Integer.MIN_VALUE;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfd26bd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 71f99d3..0646d5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -129,6 +129,13 @@ public abstract class GridCacheMessage implements Message {
     }
 
     /**
+     * @return Partition ID this message is targeted to, or {@code -1} if it cannot be determined.
+     */
+    public int partition() {
+        return -1;
+    }
+
+    /**
      * If class loading error occurred during unmarshalling and {@link #ignoreClassErrors()} is
      * set to {@code true}, then the error will be passed into this method.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfd26bd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index 9639a9a..a671296 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -310,6 +310,11 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
         return keys;
     }
 
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+    }
+
     /**
      * @return Max lock wait time.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfd26bd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
index df6acdd..5d70ec1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
@@ -89,6 +89,11 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage {
         partIds.add(key.partition());
     }
 
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+    }
+
     /** {@inheritDoc}
      * @param ctx*/
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfd26bd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
index 1e92b54..63e3309 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
@@ -31,7 +32,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -57,7 +57,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
     /** Invalid partitions. */
     @GridToStringInclude
     @GridDirectCollection(int.class)
-    private Collection<Integer> invalidParts = new GridLeanSet<>();
+    private Collection<Integer> invalidParts;
 
     /** Preload entries. */
     @GridDirectCollection(GridCacheEntryInfo.class)
@@ -127,6 +127,9 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
      * @param part Invalid partition.
      */
     public void addInvalidPartition(int part) {
+        if (invalidParts == null)
+            invalidParts = new HashSet<>();
+
         invalidParts.add(part);
     }
 
@@ -134,7 +137,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
      * @return Invalid partitions.
      */
     public Collection<Integer> invalidPartitions() {
-        return invalidParts;
+        return invalidParts == null ? Collections.<Integer>emptySet() : invalidParts;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfd26bd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index fa7f367..4272a4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -244,6 +244,11 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
         return accessTtl;
     }
 
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+    }
+
     /**
      * @param ctx Cache context.
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfd26bd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index d34047e..eb5e214 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -272,7 +272,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 false,
                 null,
                 req.keyValueFilter(),
-                req.partition(),
+                req.partition() == -1 ? null : req.partition(),
                 req.className(),
                 req.clause(),
                 req.includeMetaData(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/dfd26bd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 60c4662..dbd5fbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -478,8 +478,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
     /**
      * @return partition.
      */
-    @Nullable public Integer partition() {
-        return part == -1 ? null : part;
+    public int partition() {
+        return part;
     }
 
     /** {@inheritDoc} */