You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/21 09:16:51 UTC

[19/19] ignite git commit: tmp

tmp


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6187b1f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6187b1f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6187b1f8

Branch: refs/heads/ignite-4680-sb
Commit: 6187b1f88226a66f7336731799de13ab98c0ab20
Parents: 956549e
Author: sboikov <sb...@gridgain.com>
Authored: Tue Mar 21 12:16:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Mar 21 12:16:22 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 58 +++++++-------
 .../GridNearAtomicAbstractUpdateRequest.java    | 37 +++++----
 .../dht/atomic/NearAtomicResponseHelper.java    | 82 --------------------
 3 files changed, 49 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6187b1f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index feed87f..542071a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1791,10 +1791,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 top.readLock();
 
                 try {
-                    if (top.stopping()) {
-
+                    if (top.stopping())
                         return;
-                    }
 
                     // Do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
@@ -1811,11 +1809,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         if (TEST_STRIPE_SUBMIT) {
                             for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) {
-                                if (stripeIdx == e.getKey())
-                                    continue;
-                                else {
+                                if (stripeIdx != e.getKey()) {
                                     ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new Runnable() {
                                         @Override public void run() {
+                                            // No-op.
                                         }
                                     });
                                 }
@@ -1824,7 +1821,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             update(affAssignment, ver, fut, node, req, null, completionCb);
                         }
                         else {
-                            req.responseHelper(new NearAtomicResponseHelper(stripemap.size()));
+                            req.setResCount(stripemap.size());
 
                             for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) {
                                 if (stripeIdx == e.getKey())
@@ -1924,23 +1921,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridNearAtomicAbstractUpdateRequest req,
         int[] stripeIdxs,
         UpdateReplyClosure completionCb) throws GridCacheEntryRemovedException {
-        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
-            node.id(),
-            req.futureId(),
-            req.partition(),
-            false,
-            ctx.deploymentEnabled());
-
         List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
 
         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
-        // Assign next version for update inside entries lock.
-        //if (ver == null)
-
-        if (hasNear)
-            res.nearVersion(ver);
-
         if (msgLog.isDebugEnabled()) {
             msgLog.debug("Assigned update version [futId=" + req.futureId() +
                 ", writeVer=" + ver + ']');
@@ -1959,7 +1943,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             node,
             hasNear,
             req,
-            res,
+            null,
             locked,
             ver,
             null,
@@ -1974,11 +1958,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (retVal == null)
             retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
 
-        res.returnValue(retVal);
-
         unlockEntries(locked, null);
 
         if (TEST_STRIPE_SUBMIT){
+            GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+                node.id(),
+                req.futureId(),
+                req.partition(),
+                false,
+                ctx.deploymentEnabled());
+
+            if (hasNear)
+                res.nearVersion(ver);
+
+            res.returnValue(retVal);
+
             for (int i = 0; i < req.size(); i++) {
                 fut.addWriteEntry(affinityAssignment,
                     req.key(i),
@@ -1997,9 +1991,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             completionCb.apply(req, res);
         }
         else {
-            GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
+            if (req.addRes()) {
+                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+                    node.id(),
+                    req.futureId(),
+                    req.partition(),
+                    false,
+                    ctx.deploymentEnabled());
+
+                if (hasNear)
+                    res.nearVersion(ver);
+
+                res.returnValue(retVal);
 
-            if (res0 != null) {
                 for (int i = 0; i < req.size(); i++) {
                     fut.addWriteEntry(affinityAssignment,
                         req.key(i),
@@ -2470,7 +2474,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param nearNode Originating node.
      * @param hasNear {@code True} if originating node has near cache.
      * @param req Update request.
-     * @param res Update response.
      * @param locked Locked entries.
      * @param ver Assigned update version.
      * @param dhtFut Optional DHT future.
@@ -2672,7 +2675,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             }
             catch (IgniteCheckedException e) {
-                res.addFailedKey(k, e);
+                if (res != null)
+                    res.addFailedKey(k, e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6187b1f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index c8e904d..ffae596 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteLogger;
@@ -90,10 +91,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     @GridToStringExclude
     protected byte flags;
 
-    /** Response helper. */
-    @GridDirectTransient
-    private NearAtomicResponseHelper responseHelper;
-
     /**
      *
      */
@@ -424,21 +421,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
-     * @return Response helper.
-     */
-    public NearAtomicResponseHelper responseHelper() {
-        return responseHelper;
-    }
-
-    /**
-     * @param responseHelper Response helper.
-     */
-    public void responseHelper(
-        NearAtomicResponseHelper responseHelper) {
-        this.responseHelper = responseHelper;
-    }
-
-    /**
      * @param idx Key index.
      * @return Key.
      */
@@ -598,6 +580,23 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
         return reader.afterMessageRead(GridNearAtomicAbstractUpdateRequest.class);
     }
 
+    private static final AtomicIntegerFieldUpdater<GridNearAtomicAbstractUpdateRequest> UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridNearAtomicAbstractUpdateRequest.class, "cnt");
+
+    /** */
+    @GridDirectTransient
+    private volatile int cnt;
+
+    void setResCount(int cnt) {
+        this.cnt = cnt;
+    }
+
+    boolean addRes() {
+        int c = UPD.decrementAndGet(this);
+
+        return c == 0;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         StringBuilder flags = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6187b1f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
deleted file mode 100644
index 55c450c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-/**
- *
- */
-public class NearAtomicResponseHelper {
-
-    /** */
-    private GridNearAtomicUpdateResponse res;
-
-    private static final AtomicIntegerFieldUpdater<NearAtomicResponseHelper> UPD =
-        AtomicIntegerFieldUpdater.newUpdater(NearAtomicResponseHelper.class, "cnt");
-
-    /** */
-    private volatile int cnt;
-
-    /**
-     */
-    public NearAtomicResponseHelper(int cnt) {
-        this.cnt = cnt;
-    }
-
-    /**
-     * @param res Response.
-     * @return {@code true} if all responses added.
-     */
-    public GridNearAtomicUpdateResponse addResponse(GridNearAtomicUpdateResponse res) {
-        this.res = res;
-
-        int c = UPD.decrementAndGet(this);
-
-        //mergeResponse(res);
-
-        if (c == 0)
-            return this.res;
-
-        return null;
-    }
-
-    /**
-     * @param res Response.
-     */
-    private void mergeResponse(GridNearAtomicUpdateResponse res) {
-        if (this.res == null)
-            this.res = res;
-        else {
-            if (res.nearValuesIndexes() != null)
-                for (int i = 0; i < res.nearValuesIndexes().size(); i++)
-                    this.res.addNearValue(
-                        res.nearValuesIndexes().get(i),
-                        res.nearValue(i),
-                        res.nearTtl(i),
-                        res.nearExpireTime(i)
-                    );
-
-            if (res.failedKeys() != null)
-                this.res.addFailedKeys(res.failedKeys(), null);
-
-            if (res.skippedIndexes() != null)
-                this.res.skippedIndexes().addAll(res.skippedIndexes());
-        }
-    }
-}