You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/17 13:07:59 UTC

[12/12] incubator-ignite git commit: # Merge remote-tracking branch 'remotes/origin/master' into ignite-426

# Merge remote-tracking branch 'remotes/origin/master' into ignite-426


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/25f8f419
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/25f8f419
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/25f8f419

Branch: refs/heads/ignite-426
Commit: 25f8f419bf75f8c6b83ecbe6cff29865a2d5548d
Parents: f0b24c4 1f00c70
Author: sboikov <sb...@gridgain.com>
Authored: Mon Aug 17 14:06:52 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Aug 17 14:06:52 2015 +0300

----------------------------------------------------------------------
 .../CachePartialUpdateCheckedException.java     |  29 +++-
 .../processors/cache/GridCacheAdapter.java      | 161 ++++++++++++++++---
 .../processors/cache/GridCacheMapEntry.java     |   4 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  61 +++++--
 .../near/GridNearOptimisticTxPrepareFuture.java |   2 +-
 ...cheDhtLocalPartitionAfterRemoveSelfTest.java |  33 ++--
 .../cache/GridCacheAbstractFullApiSelfTest.java |   1 -
 .../IgniteCacheSizeFailoverTest.java            | 115 +++++++++++++
 .../IgniteCachePutRetryAbstractSelfTest.java    | 120 +++++++++++---
 ...PutRetryAtomicPrimaryWriteOrderSelfTest.java |  32 ++++
 .../tcp/IgniteCacheSslStartStopSelfTest.java    |   1 +
 .../IgniteCacheFailoverTestSuite.java           |   3 +
 .../testsuites/IgniteCacheTestSuite2.java       |   1 +
 13 files changed, 494 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f8f419/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25f8f419/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index eec7fa0,e527f08..1a779ad
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@@ -287,24 -284,14 +288,24 @@@ public class GridNearAtomicUpdateFutur
              return false;
          }
  
 -        GridNearAtomicUpdateRequest req = mappings.get(nodeId);
 +        Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size());
 +        Collection<KeyCacheObject> failedKeys = new ArrayList<>();
 +
 +        for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
 +            if (e.getKey().nodeId().equals(nodeId)) {
 +                mappingKeys.add(e.getKey());
 +
 +                failedKeys.addAll(e.getValue().keys());
 +            }
 +        }
  
 -        if (req != null) {
 -            addFailedKeys(req.keys(),
 -                req.topologyVersion(),
 -                new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId));
 +        if (!mappingKeys.isEmpty()) {
-             if (!failedKeys.isEmpty())
-                 addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " +
++            if (!failedKeys.isEmpty()) // TODO: top ver.
++                addFailedKeys(failedKeys, null, new ClusterTopologyCheckedException("Primary node left grid before " +
 +                    "response is received: " + nodeId));
  
 -            mappings.remove(nodeId);
 +            for (GridAtomicMappingKey key : mappingKeys)
 +                mappings.remove(key);
  
              checkComplete();
  
@@@ -466,14 -479,17 +489,17 @@@
              X.hasCause(err, ClusterTopologyCheckedException.class) &&
              storeFuture() &&
              remapCnt.decrementAndGet() > 0) {
+             ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
  
-             CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class);
+             if (!(topErr instanceof  ClusterTopologyServerNotFoundException)) {
+                 CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class);
  
-             assert !F.isEmpty(cause.failedKeys());
 -                assert cause != null && cause.topologyVersion() != null : err;
++                assert cause != null && !F.isEmpty(cause.failedKeys()) && cause.topologyVersion() != null : err;
  
-             remap(cause.failedKeys());
+                 remap(cause.failedKeys(), cause.topologyVersion());
  
-             return false;
+                 return false;
+             }
          }
  
          if (super.onDone(retval, err)) {
@@@ -1030,24 -1031,37 +1058,24 @@@
      /**
       * Maps future to single node.
       *
 -     * @param nodeId Node ID.
 +     * @param mappingKey Mapping key.
       * @param req Request.
       */
 -    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
 -        singleNodeId = nodeId;
 +    private void mapSingle(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) {
 +        singleNodeId = mappingKey.nodeId();
          singleReq = req;
  
 -        if (cctx.localNodeId().equals(nodeId)) {
 -            cache.updateAllAsyncInternal(nodeId, req,
 -                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
 -                    @Override public void apply(GridNearAtomicUpdateRequest req,
 -                        GridNearAtomicUpdateResponse res) {
 -                        assert res.futureVersion().equals(futVer) : futVer;
 -
 -                        onResult(res.nodeId(), res);
 -                    }
 -                });
 -        }
 -        else {
 -            try {
 -                if (log.isDebugEnabled())
 -                    log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 +        try {
 +            if (log.isDebugEnabled())
 +                log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
  
 -                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 +            sendRequest(mappingKey, req);
  
 -                if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
 -                    onDone(new GridCacheReturn(cctx, true, null, true));
 -            }
 -            catch (IgniteCheckedException e) {
 -                onDone(addFailedKeys(req.keys(), req.topologyVersion(), e));
 -            }
 +            if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
 +                onDone(new GridCacheReturn(cctx, true, null, true));
 +        }
 +        catch (IgniteCheckedException e) {
-             onDone(addFailedKeys(req.keys(), e));
++            onDone(addFailedKeys(req.keys(), req.topologyVersion(), e));
          }
      }
  
@@@ -1056,25 -1070,35 +1084,25 @@@
       *
       * @param mappings Mappings to send.
       */
 -    private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
 -        UUID locNodeId = cctx.localNodeId();
 -
 -        GridNearAtomicUpdateRequest locUpdate = null;
 +    private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) {
 +        for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
 +            GridAtomicMappingKey mappingKey = e.getKey();
 +            GridNearAtomicUpdateRequest req = e.getValue();
  
 -        // Send messages to remote nodes first, then run local update.
 -        for (GridNearAtomicUpdateRequest req : mappings.values()) {
 -            if (locNodeId.equals(req.nodeId())) {
 -                assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
 -                    ", req=" + req + ']';
 +            try {
 +                if (log.isDebugEnabled())
 +                    log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
  
 -                locUpdate = req;
 +                sendRequest(mappingKey, req);
              }
 -            else {
 -                try {
 -                    if (log.isDebugEnabled())
 -                        log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 -
 -                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 -                }
 -                catch (IgniteCheckedException e) {
 -                    addFailedKeys(req.keys(), req.topologyVersion(), e);
 +            catch (IgniteCheckedException ex) {
-                 addFailedKeys(req.keys(), ex);
++                addFailedKeys(req.keys(), req.topologyVersion(), ex);
  
 -                    removeMapping(req.nodeId());
 -                }
 -
 -                if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
 -                    removeMapping(req.nodeId());
 +                removeMapping(mappingKey);
              }
 +
 +            if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
 +                removeMapping(mappingKey);
          }
  
          if (syncMode == FULL_ASYNC)