You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/21 09:16:51 UTC
[19/19] ignite git commit: tmp
tmp
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6187b1f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6187b1f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6187b1f8
Branch: refs/heads/ignite-4680-sb
Commit: 6187b1f88226a66f7336731799de13ab98c0ab20
Parents: 956549e
Author: sboikov <sb...@gridgain.com>
Authored: Tue Mar 21 12:16:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Mar 21 12:16:22 2017 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 58 +++++++-------
.../GridNearAtomicAbstractUpdateRequest.java | 37 +++++----
.../dht/atomic/NearAtomicResponseHelper.java | 82 --------------------
3 files changed, 49 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6187b1f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index feed87f..542071a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1791,10 +1791,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
top.readLock();
try {
- if (top.stopping()) {
-
+ if (top.stopping())
return;
- }
// Do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
@@ -1811,11 +1809,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (TEST_STRIPE_SUBMIT) {
for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) {
- if (stripeIdx == e.getKey())
- continue;
- else {
+ if (stripeIdx != e.getKey()) {
ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new Runnable() {
@Override public void run() {
+ // No-op.
}
});
}
@@ -1824,7 +1821,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
update(affAssignment, ver, fut, node, req, null, completionCb);
}
else {
- req.responseHelper(new NearAtomicResponseHelper(stripemap.size()));
+ req.setResCount(stripemap.size());
for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) {
if (stripeIdx == e.getKey())
@@ -1924,23 +1921,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridNearAtomicAbstractUpdateRequest req,
int[] stripeIdxs,
UpdateReplyClosure completionCb) throws GridCacheEntryRemovedException {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
- node.id(),
- req.futureId(),
- req.partition(),
- false,
- ctx.deploymentEnabled());
-
List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion(), stripeIdxs);
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
- // Assign next version for update inside entries lock.
- //if (ver == null)
-
- if (hasNear)
- res.nearVersion(ver);
-
if (msgLog.isDebugEnabled()) {
msgLog.debug("Assigned update version [futId=" + req.futureId() +
", writeVer=" + ver + ']');
@@ -1959,7 +1943,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
node,
hasNear,
req,
- res,
+ null,
locked,
ver,
null,
@@ -1974,11 +1958,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (retVal == null)
retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
- res.returnValue(retVal);
-
unlockEntries(locked, null);
if (TEST_STRIPE_SUBMIT){
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+ node.id(),
+ req.futureId(),
+ req.partition(),
+ false,
+ ctx.deploymentEnabled());
+
+ if (hasNear)
+ res.nearVersion(ver);
+
+ res.returnValue(retVal);
+
for (int i = 0; i < req.size(); i++) {
fut.addWriteEntry(affinityAssignment,
req.key(i),
@@ -1997,9 +1991,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb.apply(req, res);
}
else {
- GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
+ if (req.addRes()) {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+ node.id(),
+ req.futureId(),
+ req.partition(),
+ false,
+ ctx.deploymentEnabled());
+
+ if (hasNear)
+ res.nearVersion(ver);
+
+ res.returnValue(retVal);
- if (res0 != null) {
for (int i = 0; i < req.size(); i++) {
fut.addWriteEntry(affinityAssignment,
req.key(i),
@@ -2470,7 +2474,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param nearNode Originating node.
* @param hasNear {@code True} if originating node has near cache.
* @param req Update request.
- * @param res Update response.
* @param locked Locked entries.
* @param ver Assigned update version.
* @param dhtFut Optional DHT future.
@@ -2672,7 +2675,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
catch (IgniteCheckedException e) {
- res.addFailedKey(k, e);
+ if (res != null)
+ res.addFailedKey(k, e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6187b1f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index c8e904d..ffae596 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteLogger;
@@ -90,10 +91,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
@GridToStringExclude
protected byte flags;
- /** Response helper. */
- @GridDirectTransient
- private NearAtomicResponseHelper responseHelper;
-
/**
*
*/
@@ -424,21 +421,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
- * @return Response helper.
- */
- public NearAtomicResponseHelper responseHelper() {
- return responseHelper;
- }
-
- /**
- * @param responseHelper Response helper.
- */
- public void responseHelper(
- NearAtomicResponseHelper responseHelper) {
- this.responseHelper = responseHelper;
- }
-
- /**
* @param idx Key index.
* @return Key.
*/
@@ -598,6 +580,23 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
return reader.afterMessageRead(GridNearAtomicAbstractUpdateRequest.class);
}
+ private static final AtomicIntegerFieldUpdater<GridNearAtomicAbstractUpdateRequest> UPD =
+ AtomicIntegerFieldUpdater.newUpdater(GridNearAtomicAbstractUpdateRequest.class, "cnt");
+
+ /** */
+ @GridDirectTransient
+ private volatile int cnt;
+
+ void setResCount(int cnt) {
+ this.cnt = cnt;
+ }
+
+ boolean addRes() {
+ int c = UPD.decrementAndGet(this);
+
+ return c == 0;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
StringBuilder flags = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/ignite/blob/6187b1f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
deleted file mode 100644
index 55c450c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-/**
- *
- */
-public class NearAtomicResponseHelper {
-
- /** */
- private GridNearAtomicUpdateResponse res;
-
- private static final AtomicIntegerFieldUpdater<NearAtomicResponseHelper> UPD =
- AtomicIntegerFieldUpdater.newUpdater(NearAtomicResponseHelper.class, "cnt");
-
- /** */
- private volatile int cnt;
-
- /**
- */
- public NearAtomicResponseHelper(int cnt) {
- this.cnt = cnt;
- }
-
- /**
- * @param res Response.
- * @return {@code true} if all responses added.
- */
- public GridNearAtomicUpdateResponse addResponse(GridNearAtomicUpdateResponse res) {
- this.res = res;
-
- int c = UPD.decrementAndGet(this);
-
- //mergeResponse(res);
-
- if (c == 0)
- return this.res;
-
- return null;
- }
-
- /**
- * @param res Response.
- */
- private void mergeResponse(GridNearAtomicUpdateResponse res) {
- if (this.res == null)
- this.res = res;
- else {
- if (res.nearValuesIndexes() != null)
- for (int i = 0; i < res.nearValuesIndexes().size(); i++)
- this.res.addNearValue(
- res.nearValuesIndexes().get(i),
- res.nearValue(i),
- res.nearTtl(i),
- res.nearExpireTime(i)
- );
-
- if (res.failedKeys() != null)
- this.res.addFailedKeys(res.failedKeys(), null);
-
- if (res.skippedIndexes() != null)
- this.res.skippedIndexes().addAll(res.skippedIndexes());
- }
- }
-}