You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/22 10:49:04 UTC

[01/10] ignite git commit: IGNITE-3075 Implement single key-value pair DHT request/response for ATOMIC cache.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4259 [created] f789e2dd5


IGNITE-3075 Implement single key-value pair DHT request/response for ATOMIC cache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51ca24f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51ca24f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51ca24f2

Branch: refs/heads/ignite-4259
Commit: 51ca24f2db32dff9c0034603ea3abfe5ef5cd846
Parents: 88f38ac
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Mon Nov 21 16:48:44 2016 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Mon Nov 21 16:48:44 2016 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  10 +-
 .../processors/cache/GridCacheIoManager.java    |  25 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  57 +-
 .../GridDhtAtomicAbstractUpdateRequest.java     | 287 ++++++++
 .../dht/atomic/GridDhtAtomicCache.java          |  17 +-
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |  61 ++
 .../GridDhtAtomicSingleUpdateRequest.java       | 678 +++++++++++++++++++
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  26 +
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 312 +++------
 .../GridNearAtomicAbstractUpdateRequest.java    |   5 +
 .../atomic/GridNearAtomicFullUpdateRequest.java | 108 +--
 .../GridNearAtomicSingleUpdateRequest.java      |   5 +
 .../distributed/near/GridNearAtomicCache.java   |   8 +-
 .../GridCacheAtomicMessageCountSelfTest.java    |   6 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |   2 +-
 15 files changed, 1292 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index b20de68..f36191c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -67,12 +67,13 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
@@ -774,7 +775,12 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..119] [124..127] - this
+            case -36:
+                msg = new GridDhtAtomicSingleUpdateRequest();
+
+                break;
+
+            // [-3..119] [124..127] [-36]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index c5c1c60..924ce79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -45,6 +45,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
@@ -470,8 +472,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
         else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
             return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion();
-        else if (cacheMsg instanceof GridDhtAtomicUpdateRequest)
-            return ((GridDhtAtomicUpdateRequest)cacheMsg).futureVersion();
+        else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
+            return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
         else if (cacheMsg instanceof GridDhtAtomicUpdateResponse)
             return ((GridDhtAtomicUpdateResponse) cacheMsg).futureVersion();
 
@@ -486,8 +488,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     @Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) {
         if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
             return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion();
-        else if (cacheMsg instanceof GridDhtAtomicUpdateRequest)
-            return ((GridDhtAtomicUpdateRequest)cacheMsg).writeVersion();
+        else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
+            return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion();
 
         return null;
     }
@@ -791,6 +793,21 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             break;
 
+            case -36: {
+                GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg;
+
+                GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
+                    ctx.cacheId(),
+                    req.futureVersion(),
+                    ctx.deploymentEnabled());
+
+                res.onError(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
             default:
                 throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
                     + msg + "]", msg.classError());

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 3bbc348..7e4c4e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -80,7 +81,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
 
     /** Update request. */
-    private final GridNearAtomicAbstractUpdateRequest updateReq;
+    protected final GridNearAtomicAbstractUpdateRequest updateReq;
 
     /** Update response. */
     final GridNearAtomicUpdateResponse updateRes;
@@ -90,7 +91,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /** Mappings. */
     @GridToStringInclude
-    protected Map<UUID, GridDhtAtomicUpdateRequest> mappings;
+    protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
 
     /** Continuous query closures. */
     private Collection<CI1<Boolean>> cntQryClsrs;
@@ -188,23 +189,16 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
             UUID nodeId = node.id();
 
             if (!nodeId.equals(cctx.localNodeId())) {
-                GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+                GridDhtAtomicAbstractUpdateRequest updateReq = mappings.get(nodeId);
 
                 if (updateReq == null) {
-                    updateReq = new GridDhtAtomicUpdateRequest(
-                        cctx.cacheId(),
-                        nodeId,
+                    updateReq = createRequest(
+                        node,
                         futVer,
                         writeVer,
                         syncMode,
                         topVer,
-                        forceTransformBackups,
-                        this.updateReq.subjectId(),
-                        this.updateReq.taskNameHash(),
-                        forceTransformBackups ? this.updateReq.invokeArguments() : null,
-                        cctx.deploymentEnabled(),
-                        this.updateReq.keepBinary(),
-                        this.updateReq.skipStore());
+                        forceTransformBackups);
 
                     mappings.put(nodeId, updateReq);
                 }
@@ -256,7 +250,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
         for (UUID nodeId : readers) {
-            GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+            GridDhtAtomicAbstractUpdateRequest updateReq = mappings.get(nodeId);
 
             if (updateReq == null) {
                 ClusterNode node = cctx.discovery().node(nodeId);
@@ -265,20 +259,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                 if (node == null)
                     continue;
 
-                updateReq = new GridDhtAtomicUpdateRequest(
-                    cctx.cacheId(),
-                    nodeId,
+                updateReq = createRequest(
+                    node,
                     futVer,
                     writeVer,
                     syncMode,
                     topVer,
-                    forceTransformBackups,
-                    this.updateReq.subjectId(),
-                    this.updateReq.taskNameHash(),
-                    forceTransformBackups ? this.updateReq.invokeArguments() : null,
-                    cctx.deploymentEnabled(),
-                    this.updateReq.keepBinary(),
-                    this.updateReq.skipStore());
+                    forceTransformBackups);
 
                 mappings.put(nodeId, updateReq);
             }
@@ -336,7 +323,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     final boolean registerResponse(UUID nodeId) {
         int resCnt0;
 
-        GridDhtAtomicUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
+        GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
 
         if (req != null) {
             synchronized (this) {
@@ -365,7 +352,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      */
     final void map() {
         if (!F.isEmpty(mappings)) {
-            for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+            for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
                 try {
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
@@ -412,6 +399,24 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /**
+     * @param node Node.
+     * @param futVer Future version.
+     * @param writeVer Update version.
+     * @param syncMode Write synchronization mode.
+     * @param topVer Topology version.
+     * @param forceTransformBackups Force transform backups flag.
+     * @return Request.
+     */
+    protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
+        ClusterNode node,
+        GridCacheVersion futVer,
+        GridCacheVersion writeVer,
+        CacheWriteSynchronizationMode syncMode,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean forceTransformBackups
+    );
+
+    /**
      * Callback for backup update response.
      *
      * @param nodeId Backup node ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
new file mode 100644
index 0000000..f0bea07
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.util.UUID;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+    /** Message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /** Node ID. */
+    @GridDirectTransient
+    protected UUID nodeId;
+
+    /** On response flag. Access should be synced on future. */
+    @GridDirectTransient
+    private boolean onRes;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    protected GridDhtAtomicAbstractUpdateRequest() {
+        // N-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     */
+    protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId) {
+        this.cacheId = cacheId;
+        this.nodeId = nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Keep binary flag.
+     */
+    public abstract boolean keepBinary();
+
+    /**
+     * @return Skip write-through to a persistent storage.
+     */
+    public abstract boolean skipStore();
+
+    /**
+     * @return {@code True} if on response flag changed.
+     */
+    public boolean onResponse() {
+        return !onRes && (onRes = true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /**
+     * @return Force transform backups flag.
+     */
+    public abstract boolean forceTransformBackups();
+
+    /** {@inheritDoc} */
+    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+        return ctx.atomicMessageLogger();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        cleanup();
+    }
+
+    /**
+     * @param key Key to add.
+     * @param val Value, {@code null} if should be removed.
+     * @param entryProcessor Entry processor.
+     * @param ttl TTL (optional).
+     * @param conflictExpireTime Conflict expire time (optional).
+     * @param conflictVer Conflict version (optional).
+     * @param addPrevVal If {@code true} adds previous value.
+     * @param partId Partition.
+     * @param prevVal Previous value.
+     * @param updateCntr Update counter.
+     */
+    public abstract void addWriteValue(KeyCacheObject key,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        int partId,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateCntr
+    );
+
+    /**
+     * @param key Key to add.
+     * @param val Value, {@code null} if should be removed.
+     * @param entryProcessor Entry processor.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     */
+    public abstract void addNearWriteValue(KeyCacheObject key,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long expireTime);
+
+    /**
+     * Cleanup values not needed after message was sent.
+     */
+    protected abstract void cleanup();
+
+    /**
+     * @return Subject ID.
+     */
+    public abstract UUID subjectId();
+
+    /**
+     * @return Task name.
+     */
+    public abstract int taskNameHash();
+
+    /**
+     * @return Version assigned on primary node.
+     */
+    public abstract GridCacheVersion futureVersion();
+
+    /**
+     * @return Write version.
+     */
+    public abstract GridCacheVersion writeVersion();
+
+    /**
+     * @return Cache write synchronization mode.
+     */
+    public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+
+    /**
+     * @return Keys size.
+     */
+    public abstract int size();
+
+    /**
+     * @return Keys size.
+     */
+    public abstract int nearSize();
+
+    /**
+     * @param key Key to check.
+     * @return {@code true} if request keys contain key.
+     */
+    public abstract boolean hasKey(KeyCacheObject key);
+
+    /**
+     * @param idx Key index.
+     * @return Key.
+     */
+    public abstract KeyCacheObject key(int idx);
+
+    /**
+     * @param idx Partition index.
+     * @return Partition id.
+     */
+    public abstract int partitionId(int idx);
+
+    /**
+     * @param updCntr Update counter.
+     * @return Update counter.
+     */
+    public abstract Long updateCounter(int updCntr);
+
+    /**
+     * @param idx Near key index.
+     * @return Key.
+     */
+    public abstract KeyCacheObject nearKey(int idx);
+
+    /**
+     * @param idx Key index.
+     * @return Value.
+     */
+    @Nullable public abstract CacheObject value(int idx);
+
+    /**
+     * @param idx Key index.
+     * @return Value.
+     */
+    @Nullable public abstract CacheObject previousValue(int idx);
+
+    /**
+     * @param idx Key index.
+     * @return Entry processor.
+     */
+    @Nullable public abstract EntryProcessor<Object, Object, Object> entryProcessor(int idx);
+
+    /**
+     * @param idx Near key index.
+     * @return Value.
+     */
+    @Nullable public abstract CacheObject nearValue(int idx);
+
+    /**
+     * @param idx Key index.
+     * @return Transform closure.
+     */
+    @Nullable public abstract EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx);
+
+    /**
+     * @param idx Index.
+     * @return Conflict version.
+     */
+    @Nullable public abstract GridCacheVersion conflictVersion(int idx);
+
+    /**
+     * @param idx Index.
+     * @return TTL.
+     */
+    public abstract long ttl(int idx);
+
+    /**
+     * @param idx Index.
+     * @return TTL for near cache update.
+     */
+    public abstract long nearTtl(int idx);
+
+    /**
+     * @param idx Index.
+     * @return Conflict expire time.
+     */
+    public abstract long conflictExpireTime(int idx);
+
+    /**
+     * @param idx Index.
+     * @return Expire time for near cache update.
+     */
+    public abstract long nearExpireTime(int idx);
+
+    /**
+     * @return Optional arguments for entry processor.
+     */
+    @Nullable public abstract Object[] invokeArguments();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d7eb062..2a7055d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -360,11 +360,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         ctx.io().addHandler(
             ctx.cacheId(),
-            GridDhtAtomicUpdateRequest.class,
-            new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+            GridDhtAtomicAbstractUpdateRequest.class,
+            new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() {
                 @Override public void apply(
                     UUID nodeId,
-                    GridDhtAtomicUpdateRequest req
+                    GridDhtAtomicAbstractUpdateRequest req
                 ) {
                     processDhtAtomicUpdateRequest(
                         nodeId,
@@ -3100,12 +3100,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridNearAtomicAbstractUpdateFuture fut =
             (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
 
-        if (fut != null) {
-            if (fut instanceof GridNearAtomicSingleUpdateFuture)
-                ((GridNearAtomicSingleUpdateFuture)fut).onResult(nodeId, res, false);
-            else
-                ((GridNearAtomicUpdateFuture)fut).onResult(nodeId, res, false);
-        }
+        if (fut != null)
+            fut.onResult(nodeId, res, false);
+
         else
             U.warn(msgLog, "Failed to find near update future for update response (will ignore) " +
                 "[futId" + res.futureVersion() + ", node=" + nodeId + ", res=" + res + ']');
@@ -3115,7 +3112,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param nodeId Sender node ID.
      * @param req Dht atomic update request.
      */
-    private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicUpdateRequest req) {
+    private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) {
         if (msgLog.isDebugEnabled()) {
             msgLog.debug("Received DHT atomic update request [futId=" + req.futureVersion() +
                 ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index f83a7b7..656caab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -30,6 +32,8 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.NotNull;
 
 /**
  *
@@ -38,6 +42,9 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4");
+
     /** Future keys. */
     private KeyCacheObject key;
 
@@ -87,6 +94,49 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
     }
 
     /** {@inheritDoc} */
+    @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
+        ClusterNode node,
+        GridCacheVersion futVer,
+        GridCacheVersion writeVer,
+        CacheWriteSynchronizationMode syncMode,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean forceTransformBackups
+    ) {
+        if (canUseSingleRequest(node)) {
+            assert !forceTransformBackups;
+
+            return new GridDhtAtomicSingleUpdateRequest(
+                cctx.cacheId(),
+                node.id(),
+                futVer,
+                writeVer,
+                syncMode,
+                topVer,
+                updateReq.subjectId(),
+                updateReq.taskNameHash(),
+                cctx.deploymentEnabled(),
+                updateReq.keepBinary(),
+                updateReq.skipStore());
+        }
+        else {
+            return new GridDhtAtomicUpdateRequest(
+                cctx.cacheId(),
+                node.id(),
+                futVer,
+                writeVer,
+                syncMode,
+                topVer,
+                forceTransformBackups,
+                updateReq.subjectId(),
+                updateReq.taskNameHash(),
+                forceTransformBackups ? updateReq.invokeArguments() : null,
+                cctx.deploymentEnabled(),
+                updateReq.keepBinary(),
+                updateReq.skipStore());
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
         if (log.isDebugEnabled())
             log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']');
@@ -114,6 +164,17 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
         updateRes.addFailedKey(key, err);
     }
 
+    /**
+     * @param node Target node
+     * @return {@code true} if target node supports {@link GridNearAtomicSingleUpdateRequest}
+     */
+    private boolean canUseSingleRequest(ClusterNode node) {
+        return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 &&
+            cctx.expiry() == null &&
+            updateReq.expiry() == null &&
+            !updateReq.hasConflictData();
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtAtomicSingleUpdateFuture.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
new file mode 100644
index 0000000..a03d948
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
+
+/**
+ *
+ */
+public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdateRequest {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Near cache key flag. */
+    private static final int NEAR_FLAG_MASK = 0x80;
+
+    /** Future version. */
+    protected GridCacheVersion futVer;
+
+    /** Write version. */
+    protected GridCacheVersion writeVer;
+
+    /** Write synchronization mode. */
+    protected CacheWriteSynchronizationMode syncMode;
+
+    /** Topology version. */
+    protected AffinityTopologyVersion topVer;
+
+    /** Subject ID. */
+    protected UUID subjId;
+
+    /** Task name hash. */
+    protected int taskNameHash;
+
+    /** Additional flags. */
+    protected byte flags;
+
+    /** Key to update. */
+    @GridToStringInclude
+    protected KeyCacheObject key;
+
+    /** Value to update. */
+    @GridToStringInclude
+    protected CacheObject val;
+
+    /** Previous value. */
+    @GridToStringInclude
+    protected CacheObject prevVal;
+
+    /** Partition. */
+    protected long updateCntr;
+
+    /** */
+    protected int partId;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridDhtAtomicSingleUpdateRequest() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     * @param futVer Future version.
+     * @param writeVer Write version for cache values.
+     * @param syncMode Cache write synchronization mode.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param addDepInfo Deployment info.
+     * @param keepBinary Keep binary flag.
+     * @param skipStore Skip store flag.
+     */
+    GridDhtAtomicSingleUpdateRequest(
+        int cacheId,
+        UUID nodeId,
+        GridCacheVersion futVer,
+        GridCacheVersion writeVer,
+        CacheWriteSynchronizationMode syncMode,
+        @NotNull AffinityTopologyVersion topVer,
+        UUID subjId,
+        int taskNameHash,
+        boolean addDepInfo,
+        boolean keepBinary,
+        boolean skipStore
+    ) {
+        super(cacheId, nodeId);
+        this.futVer = futVer;
+        this.writeVer = writeVer;
+        this.syncMode = syncMode;
+        this.topVer = topVer;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.addDepInfo = addDepInfo;
+
+        if (skipStore)
+            setFlag(true, SKIP_STORE_FLAG_MASK);
+        if (keepBinary)
+            setFlag(true, KEEP_BINARY_FLAG_MASK);
+    }
+
+    /**
+     * @param key Key to add.
+     * @param val Value, {@code null} if should be removed.
+     * @param entryProcessor Entry processor.
+     * @param ttl TTL (optional).
+     * @param conflictExpireTime Conflict expire time (optional).
+     * @param conflictVer Conflict version (optional).
+     * @param addPrevVal If {@code true} adds previous value.
+     * @param partId Partition.
+     * @param prevVal Previous value.
+     * @param updateCntr Update counter.
+     */
+    @Override public void addWriteValue(KeyCacheObject key,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        int partId,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateCntr
+    ) {
+        assert entryProcessor == null;
+        assert ttl <= 0 : ttl;
+        assert conflictExpireTime <= 0 : conflictExpireTime;
+        assert conflictVer == null : conflictVer;
+
+        near(false);
+
+        this.key = key;
+        this.partId = partId;
+        this.val = val;
+
+        if (addPrevVal)
+            this.prevVal = prevVal;
+
+        if (updateCntr != null)
+            this.updateCntr = updateCntr;
+    }
+
+    /**
+     * @param key Key to add.
+     * @param val Value, {@code null} if should be removed.
+     * @param entryProcessor Entry processor.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     */
+    @Override public void addNearWriteValue(KeyCacheObject key,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long expireTime) {
+        assert entryProcessor == null;
+        assert ttl <= 0 : ttl;
+
+        near(true);
+
+        this.key = key;
+        this.val = val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean forceTransformBackups() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return key != null ? near() ? 0 : 1 : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int nearSize() {
+        return key != null ? near() ? 1 : 0 : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasKey(KeyCacheObject key) {
+        return !near() && F.eq(this.key, key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipStore() {
+        return isFlag(SKIP_STORE_FLAG_MASK);
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject key(int idx) {
+        assert idx == 0 : idx;
+
+        return near() ? null : key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partitionId(int idx) {
+        assert idx == 0 : idx;
+
+        return partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Long updateCounter(int updCntr) {
+        assert updCntr == 0 : updCntr;
+
+        return updateCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject nearKey(int idx) {
+        assert idx == 0 : idx;
+
+        return near() ? key : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject value(int idx) {
+        assert idx == 0 : idx;
+
+        return near() ? null : val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion futureVersion() {
+        return futVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion writeVersion() {
+        return writeVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID subjectId() {
+        return subjId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
+        return syncMode;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject previousValue(int idx) {
+        assert idx == 0 : idx;
+
+        return prevVal;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject nearValue(int idx) {
+        assert idx == 0 : idx;
+
+        return near() ? val : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+        assert idx == 0 : idx;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) {
+        assert idx == 0 : idx;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
+        assert idx == 0 : idx;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long ttl(int idx) {
+        assert idx == 0 : idx;
+
+        return CU.TTL_NOT_CHANGED;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long nearTtl(int idx) {
+        assert idx == 0 : idx;
+
+        return CU.TTL_NOT_CHANGED;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long conflictExpireTime(int idx) {
+        assert idx == 0 : idx;
+
+        return CU.EXPIRE_TIME_CALCULATE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long nearExpireTime(int idx) {
+        assert idx == 0 : idx;
+
+        return CU.EXPIRE_TIME_CALCULATE;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Object[] invokeArguments() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean keepBinary() {
+        return isFlag(KEEP_BINARY_FLAG_MASK);
+    }
+
+    /**
+     *
+     */
+    private boolean near() {
+        return isFlag(NEAR_FLAG_MASK);
+    }
+
+    /**
+     *
+     */
+    private void near(boolean near) {
+        setFlag(near, NEAR_FLAG_MASK);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        prepareMarshalObject(key, cctx);
+
+        prepareMarshalObject(val, cctx);
+
+        prepareMarshalObject(prevVal, cctx);
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        finishUnmarshalObject(key, cctx, ldr);
+
+        finishUnmarshalObject(val, cctx, ldr);
+
+        finishUnmarshalObject(prevVal, cctx, ldr);
+
+        key.partition(partId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMessage("futVer", futVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMessage("key", key))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeInt("partId", partId))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeMessage("prevVal", prevVal))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeLong("updateCntr", updateCntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeMessage("val", val))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
+                if (!writer.writeMessage("writeVer", writeVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                futVer = reader.readMessage("futVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                key = reader.readMessage("key");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                partId = reader.readInt("partId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                prevVal = reader.readMessage("prevVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+                reader.incrementState();
+
+            case 10:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                updateCntr = reader.readLong("updateCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                val = reader.readMessage("val");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
+                writeVer = reader.readMessage("writeVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtAtomicSingleUpdateRequest.class);
+    }
+
+    /**
+     * @param obj CacheObject to marshal
+     * @param ctx context
+     * @throws IgniteCheckedException if error
+     */
+    private void prepareMarshalObject(CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException {
+        if (obj != null)
+            obj.prepareMarshal(ctx.cacheObjectContext());
+    }
+
+    /**
+     * @param obj CacheObject un to marshal
+     * @param ctx context
+     * @param ldr class loader
+     * @throws IgniteCheckedException if error
+     */
+    private void finishUnmarshalObject(@Nullable CacheObject obj, GridCacheContext ctx,
+        ClassLoader ldr) throws IgniteCheckedException {
+        if (obj != null)
+            obj.finishUnmarshal(ctx.cacheObjectContext(), ldr);
+    }
+
+    /**
+     * Cleanup values not needed after message was sent.
+     */
+    @Override protected void cleanup() {
+        val = null;
+        prevVal = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -36;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 15;
+    }
+
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reags flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtAtomicSingleUpdateRequest.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 864aadd..dd1f1c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -23,7 +23,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -33,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * DHT atomic cache backup update future.
@@ -118,6 +121,29 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     }
 
     /** {@inheritDoc} */
+    @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(ClusterNode node,
+        GridCacheVersion futVer,
+        GridCacheVersion writeVer,
+        CacheWriteSynchronizationMode syncMode,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean forceTransformBackups) {
+        return new GridDhtAtomicUpdateRequest(
+            cctx.cacheId(),
+            node.id(),
+            futVer,
+            writeVer,
+            syncMode,
+            topVer,
+            forceTransformBackups,
+            updateReq.subjectId(),
+            updateReq.taskNameHash(),
+            forceTransformBackups ? updateReq.invokeArguments() : null,
+            cctx.deploymentEnabled(),
+            updateReq.keepBinary(),
+            updateReq.skipStore());
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtAtomicUpdateFuture.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 55f7560..f2fbb0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -20,25 +20,22 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -52,17 +49,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_ST
 /**
  * Lite dht cache backup update request.
  */
-public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateRequest {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Message index. */
-    public static final int CACHE_MSG_IDX = nextIndexId();
-
-    /** Node ID. */
-    @GridDirectTransient
-    private UUID nodeId;
-
     /** Future version. */
     private GridCacheVersion futVer;
 
@@ -151,10 +141,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /** Partition. */
     private GridLongList updateCntrs;
 
-    /** On response flag. Access should be synced on future. */
-    @GridDirectTransient
-    private boolean onRes;
-
     /** */
     @GridDirectTransient
     private List<Integer> partIds;
@@ -162,9 +148,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /** Keep binary flag. */
     private boolean keepBinary;
 
-    /**
-     * Additional flags.
-     */
+    /** Additional flags. */
     private byte flags;
 
     /**
@@ -204,10 +188,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         boolean keepBinary,
         boolean skipStore
     ) {
-        assert invokeArgs == null || forceTransformBackups;
+        super(cacheId, nodeId);
 
-        this.cacheId = cacheId;
-        this.nodeId = nodeId;
         this.futVer = futVer;
         this.writeVer = writeVer;
         this.syncMode = syncMode;
@@ -215,12 +197,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         this.forceTransformBackups = forceTransformBackups;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+
+        assert invokeArgs == null || forceTransformBackups;
+
         this.invokeArgs = invokeArgs;
         this.addDepInfo = addDepInfo;
         this.keepBinary = keepBinary;
 
-        if (skipStore)
-            flags = (byte)(flags | SKIP_STORE_FLAG_MASK);
+        setFlag(skipStore, SKIP_STORE_FLAG_MASK);
 
         keys = new ArrayList<>();
         partIds = new ArrayList<>();
@@ -233,26 +217,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
             vals = new ArrayList<>();
     }
 
-    /**
-     * @return Force transform backups flag.
-     */
-    public boolean forceTransformBackups() {
-        return forceTransformBackups;
-    }
-
-    /**
-     * @param key Key to add.
-     * @param val Value, {@code null} if should be removed.
-     * @param entryProcessor Entry processor.
-     * @param ttl TTL (optional).
-     * @param conflictExpireTime Conflict expire time (optional).
-     * @param conflictVer Conflict version (optional).
-     * @param addPrevVal If {@code true} adds previous value.
-     * @param partId Partition.
-     * @param prevVal Previous value.
-     * @param updateCntr Update counter.
-     */
-    public void addWriteValue(KeyCacheObject key,
+    /** {@inheritDoc} */
+    @Override public void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
@@ -328,14 +294,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
             conflictExpireTimes.add(conflictExpireTime);
     }
 
-    /**
-     * @param key Key to add.
-     * @param val Value, {@code null} if should be removed.
-     * @param entryProcessor Entry processor.
-     * @param ttl TTL.
-     * @param expireTime Expire time.
-     */
-    public void addNearWriteValue(KeyCacheObject key,
+    /** {@inheritDoc} */
+    @Override public void addNearWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
@@ -387,183 +347,114 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     }
 
     /** {@inheritDoc} */
-    @Override public int lookupIndex() {
-        return CACHE_MSG_IDX;
-    }
-
-    /**
-     * @return Node ID.
-     */
-    public UUID nodeId() {
-        return nodeId;
+    @Override public boolean forceTransformBackups() {
+        return forceTransformBackups;
     }
 
-    /**
-     * @return Subject ID.
-     */
-    public UUID subjectId() {
+    /** {@inheritDoc} */
+    @Override public UUID subjectId() {
         return subjId;
     }
 
-    /**
-     * @return Task name.
-     */
-    public int taskNameHash() {
+    /** {@inheritDoc} */
+    @Override public int taskNameHash() {
         return taskNameHash;
     }
 
-    /**
-     * @return Keys size.
-     */
-    public int size() {
-        return keys.size();
-    }
-
-    /**
-     * @return Keys size.
-     */
-    public int nearSize() {
-        return nearKeys != null ? nearKeys.size() : 0;
-    }
-
-    /**
-     * @return Version assigned on primary node.
-     */
-    public GridCacheVersion futureVersion() {
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion futureVersion() {
         return futVer;
     }
 
-    /**
-     * @return Write version.
-     */
-    public GridCacheVersion writeVersion() {
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion writeVersion() {
         return writeVer;
     }
 
-    /**
-     * @return Cache write synchronization mode.
-     */
-    public CacheWriteSynchronizationMode writeSynchronizationMode() {
+    /** {@inheritDoc} */
+    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
         return syncMode;
     }
 
-    /**
-     * @return Topology version.
-     */
+    /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
-    /**
-     * @return Keys.
-     */
-    public Collection<KeyCacheObject> keys() {
-        return keys;
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return keys.size();
     }
 
-    /**
-     * @param idx Key index.
-     * @return Key.
-     */
-    public KeyCacheObject key(int idx) {
-        return keys.get(idx);
+    /** {@inheritDoc} */
+    @Override public int nearSize() {
+        return nearKeys != null ? nearKeys.size() : 0;
     }
 
-    /**
-     * @param idx Partition index.
-     * @return Partition id.
-     */
-    public int partitionId(int idx) {
-        return partIds.get(idx);
+    /** {@inheritDoc} */
+    @Override public boolean hasKey(KeyCacheObject key) {
+        return F.contains(keys, key);
     }
 
-    /**
-     * @return Skip write-through to a persistent storage.
-     */
-    public boolean skipStore() {
-        return (flags & SKIP_STORE_FLAG_MASK) == SKIP_STORE_FLAG_MASK;
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject key(int idx) {
+        return keys.get(idx);
     }
 
-    /**
-     * @param updCntr Update counter.
-     * @return Update counter.
-     */
-    public Long updateCounter(int updCntr) {
+    /** {@inheritDoc} */
+    @Override public int partitionId(int idx) {
+        return partIds.get(idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Long updateCounter(int updCntr) {
         if (updateCntrs != null && updCntr < updateCntrs.size())
             return updateCntrs.get(updCntr);
 
         return null;
     }
 
-    /**
-     * @param idx Near key index.
-     * @return Key.
-     */
-    public KeyCacheObject nearKey(int idx) {
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject nearKey(int idx) {
         return nearKeys.get(idx);
     }
 
-    /**
-     * @return Keep binary flag.
-     */
-    public boolean keepBinary() {
-        return keepBinary;
-    }
-
-    /**
-     * @param idx Key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject value(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject value(int idx) {
         if (vals != null)
             return vals.get(idx);
 
         return null;
     }
 
-    /**
-     * @param idx Key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject previousValue(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject previousValue(int idx) {
         if (prevVals != null)
             return prevVals.get(idx);
 
         return null;
     }
 
-    /**
-     * @param idx Key index.
-     * @return Entry processor.
-     */
-    @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
         return entryProcessors == null ? null : entryProcessors.get(idx);
     }
 
-    /**
-     * @param idx Near key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject nearValue(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject nearValue(int idx) {
         if (nearVals != null)
             return nearVals.get(idx);
 
         return null;
     }
 
-    /**
-     * @param idx Key index.
-     * @return Transform closure.
-     */
-    @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) {
         return nearEntryProcessors == null ? null : nearEntryProcessors.get(idx);
     }
 
-    /**
-     * @param idx Index.
-     * @return Conflict version.
-     */
-    @Nullable public GridCacheVersion conflictVersion(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
         if (conflictVers != null) {
             assert idx >= 0 && idx < conflictVers.size();
 
@@ -573,11 +464,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         return null;
     }
 
-    /**
-     * @param idx Index.
-     * @return TTL.
-     */
-    public long ttl(int idx) {
+    /** {@inheritDoc} */
+    @Override public long ttl(int idx) {
         if (ttls != null) {
             assert idx >= 0 && idx < ttls.size();
 
@@ -587,11 +475,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         return CU.TTL_NOT_CHANGED;
     }
 
-    /**
-     * @param idx Index.
-     * @return TTL for near cache update.
-     */
-    public long nearTtl(int idx) {
+    /** {@inheritDoc} */
+    @Override public long nearTtl(int idx) {
         if (nearTtls != null) {
             assert idx >= 0 && idx < nearTtls.size();
 
@@ -601,11 +486,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         return CU.TTL_NOT_CHANGED;
     }
 
-    /**
-     * @param idx Index.
-     * @return Conflict expire time.
-     */
-    public long conflictExpireTime(int idx) {
+    /** {@inheritDoc} */
+    @Override public long conflictExpireTime(int idx) {
         if (conflictExpireTimes != null) {
             assert idx >= 0 && idx < conflictExpireTimes.size();
 
@@ -615,11 +497,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         return CU.EXPIRE_TIME_CALCULATE;
     }
 
-    /**
-     * @param idx Index.
-     * @return Expire time for near cache update.
-     */
-    public long nearExpireTime(int idx) {
+    /** {@inheritDoc} */
+    @Override public long nearExpireTime(int idx) {
         if (nearExpireTimes != null) {
             assert idx >= 0 && idx < nearExpireTimes.size();
 
@@ -629,17 +508,18 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         return CU.EXPIRE_TIME_CALCULATE;
     }
 
-    /**
-     * @return {@code True} if on response flag changed.
-     */
-    public boolean onResponse() {
-        return !onRes && (onRes = true);
+    /** {@inheritDoc} */
+    @Override public boolean keepBinary() {
+        return keepBinary;
     }
 
-    /**
-     * @return Optional arguments for entry processor.
-     */
-    @Nullable public Object[] invokeArguments() {
+    /** {@inheritDoc} */
+    @Override public boolean skipStore() {
+        return isFlag(SKIP_STORE_FLAG_MASK);
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Object[] invokeArguments() {
         return invokeArgs;
     }
 
@@ -711,16 +591,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     }
 
     /** {@inheritDoc} */
-    @Override public boolean addDeploymentInfo() {
-        return addDepInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
-        return ctx.atomicMessageLogger();
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 
@@ -1083,14 +953,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     }
 
     /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        cleanup();
-    }
-
-    /**
-     * Cleanup values not needed after message was sent.
-     */
-    private void cleanup() {
+    @Override protected void cleanup() {
         nearVals = null;
         prevVals = null;
     }
@@ -1105,6 +968,27 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         return 26;
     }
 
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reags flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtAtomicUpdateRequest.class, this, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index bee2ecd..bae9e3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -223,4 +223,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      * @return Key.
      */
     public abstract KeyCacheObject key(int idx);
+
+    /**
+     * @return {@code True} if request does not have conflict data.
+     */
+    public abstract boolean hasConflictData();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index b733d7b..c785828 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -68,24 +69,49 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     /** Future version. */
     private GridCacheVersion futVer;
 
-    /** Fast map flag. */
-    private boolean fastMap;
-
     /** Update version. Set to non-null if fastMap is {@code true}. */
     private GridCacheVersion updateVer;
 
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
-    /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
-    private boolean topLocked;
-
     /** Write synchronization mode. */
     private CacheWriteSynchronizationMode syncMode;
 
     /** Update operation. */
     private GridCacheOperation op;
 
+    /** Subject ID. */
+    protected UUID subjId;
+
+    /** Task name hash. */
+    protected int taskNameHash;
+
+    /** */
+    @GridDirectTransient
+    private GridNearAtomicUpdateResponse res;
+
+    /** Fast map flag. */
+    protected boolean fastMap;
+
+    /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+    protected boolean topLocked;
+
+    /** Flag indicating whether request contains primary keys. */
+    protected boolean hasPrimary;
+
+    /** Skip write-through to a persistent storage. */
+    protected boolean skipStore;
+
+    /** */
+    protected boolean clientReq;
+
+    /** Keep binary flag. */
+    protected boolean keepBinary;
+
+    /** Return value flag. */
+    protected boolean retval;
+
     /** Keys to update. */
     @GridToStringInclude
     @GridDirectCollection(KeyCacheObject.class)
@@ -107,13 +133,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     @GridDirectCollection(byte[].class)
     private List<byte[]> entryProcessorsBytes;
 
-    /** Optional arguments for entry processor. */
-    @GridDirectTransient
-    private Object[] invokeArgs;
-
-    /** Entry processor arguments bytes. */
-    private byte[][] invokeArgsBytes;
-
     /** Conflict versions. */
     @GridDirectCollection(GridCacheVersion.class)
     private List<GridCacheVersion> conflictVers;
@@ -124,8 +143,12 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     /** Conflict expire times. */
     private GridLongList conflictExpireTimes;
 
-    /** Return value flag. */
-    private boolean retval;
+    /** Optional arguments for entry processor. */
+    @GridDirectTransient
+    private Object[] invokeArgs;
+
+    /** Entry processor arguments bytes. */
+    private byte[][] invokeArgsBytes;
 
     /** Expiry policy. */
     @GridDirectTransient
@@ -137,28 +160,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     /** Filter. */
     private CacheEntryPredicate[] filter;
 
-    /** Flag indicating whether request contains primary keys. */
-    private boolean hasPrimary;
-
-    /** Subject ID. */
-    private UUID subjId;
-
-    /** Task name hash. */
-    private int taskNameHash;
-
-    /** Skip write-through to a persistent storage. */
-    private boolean skipStore;
-
-    /** */
-    private boolean clientReq;
-
-    /** Keep binary flag. */
-    private boolean keepBinary;
-
-    /** */
-    @GridDirectTransient
-    private GridNearAtomicUpdateResponse res;
-
     /** Maximum possible size of inner collections. */
     @GridDirectTransient
     private int initSize;
@@ -523,7 +524,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public CacheEntryPredicate[] filter() {
+    @Override @Nullable public CacheEntryPredicate[] filter() {
         return filter;
     }
 
@@ -533,11 +534,19 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
+    @Override public boolean hasConflictData() {
+        return F.size(conflictVers) > 0 || conflictTtls != null || conflictExpireTimes != null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
+        if (expiryPlc != null && expiryPlcBytes == null)
+            expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+
         prepareMarshalCacheObjects(keys, cctx);
 
         if (filter != null) {
@@ -555,9 +564,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 filter = null;
         }
 
-        if (expiryPlc != null && expiryPlcBytes == null)
-            expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
-
         if (op == TRANSFORM) {
             // force addition of deployment info for entry processors if P2P is enabled globally.
             if (!addDepInfo && ctx.deploymentEnabled())
@@ -579,8 +585,18 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
+        if (expiryPlcBytes != null && expiryPlc == null)
+            expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
         finishUnmarshalCacheObjects(keys, cctx, ldr);
 
+        if (filter != null) {
+            for (CacheEntryPredicate p : filter) {
+                if (p != null)
+                    p.finishUnmarshal(cctx, ldr);
+            }
+        }
+
         if (op == TRANSFORM) {
             if (entryProcessors == null)
                 entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
@@ -591,16 +607,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         else
             finishUnmarshalCacheObjects(vals, cctx, ldr);
 
-        if (filter != null) {
-            for (CacheEntryPredicate p : filter) {
-                if (p != null)
-                    p.finishUnmarshal(cctx, ldr);
-            }
-        }
-
-        if (expiryPlcBytes != null && expiryPlc == null)
-            expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
-
         if (partIds != null && !partIds.isEmpty()) {
             assert partIds.size() == keys.size();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 211b472..f3b9726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -226,6 +226,11 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
         return CU.EXPIRE_TIME_CALCULATE;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean hasConflictData() {
+        return false;
+    }
+
     /**
      * {@inheritDoc}
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index b5b2c72..a8219b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -41,8 +41,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
@@ -302,15 +302,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
      */
     public void processDhtAtomicUpdateRequest(
         UUID nodeId,
-        GridDhtAtomicUpdateRequest req,
+        GridDhtAtomicAbstractUpdateRequest req,
         GridDhtAtomicUpdateResponse res
     ) {
         GridCacheVersion ver = req.writeVersion();
 
         assert ver != null;
 
-        Collection<KeyCacheObject> backupKeys = req.keys();
-
         boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null;
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
@@ -329,7 +327,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             break;
                         }
 
-                        if (F.contains(backupKeys, key)) { // Reader became backup.
+                        if (req.hasKey(key)) { // Reader became backup.
                             if (entry.markObsolete(ver))
                                 removeEntry(entry);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index a6d612a..e8c5db1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -141,6 +142,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
             commSpi.registerMessage(GridNearAtomicSingleUpdateRequest.class);
             commSpi.registerMessage(GridNearAtomicFullUpdateRequest.class);
             commSpi.registerMessage(GridDhtAtomicUpdateRequest.class);
+            commSpi.registerMessage(GridDhtAtomicSingleUpdateRequest.class);
 
             int putCnt = 15;
 
@@ -171,7 +173,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
 
             assertEquals(expNearCnt, commSpi.messageCount(GridNearAtomicFullUpdateRequest.class));
             assertEquals(expNearSingleCnt, commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class));
-            assertEquals(expDhtCnt, commSpi.messageCount(GridDhtAtomicUpdateRequest.class));
+            assertEquals(expDhtCnt, commSpi.messageCount(GridDhtAtomicSingleUpdateRequest.class));
 
             if (writeOrderMode == CLOCK) {
                 for (int i = 1; i < 4; i++) {
@@ -179,7 +181,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
 
                     assertEquals(0, commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class));
                     assertEquals(0, commSpi.messageCount(GridNearAtomicFullUpdateRequest.class));
-                    assertEquals(0, commSpi.messageCount(GridDhtAtomicUpdateRequest.class));
+                    assertEquals(0, commSpi.messageCount(GridDhtAtomicSingleUpdateRequest.class));
                 }
             }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 0899423..644e310 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -478,7 +478,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
             Object origMsg = msg.message();
 
             return delay &&
-                ((origMsg instanceof GridNearAtomicAbstractUpdateRequest) || (origMsg instanceof GridDhtAtomicUpdateRequest));
+                ((origMsg instanceof GridNearAtomicAbstractUpdateRequest) || (origMsg instanceof GridDhtAtomicAbstractUpdateRequest));
         }
     }
 }
\ No newline at end of file


[05/10] ignite git commit: IGNITE-3075 Fixed condition for 'single' request creation

Posted by vo...@apache.org.
IGNITE-3075 Fixed condition for 'single' request creation


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d32fa21b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d32fa21b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d32fa21b

Branch: refs/heads/ignite-4259
Commit: d32fa21b673814b060d2362f06ff44838e9c2cdc
Parents: f2dc1d7
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 22 11:33:55 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 22 11:33:55 2016 +0300

----------------------------------------------------------------------
 .../GridDhtAtomicAbstractUpdateFuture.java      | 19 ++++++++-----
 .../dht/atomic/GridDhtAtomicCache.java          |  2 +-
 .../atomic/GridDhtAtomicSingleUpdateFuture.java | 30 +++++++++++++-------
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 10 +++++--
 .../GridNearAtomicAbstractUpdateRequest.java    |  5 ----
 .../atomic/GridNearAtomicFullUpdateRequest.java |  5 ----
 .../GridNearAtomicSingleUpdateRequest.java      |  9 ------
 ...CacheLoadingConcurrentGridStartSelfTest.java |  6 +++-
 8 files changed, 44 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 7e4c4e0..361fbe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -86,9 +86,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     /** Update response. */
     final GridNearAtomicUpdateResponse updateRes;
 
-    /** Force transform backup flag. */
-    private boolean forceTransformBackups;
-
     /** Mappings. */
     @GridToStringInclude
     protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
@@ -198,7 +195,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                         writeVer,
                         syncMode,
                         topVer,
-                        forceTransformBackups);
+                        ttl,
+                        conflictExpireTime,
+                        conflictVer);
 
                     mappings.put(nodeId, updateReq);
                 }
@@ -265,7 +264,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     writeVer,
                     syncMode,
                     topVer,
-                    forceTransformBackups);
+                    ttl,
+                    expireTime,
+                    null);
 
                 mappings.put(nodeId, updateReq);
             }
@@ -404,7 +405,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @param writeVer Update version.
      * @param syncMode Write synchronization mode.
      * @param topVer Topology version.
-     * @param forceTransformBackups Force transform backups flag.
+     * @param ttl TTL.
+     * @param conflictExpireTime Conflict expire time.
+     * @param conflictVer Conflict version.
      * @return Request.
      */
     protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
@@ -413,7 +416,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
-        boolean forceTransformBackups
+        long ttl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer
     );
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2a7055d..940c74e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2369,7 +2369,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
 
-                boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(),
+                boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.partition(),
                     req.topologyVersion());
 
                 Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 656caab..20d6e90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -30,10 +30,12 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -100,11 +102,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
-        boolean forceTransformBackups
+        long ttl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer
     ) {
-        if (canUseSingleRequest(node)) {
-            assert !forceTransformBackups;
-
+        if (canUseSingleRequest(node, ttl, conflictExpireTime, conflictVer)) {
             return new GridDhtAtomicSingleUpdateRequest(
                 cctx.cacheId(),
                 node.id(),
@@ -126,10 +128,10 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
                 writeVer,
                 syncMode,
                 topVer,
-                forceTransformBackups,
+                false,
                 updateReq.subjectId(),
                 updateReq.taskNameHash(),
-                forceTransformBackups ? updateReq.invokeArguments() : null,
+                null,
                 cctx.deploymentEnabled(),
                 updateReq.keepBinary(),
                 updateReq.skipStore());
@@ -166,13 +168,19 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
 
     /**
      * @param node Target node
-     * @return {@code true} if target node supports {@link GridNearAtomicSingleUpdateRequest}
+     * @param ttl TTL.
+     * @param conflictExpireTime Conflict expire time.
+     * @param conflictVer Conflict version.
+     * @return {@code True} if it is possible to use {@link GridDhtAtomicSingleUpdateRequest}.
      */
-    private boolean canUseSingleRequest(ClusterNode node) {
+    private boolean canUseSingleRequest(ClusterNode node,
+        long ttl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer) {
         return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 &&
-            cctx.expiry() == null &&
-            updateReq.expiry() == null &&
-            !updateReq.hasConflictData();
+            (ttl == CU.TTL_NOT_CHANGED) &&
+            (conflictExpireTime == CU.EXPIRE_TIME_CALCULATE) &&
+            conflictVer == null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index dd1f1c4..efb35c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * DHT atomic cache backup update future.
@@ -126,7 +127,10 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
         GridCacheVersion writeVer,
         CacheWriteSynchronizationMode syncMode,
         @NotNull AffinityTopologyVersion topVer,
-        boolean forceTransformBackups) {
+        long ttl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer
+    ) {
         return new GridDhtAtomicUpdateRequest(
             cctx.cacheId(),
             node.id(),
@@ -134,10 +138,10 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
             writeVer,
             syncMode,
             topVer,
-            forceTransformBackups,
+            false,
             updateReq.subjectId(),
             updateReq.taskNameHash(),
-            forceTransformBackups ? updateReq.invokeArguments() : null,
+            null,
             cctx.deploymentEnabled(),
             updateReq.keepBinary(),
             updateReq.skipStore());

http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index bae9e3a..bee2ecd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -223,9 +223,4 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      * @return Key.
      */
     public abstract KeyCacheObject key(int idx);
-
-    /**
-     * @return {@code True} if request does not have conflict data.
-     */
-    public abstract boolean hasConflictData();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index c785828..1b11688 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -534,11 +534,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
-    @Override public boolean hasConflictData() {
-        return F.size(conflictVers) > 0 || conflictTtls != null || conflictExpireTimes != null;
-    }
-
-    /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index f3b9726..1c1addd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -227,15 +227,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
     }
 
     /** {@inheritDoc} */
-    @Override public boolean hasConflictData() {
-        return false;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @param ctx
-     */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
index 0801691..ce64e1d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
@@ -77,6 +78,8 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setCacheMode(PARTITIONED);
@@ -95,7 +98,7 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
         else
             cfg.setCacheConfiguration(ccfg);
 
-        if (!configured)
+        if (!configured) {
             ccfg.setNodeFilter(new P1<ClusterNode>() {
                 @Override public boolean apply(ClusterNode node) {
                     String name = node.attribute(ATTR_GRID_NAME).toString();
@@ -103,6 +106,7 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
                     return !getTestGridName(0).equals(name);
                 }
             });
+        }
 
         return cfg;
     }


[03/10] ignite git commit: Merge remote-tracking branch 'community/ignite-1.7.4' into ignite-1.7.4

Posted by vo...@apache.org.
Merge remote-tracking branch 'community/ignite-1.7.4' into ignite-1.7.4


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/551f90db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/551f90db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/551f90db

Branch: refs/heads/ignite-4259
Commit: 551f90dbeebcad35a0e3aac07229fb67578f2ab7
Parents: 6e4a279 51ca24f
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Mon Nov 21 19:16:49 2016 +0500
Committer: tledkov-gridgain <tl...@gridgain.com>
Committed: Mon Nov 21 19:16:49 2016 +0500

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  10 +-
 .../processors/cache/GridCacheIoManager.java    |  25 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  57 +-
 .../GridDhtAtomicAbstractUpdateRequest.java     | 287 ++++++++
 .../dht/atomic/GridDhtAtomicCache.java          |  17 +-
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |  61 ++
 .../GridDhtAtomicSingleUpdateRequest.java       | 678 +++++++++++++++++++
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  26 +
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 312 +++------
 .../GridNearAtomicAbstractUpdateRequest.java    |   5 +
 .../atomic/GridNearAtomicFullUpdateRequest.java | 108 +--
 .../GridNearAtomicSingleUpdateRequest.java      |   5 +
 .../distributed/near/GridNearAtomicCache.java   |   8 +-
 .../GridCacheAtomicMessageCountSelfTest.java    |   6 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |   2 +-
 15 files changed, 1292 insertions(+), 315 deletions(-)
----------------------------------------------------------------------



[10/10] ignite git commit: Fixed startup issue.

Posted by vo...@apache.org.
Fixed startup issue.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f789e2dd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f789e2dd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f789e2dd

Branch: refs/heads/ignite-4259
Commit: f789e2dd57706659a3d0f1afa333ab2ab4ca69e3
Parents: fc9ee6a
Author: devozerov <vo...@gridgain.com>
Authored: Tue Nov 22 13:48:23 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 22 13:48:23 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/processors/query/GridQueryProcessor.java  | 4 ++--
 .../processors/query/h2/GridH2IndexingGeoSelfTest.java        | 7 +++++++
 2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f789e2dd/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 8befa0e..a372891 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -256,12 +256,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
                     if (binaryEnabled && !keyOrValMustDeserialize) {
                         // Safe to check null.
-                        if (SQL_TYPES.contains(valCls))
+                        if (SQL_TYPES.contains(valCls) || isGeometryClass(valCls))
                             desc.valueClass(valCls);
                         else
                             desc.valueClass(Object.class);
 
-                        if (SQL_TYPES.contains(keyCls))
+                        if (SQL_TYPES.contains(keyCls) || isGeometryClass(keyCls))
                             desc.keyClass(keyCls);
                         else
                             desc.keyClass(Object.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f789e2dd/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
index b73a5c0..90370b4 100644
--- a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
+++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
@@ -35,15 +35,22 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.config.GridTestProperties;
 
 /**
  *
  */
 public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
+    // TODO: To be removed.
+    static {
+        GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
+    }
+
     /** */
     private static final int CNT = 100;
 


[06/10] ignite git commit: IGNITE-4225 DataStreamer can hang on changing topology

Posted by vo...@apache.org.
IGNITE-4225 DataStreamer can hang on changing topology


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d15eba4b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d15eba4b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d15eba4b

Branch: refs/heads/ignite-4259
Commit: d15eba4becf7515b512c1032b193ce75e1589177
Parents: d32fa21
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Nov 22 11:56:20 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 22 11:56:20 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/processors/datastreamer/DataStreamerImpl.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d15eba4b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 443783b..bb9ffdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1919,7 +1919,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             AffinityTopologyVersion topVer = cctx.isLocal() ?
                 cctx.affinity().affinityTopologyVersion() :
-                cctx.topology().topologyVersion();
+                cctx.shared().exchange().readyAffinityVersion();
 
             GridCacheVersion ver = cctx.versions().isolatedStreamerVersion();
 


[09/10] ignite git commit: Merge remote-tracking branch 'upstream/ignite-1.7.4' into ignite-1.7.4

Posted by vo...@apache.org.
Merge remote-tracking branch 'upstream/ignite-1.7.4' into ignite-1.7.4


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc9ee6a7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc9ee6a7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc9ee6a7

Branch: refs/heads/ignite-4259
Commit: fc9ee6a74fe0bf413ab0643d2776a1a43e6dd5d2
Parents: bc695f8 f80bfbd
Author: devozerov <vo...@gridgain.com>
Authored: Tue Nov 22 12:05:32 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 22 12:05:32 2016 +0300

----------------------------------------------------------------------
 .../GridDhtAtomicAbstractUpdateFuture.java      | 19 ++++++++-----
 .../dht/atomic/GridDhtAtomicCache.java          |  2 +-
 .../atomic/GridDhtAtomicSingleUpdateFuture.java | 30 +++++++++++++-------
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 10 +++++--
 .../GridNearAtomicAbstractUpdateRequest.java    |  5 ----
 .../atomic/GridNearAtomicFullUpdateRequest.java |  5 ----
 .../GridNearAtomicSingleUpdateRequest.java      |  9 ------
 .../dht/preloader/GridDhtPreloader.java         | 18 +++++++++++-
 .../datastreamer/DataStreamerImpl.java          |  2 +-
 .../ignite/internal/util/GridLogThrottle.java   | 28 ++++++++++++++----
 ...CacheLoadingConcurrentGridStartSelfTest.java |  6 +++-
 11 files changed, 84 insertions(+), 50 deletions(-)
----------------------------------------------------------------------



[04/10] ignite git commit: Merged ignite-1.6.11 into ignite-1.7.4.

Posted by vo...@apache.org.
Merged ignite-1.6.11 into ignite-1.7.4.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f2dc1d71
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f2dc1d71
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f2dc1d71

Branch: refs/heads/ignite-4259
Commit: f2dc1d71705b86428a04a69c4f2d4ee3a82ed1bd
Parents: 551f90d 6e36a79
Author: sboikov <sb...@gridgain.com>
Authored: Mon Nov 21 18:12:27 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Nov 21 18:12:27 2016 +0300

----------------------------------------------------------------------
 bin/ignite.bat                                  |   5 +
 .../org/apache/ignite/IgniteDataStreamer.java   |   2 +-
 .../apache/ignite/IgniteSystemProperties.java   |  13 +
 .../internal/ComputeTaskInternalFuture.java     |  11 +
 .../internal/binary/BinaryClassDescriptor.java  |  37 +-
 .../ignite/internal/binary/BinaryContext.java   |  13 +-
 .../internal/binary/BinaryObjectExImpl.java     |  57 +-
 .../internal/binary/BinaryObjectImpl.java       |  23 +
 .../binary/BinaryObjectOffheapImpl.java         |  24 +-
 .../ignite/internal/binary/BinaryUtils.java     |   4 +
 .../binary/builder/BinaryObjectBuilderImpl.java |   6 +-
 .../processors/affinity/AffinityAssignment.java |  88 +++
 .../affinity/GridAffinityAssignment.java        |   8 +-
 .../affinity/GridAffinityAssignmentCache.java   |  35 +-
 .../affinity/GridAffinityProcessor.java         |  89 ++-
 .../processors/affinity/GridAffinityUtils.java  |   8 +-
 .../affinity/HistoryAffinityAssignment.java     | 169 ++++++
 .../cache/CacheAffinitySharedManager.java       |  57 +-
 .../cache/DynamicCacheChangeBatch.java          |   7 +
 .../cache/GridCacheAffinityManager.java         |   6 +-
 .../processors/cache/GridCacheContext.java      |   8 +
 .../processors/cache/GridCacheMapEntry.java     |   5 +-
 .../processors/cache/GridCacheMvccManager.java  |  77 +++
 .../GridCachePartitionExchangeManager.java      | 299 +++++++--
 .../processors/cache/GridCacheProcessor.java    |   5 +-
 .../cache/GridCacheSharedContext.java           |   1 +
 .../processors/cache/GridCacheUtils.java        |  67 --
 .../dht/GridClientPartitionTopology.java        |  33 +-
 .../dht/GridDhtPartitionTopology.java           |   3 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  31 +-
 .../dht/preloader/GridDhtPartitionFullMap.java  |  18 +
 .../dht/preloader/GridDhtPartitionMap2.java     |  53 +-
 .../GridDhtPartitionsAbstractMessage.java       |  40 +-
 .../GridDhtPartitionsExchangeFuture.java        |  84 +--
 .../preloader/GridDhtPartitionsFullMessage.java | 150 ++++-
 .../GridDhtPartitionsSingleMessage.java         | 132 +++-
 .../GridDhtPartitionsSingleRequest.java         |   4 +-
 .../dht/preloader/GridDhtPreloader.java         |   4 +-
 .../query/GridCacheQueryMetricsAdapter.java     |   2 +-
 .../continuous/GridContinuousProcessor.java     |   4 +-
 .../datastreamer/DataStreamProcessor.java       | 104 +++-
 .../datastreamer/DataStreamerImpl.java          | 607 ++++++++++++++-----
 .../internal/processors/igfs/IgfsProcessor.java |  15 +
 .../ignite/internal/util/GridLogThrottle.java   |  29 +-
 .../ignite/internal/util/IgniteUtils.java       | 111 +++-
 .../internal/util/future/GridFutureAdapter.java |  12 +-
 .../util/offheap/unsafe/GridUnsafeMemory.java   |  33 +-
 .../visor/misc/VisorResolveHostNameTask.java    |   4 +-
 .../security/SecurityBasicPermissionSet.java    | 107 ++++
 .../security/SecurityPermissionSetBuilder.java  | 222 +++++++
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  38 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 510 +++++++++++++---
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |  59 +-
 .../messages/TcpDiscoveryAbstractMessage.java   |   7 +
 .../messages/TcpDiscoveryClientAckResponse.java |   5 +
 .../TcpDiscoveryClientHeartbeatMessage.java     |   7 +-
 .../TcpDiscoveryConnectionCheckMessage.java     |   5 +
 .../messages/TcpDiscoveryHeartbeatMessage.java  |   5 +
 .../TcpDiscoveryNodeAddFinishedMessage.java     |  11 +
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  33 +-
 .../binary/BinaryMarshallerSelfTest.java        | 343 ++++++++++-
 ...CacheExchangeMessageDuplicatedStateTest.java | 393 ++++++++++++
 .../cache/IgniteCacheDynamicStopSelfTest.java   |  48 +-
 .../cache/IgniteCachePeekModesAbstractTest.java |   2 +-
 ...CacheLoadingConcurrentGridStartSelfTest.java | 251 +++++++-
 ...ncurrentGridStartSelfTestAllowOverwrite.java |  30 +
 .../distributed/IgniteCacheGetRestartTest.java  |   3 +
 ...cingDelayedPartitionMapExchangeSelfTest.java |   8 +-
 .../GridCacheRebalancingSyncSelfTest.java       |  18 +-
 .../GridCacheSyncReplicatedPreloadSelfTest.java |   3 -
 .../IgniteCacheSyncRebalanceModeSelfTest.java   |   2 +-
 ...ContinuousQueryFailoverAbstractSelfTest.java |   2 +-
 ...ComputeJobExecutionErrorToLogManualTest.java |  88 +++
 .../IgniteNoCustomEventsOnNodeStart.java        |   7 +
 .../DataStreamProcessorSelfTest.java            |   4 +-
 .../datastreamer/DataStreamerImplSelfTest.java  | 170 ++++--
 .../DataStreamerMultiThreadedSelfTest.java      |   2 -
 .../datastreamer/DataStreamerTimeoutTest.java   |  92 ++-
 .../igfs/IgfsProcessorValidationSelfTest.java   |  30 +
 ...IpcEndpointRegistrationAbstractSelfTest.java |  76 ++-
 ...dpointRegistrationOnLinuxAndMacSelfTest.java |  11 +-
 .../SecurityPermissionSetBuilderTest.java       | 110 ++++
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 134 +++-
 .../junits/common/GridCommonAbstractTest.java   |  25 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |   3 +
 .../testsuites/IgniteCacheTestSuite2.java       |   5 +
 ...opClientProtocolMultipleServersSelfTest.java | 102 ++--
 .../query/h2/GridH2ResultSetIterator.java       |  62 +-
 .../processors/query/h2/IgniteH2Indexing.java   |   4 +-
 .../query/h2/opt/GridH2ValueCacheObject.java    |  10 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |  34 +-
 .../query/h2/twostep/GridMergeIndex.java        |  49 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   2 +-
 .../cache/CacheSqlQueryValueCopySelfTest.java   | 226 +++++++
 .../cache/IgniteCacheOffheapEvictQueryTest.java |   7 +
 .../IgniteCacheQueryMultiThreadedSelfTest.java  |  59 ++
 ...lientQueryReplicatedNodeRestartSelfTest.java |   7 +
 ...butedQueryStopOnCancelOrTimeoutSelfTest.java |   7 +
 .../query/IgniteSqlSplitterSelfTest.java        |   2 +
 .../query/h2/sql/GridQueryParsingTest.java      |  11 +-
 .../IgniteCacheQuerySelfTestSuite2.java         |   2 +
 modules/platforms/cpp/DEVNOTES.txt              |  22 +-
 modules/platforms/cpp/README.txt                |  10 +-
 modules/platforms/cpp/binary/Makefile.am        |   4 +-
 .../cpp/binary/project/vs/binary.vcxproj        |   2 -
 .../cpp/common/project/vs/common.vcxproj        |   1 -
 modules/platforms/cpp/core/Makefile.am          |   4 +-
 .../platforms/cpp/core/project/vs/core.vcxproj  |   2 -
 modules/platforms/cpp/examples/README.txt       |   9 +-
 .../cpp/examples/odbc-example/Makefile.am       |   4 +-
 .../cpp/examples/putget-example/Makefile.am     |   4 +-
 .../cpp/examples/query-example/Makefile.am      |   4 +-
 modules/platforms/cpp/ignite/Makefile.am        |   4 +-
 .../cpp/ignite/project/vs/ignite.vcxproj        |   4 +-
 modules/platforms/cpp/jni/Makefile.am           |   4 +-
 .../platforms/cpp/jni/project/vs/jni.vcxproj    |   1 -
 .../platforms/cpp/odbc-test/include/test_type.h |  42 +-
 .../cpp/odbc-test/src/api_robustness_test.cpp   |  63 ++
 .../cpp/odbc-test/src/queries_test.cpp          | 153 ++++-
 .../cpp/odbc-test/src/sql_outer_join_test.cpp   |   2 +-
 modules/platforms/cpp/odbc/Makefile.am          |   4 +-
 modules/platforms/cpp/odbc/README.txt           |  23 +-
 .../cpp/odbc/include/ignite/odbc/statement.h    |  42 ++
 .../cpp/odbc/install/ignite-odbc-amd64.wxs      | 114 ++++
 .../cpp/odbc/install/ignite-odbc-x86.wxs        | 114 ++++
 .../platforms/cpp/odbc/project/vs/odbc.vcxproj  |   4 +-
 .../odbc/src/app/application_data_buffer.cpp    |  34 +-
 modules/platforms/cpp/odbc/src/odbc.cpp         | 116 +---
 modules/platforms/cpp/odbc/src/statement.cpp    | 151 +++++
 .../Dataload/DataStreamerTestTopologyChange.cs  |  27 +-
 .../Apache.Ignite.Core/Impl/PlatformTarget.cs   |   2 +-
 .../src/test/config/incorrect-store-cache.xml   |   2 +
 .../src/test/config/jdbc-pojo-store-builtin.xml |   3 +
 .../src/test/config/jdbc-pojo-store-obj.xml     |   3 +
 modules/spring/src/test/config/node.xml         |   2 +
 modules/spring/src/test/config/node1.xml        |   2 +
 .../test/config/pojo-incorrect-store-cache.xml  |   2 +
 modules/spring/src/test/config/store-cache.xml  |   2 +
 modules/spring/src/test/config/store-cache1.xml |   2 +
 .../IgniteStartFromStreamConfigurationTest.java |  18 +-
 pom.xml                                         |   8 +
 141 files changed, 5973 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 558a654,f3751ac..ab573bd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -1475,10 -1507,25 +1478,25 @@@ class GridDhtPartitionTopologyImpl impl
          lock.readLock().lock();
  
          try {
-             Map<Integer, Long> res = new HashMap<>(cntrMap);
+             Map<Integer, Long> res;
+ 
+             if (skipZeros) {
+                 res = U.newHashMap(cntrMap.size());
+ 
+                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                     Long cntr = e.getValue();
+ 
+                     if (ZERO.equals(cntr))
+                         continue;
+ 
+                     res.put(e.getKey(), cntr);
+                 }
+             }
+             else
+                 res = new HashMap<>(cntrMap);
  
 -            for (int i = 0; i < locParts.length; i++) {
 -                GridDhtLocalPartition part = locParts[i];
 +            for (int i = 0; i < locParts.length(); i++) {
 +                GridDhtLocalPartition part = locParts.get(i);
  
                  if (part == null)
                      continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
index cb83a73,d6e0743..0805be1
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
@@@ -86,14 -88,25 +86,27 @@@ public class HadoopClientProtocolMultip
      }
  
      /** {@inheritDoc} */
 -    @Override protected void beforeTest() throws Exception {
 -        super.beforeTest();
 +    @Override protected void afterTest() throws Exception {
 +        stopAllGrids();
  
 -        clearConnectionMap();
 +        clearClients();
 +
 +        super.afterTest();
      }
  
+     /**
+      * @throws IgniteCheckedException If failed.
+      */
+     private void clearConnectionMap() throws IgniteCheckedException {
+         ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap =
+             GridTestUtils.getFieldValue(IgniteHadoopClientProtocolProvider.class, "cliMap");
+ 
+         for(IgniteInternalFuture<GridClient> fut : cliMap.values())
+             fut.get().close();
+ 
+         cliMap.clear();
+     }
+ 
      /** {@inheritDoc} */
      @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
          IgniteConfiguration cfg = super.getConfiguration(gridName);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 0314b3d,7e4d5b6..ac1a6a6
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@@ -56,9 -52,7 +56,10 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
  import org.apache.ignite.internal.processors.query.GridQueryCancel;
  import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
 +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+ import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@@ -933,9 -757,12 +934,12 @@@ public class GridMapQueryExecutor 
          private int page;
  
          /** */
 -        private final int rowCount;
 +        private final int rowCnt;
  
          /** */
+         private boolean cpNeeded;
+ 
+         /** */
          private volatile boolean closed;
  
          /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
index a92bf2b,072a081..80c4a08
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
@@@ -96,9 -95,16 +96,16 @@@ public class IgniteCacheDistributedQuer
              g.cache(null).removeAll();
      }
  
+     /** {@inheritDoc} */
+     @Override protected void afterTestsStopped() throws Exception {
+         stopAllGrids();
+ 
+         super.afterTestsStopped();
+     }
+ 
      /** */
      public void testRemoteQueryExecutionTimeout() throws Exception {
 -        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 500, TimeUnit.MILLISECONDS, true);
 +        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true);
      }
  
      /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 50f2ef0,56658df..e72c9cb
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@@ -30,12 -28,9 +30,14 @@@ import javax.cache.CacheException
  import org.apache.ignite.Ignite;
  import org.apache.ignite.IgniteCache;
  import org.apache.ignite.cache.CacheAtomicityMode;
 +import org.apache.ignite.cache.CacheKeyConfiguration;
  import org.apache.ignite.cache.CacheMode;
 +import org.apache.ignite.cache.CachePeekMode;
 +import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 +import org.apache.ignite.cache.affinity.Affinity;
 +import org.apache.ignite.cache.query.QueryCursor;
+ import org.apache.ignite.cache.affinity.Affinity;
+ import org.apache.ignite.cache.query.QueryCursor;
  import org.apache.ignite.cache.query.SqlFieldsQuery;
  import org.apache.ignite.cache.query.annotations.QuerySqlField;
  import org.apache.ignite.cluster.ClusterNode;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/platforms/cpp/examples/README.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/platforms/cpp/odbc-test/src/queries_test.cpp
----------------------------------------------------------------------
diff --cc modules/platforms/cpp/odbc-test/src/queries_test.cpp
index 21edf4e,a7fc7a9..db9dafb
--- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
@@@ -314,14 -294,9 +316,14 @@@ BOOST_AUTO_TEST_CASE(TestConnectionProt
      Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache;PROTOCOL_VERSION=1.6.0");
  }
  
 +BOOST_AUTO_TEST_CASE(TestConnectionProtocolVersion_1_8_0)
 +{
 +    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache;PROTOCOL_VERSION=1.8.0");
 +}
 +
  BOOST_AUTO_TEST_CASE(TestTwoRowsInt8)
  {
-     CheckTwoRowsInt<int8_t>(SQL_C_STINYINT);
+     CheckTwoRowsInt<signed char>(SQL_C_STINYINT);
  }
  
  BOOST_AUTO_TEST_CASE(TestTwoRowsUint8)
@@@ -693,131 -674,115 +701,242 @@@ BOOST_AUTO_TEST_CASE(TestDataAtExecutio
      BOOST_CHECK(ret == SQL_NO_DATA);
  }
  
+ BOOST_AUTO_TEST_CASE(TestNullFields)
+ {
+     Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache");
+ 
+     SQLRETURN ret;
+ 
+     TestType in(1, 2, 3, 4, "5", 6.0f, 7.0, true, Guid(8, 9), BinaryUtils::MakeDateGmt(1987, 6, 5),
+         BinaryUtils::MakeTimestampGmt(1998, 12, 27, 1, 2, 3, 456));
+ 
+     TestType inNull;
+ 
+     inNull.allNulls = true;
+ 
+     testCache.Put(1, in);
+     testCache.Put(2, inNull);
+     testCache.Put(3, in);
+ 
+     const size_t columnsCnt = 10;
+ 
+     SQLLEN columnLens[columnsCnt] = { 0 };
+ 
+     int8_t i8Column;
+     int16_t i16Column;
+     int32_t i32Column;
+     int64_t i64Column;
+     char strColumn[ODBC_BUFFER_SIZE];
+     float floatColumn;
+     double doubleColumn;
+     bool boolColumn;
+     SQL_DATE_STRUCT dateColumn;
+     SQL_TIMESTAMP_STRUCT timestampColumn;
+ 
+     // Binding columns.
+     ret = SQLBindCol(stmt, 1, SQL_C_STINYINT, &i8Column, 0, &columnLens[0]);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     ret = SQLBindCol(stmt, 2, SQL_C_SSHORT, &i16Column, 0, &columnLens[1]);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     ret = SQLBindCol(stmt, 3, SQL_C_SLONG, &i32Column, 0, &columnLens[2]);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     ret = SQLBindCol(stmt, 4, SQL_C_SBIGINT, &i64Column, 0, &columnLens[3]);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     ret = SQLBindCol(stmt, 5, SQL_C_CHAR, &strColumn, ODBC_BUFFER_SIZE, &columnLens[4]);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     ret = SQLBindCol(stmt, 6, SQL_C_FLOAT, &floatColumn, 0, &columnLens[5]);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     ret = SQLBindCol(stmt, 7, SQL_C_DOUBLE, &doubleColumn, 0, &columnLens[6]);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     ret = SQLBindCol(stmt, 8, SQL_C_BIT, &boolColumn, 0, &columnLens[7]);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     ret = SQLBindCol(stmt, 9, SQL_C_DATE, &dateColumn, 0, &columnLens[8]);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     ret = SQLBindCol(stmt, 10, SQL_C_TIMESTAMP, &timestampColumn, 0, &columnLens[9]);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     SQLCHAR request[] = "SELECT i8Field, i16Field, i32Field, i64Field, strField, "
+         "floatField, doubleField, boolField, dateField, timestampField FROM TestType ORDER BY _key";
+ 
+     ret = SQLExecDirect(stmt, request, SQL_NTS);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     // Fetching the first non-null row.
+     ret = SQLFetch(stmt);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     // Checking that columns are not null.
+     for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
+         BOOST_CHECK_NE(columnLens[i], SQL_NULL_DATA);
+ 
+     // Fetching null row.
+     ret = SQLFetch(stmt);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     // Checking that columns are null.
+     for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
+         BOOST_CHECK_EQUAL(columnLens[i], SQL_NULL_DATA);
+ 
+     // Fetching the last non-null row.
+     ret = SQLFetch(stmt);
+     if (!SQL_SUCCEEDED(ret))
+         BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ 
+     // Checking that columns are not null.
+     for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
+         BOOST_CHECK_NE(columnLens[i], SQL_NULL_DATA);
+ 
+     ret = SQLFetch(stmt);
+     BOOST_CHECK(ret == SQL_NO_DATA);
+ }
+ 
++
 +BOOST_AUTO_TEST_CASE(TestDistributedJoins)
 +{
 +    // Starting additional node.
 +    Ignite node1 = StartAdditionalNode("Node1");
 +    Ignite node2 = StartAdditionalNode("Node2");
 +
 +    const int entriesNum = 1000;
 +
 +    // Filling cache with data.
 +    for (int i = 0; i < entriesNum; ++i)
 +    {
 +        TestType entry;
 +
 +        entry.i32Field = i;
 +        entry.i64Field = entriesNum - i - 1;
 +
 +        testCache.Put(i, entry);
 +    }
 +
 +    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache");
 +
 +    SQLRETURN ret;
 +
 +    const size_t columnsCnt = 2;
 +
 +    SQLBIGINT columns[columnsCnt] = { 0 };
 +
 +    // Binding colums.
 +    for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
 +    {
 +        ret = SQLBindCol(stmt, i + 1, SQL_C_SLONG, &columns[i], 0, 0);
 +
 +        if (!SQL_SUCCEEDED(ret))
 +            BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
 +    }
 +
 +    SQLCHAR request[] =
 +        "SELECT T0.i32Field, T1.i64Field FROM TestType AS T0 "
 +        "INNER JOIN TestType AS T1 "
 +        "ON (T0.i32Field = T1.i64Field)";
 +
 +    ret = SQLExecDirect(stmt, request, SQL_NTS);
 +
 +    if (!SQL_SUCCEEDED(ret))
 +        BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
 +
 +    int rowsNum = CountRows(stmt);
 +
 +    BOOST_CHECK_GT(rowsNum, 0);
 +    BOOST_CHECK_LT(rowsNum, entriesNum);
 +
 +    Disconnect();
 +
 +    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache;DISTRIBUTED_JOINS=true;");
 +
 +    // Binding colums.
 +    for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
 +    {
 +        ret = SQLBindCol(stmt, i + 1, SQL_C_SLONG, &columns[i], 0, 0);
 +
 +        if (!SQL_SUCCEEDED(ret))
 +            BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
 +    }
 +
 +    ret = SQLExecDirect(stmt, request, SQL_NTS);
 +
 +    if (!SQL_SUCCEEDED(ret))
 +        BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
 +
 +    rowsNum = CountRows(stmt);
 +
 +    BOOST_CHECK_EQUAL(rowsNum, entriesNum);
 +}
 +
 +BOOST_AUTO_TEST_CASE(TestDistributedJoinsWithOldVersion)
 +{
 +    // Starting additional node.
 +    Ignite node1 = StartAdditionalNode("Node1");
 +    Ignite node2 = StartAdditionalNode("Node2");
 +
 +    const int entriesNum = 1000;
 +
 +    // Filling cache with data.
 +    for (int i = 0; i < entriesNum; ++i)
 +    {
 +        TestType entry;
 +
 +        entry.i32Field = i;
 +        entry.i64Field = entriesNum - i - 1;
 +
 +        testCache.Put(i, entry);
 +    }
 +
 +    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache;DISTRIBUTED_JOINS=true;PROTOCOL_VERSION=1.6.0");
 +
 +    SQLRETURN ret;
 +
 +    const size_t columnsCnt = 2;
 +
 +    SQLBIGINT columns[columnsCnt] = { 0 };
 +
 +    // Binding colums.
 +    for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
 +    {
 +        ret = SQLBindCol(stmt, i + 1, SQL_C_SLONG, &columns[i], 0, 0);
 +
 +        if (!SQL_SUCCEEDED(ret))
 +            BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
 +    }
 +
 +    SQLCHAR request[] =
 +        "SELECT T0.i32Field, T1.i64Field FROM TestType AS T0 "
 +        "INNER JOIN TestType AS T1 "
 +        "ON (T0.i32Field = T1.i64Field)";
 +
 +    ret = SQLExecDirect(stmt, request, SQL_NTS);
 +
 +    if (!SQL_SUCCEEDED(ret))
 +        BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
 +
 +    int rowsNum = CountRows(stmt);
 +
 +    BOOST_CHECK_GT(rowsNum, 0);
 +    BOOST_CHECK_LT(rowsNum, entriesNum);
 +}
 +
- 
  BOOST_AUTO_TEST_SUITE_END()

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/platforms/cpp/odbc/src/odbc.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --cc modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index bafc759,f7906ff..f4a07f6
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@@ -483,82 -483,7 +483,82 @@@ namespace Apache.Ignite.Core.Imp
                  }
              }
          }
 -        
 +
 +        /// <summary>
 +        /// Perform out-in operation with a single stream.
 +        /// </summary>
 +        /// <typeparam name="TR">The type of the r.</typeparam>
 +        /// <param name="type">Operation type.</param>
 +        /// <param name="outAction">Out action.</param>
 +        /// <param name="inAction">In action.</param>
 +        /// <param name="inErrorAction">The action to read an error.</param>
 +        /// <returns>
 +        /// Result.
 +        /// </returns>
 +        protected TR DoOutInOpX<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, long, TR> inAction,
 +            Func<IBinaryStream, Exception> inErrorAction)
 +        {
 +            Debug.Assert(inErrorAction != null);
 +
 +            using (var stream = IgniteManager.Memory.Allocate().GetStream())
 +            {
 +                var writer = _marsh.StartMarshal(stream);
 +
 +                outAction(writer);
 +
 +                FinishMarshal(writer);
 +
 +                var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput());
 +
 +                if (res != Error && inAction == null)
 +                    return default(TR);  // quick path for void operations
 +
 +                stream.SynchronizeInput();
 +
 +                stream.Seek(0, SeekOrigin.Begin);
 +
 +                if (res != Error)
 +                    return inAction != null ? inAction(stream, res) : default(TR);
 +
 +                throw inErrorAction(stream);
 +            }
 +        }
 +
 +        /// <summary>
 +        /// Perform out-in operation with a single stream.
 +        /// </summary>
 +        /// <param name="type">Operation type.</param>
 +        /// <param name="outAction">Out action.</param>
 +        /// <param name="inErrorAction">The action to read an error.</param>
 +        /// <returns>
 +        /// Result.
 +        /// </returns>
-         protected bool DoOutInOpX(int type, Action<BinaryWriter> outAction, 
++        protected bool DoOutInOpX(int type, Action<BinaryWriter> outAction,
 +            Func<IBinaryStream, Exception> inErrorAction)
 +        {
 +            Debug.Assert(inErrorAction != null);
 +
 +            using (var stream = IgniteManager.Memory.Allocate().GetStream())
 +            {
 +                var writer = _marsh.StartMarshal(stream);
 +
 +                outAction(writer);
 +
 +                FinishMarshal(writer);
 +
 +                var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput());
 +
 +                if (res != Error)
 +                    return res == True;
 +
 +                stream.SynchronizeInput();
 +
 +                stream.Seek(0, SeekOrigin.Begin);
 +
 +                throw inErrorAction(stream);
 +            }
 +        }
 +
          /// <summary>
          /// Perform out-in operation.
          /// </summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/pom.xml
----------------------------------------------------------------------


[08/10] ignite git commit: IGNITE-4227: ODBC: Implemented SQLError. This closes #1237.

Posted by vo...@apache.org.
IGNITE-4227: ODBC: Implemented SQLError. This closes #1237.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc695f8e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc695f8e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc695f8e

Branch: refs/heads/ignite-4259
Commit: bc695f8e3306c6d74d4fe53d9a98adedd43ad8f0
Parents: f2dc1d7
Author: Igor Sapego <is...@gridgain.com>
Authored: Tue Nov 22 12:05:15 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 22 12:05:15 2016 +0300

----------------------------------------------------------------------
 .../cpp/odbc-test/src/api_robustness_test.cpp   | 45 +++++++++++++++
 .../platforms/cpp/odbc/include/ignite/odbc.h    | 12 +++-
 .../ignite/odbc/diagnostic/diagnosable.h        |  7 +++
 .../odbc/diagnostic/diagnosable_adapter.h       | 10 ++++
 .../ignite/odbc/diagnostic/diagnostic_record.h  | 19 +++++++
 .../odbc/diagnostic/diagnostic_record_storage.h | 16 ++++++
 .../odbc/os/win/src/system/socket_client.cpp    |  4 +-
 .../odbc/src/diagnostic/diagnostic_record.cpp   | 16 +++++-
 .../diagnostic/diagnostic_record_storage.cpp    | 18 ++++++
 modules/platforms/cpp/odbc/src/entry_points.cpp | 26 ++++-----
 modules/platforms/cpp/odbc/src/odbc.cpp         | 59 ++++++++++++++++++++
 11 files changed, 214 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc-test/src/api_robustness_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/api_robustness_test.cpp b/modules/platforms/cpp/odbc-test/src/api_robustness_test.cpp
index fbd5f12..13a5ea6 100644
--- a/modules/platforms/cpp/odbc-test/src/api_robustness_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/api_robustness_test.cpp
@@ -1066,4 +1066,49 @@ BOOST_AUTO_TEST_CASE(TestFetchScrollFirst)
     CheckFetchScrollUnsupportedOrientation(SQL_FETCH_FIRST);
 }
 
+BOOST_AUTO_TEST_CASE(TestSQLError)
+{
+    // There are no checks because we do not really care what is the result of these
+    // calls as long as they do not cause segmentation fault.
+
+    Connect("DRIVER={Apache Ignite};address=127.0.0.1:11110;cache=cache");
+
+    SQLCHAR state[6] = { 0 };
+    SQLINTEGER nativeCode = 0;
+    SQLCHAR message[ODBC_BUFFER_SIZE] = { 0 };
+    SQLSMALLINT messageLen = 0;
+
+    // Everything is ok.
+    SQLRETURN ret = SQLError(env, 0, 0, state, &nativeCode, message, sizeof(message), &messageLen);
+
+    if (ret != SQL_SUCCESS && ret != SQL_NO_DATA)
+        BOOST_FAIL("Unexpected error");
+
+    ret = SQLError(0, dbc, 0, state, &nativeCode, message, sizeof(message), &messageLen);
+
+    if (ret != SQL_SUCCESS && ret != SQL_NO_DATA)
+        BOOST_FAIL("Unexpected error");
+
+    ret = SQLError(0, 0, stmt, state, &nativeCode, message, sizeof(message), &messageLen);
+
+    if (ret != SQL_SUCCESS && ret != SQL_NO_DATA)
+        BOOST_FAIL("Unexpected error");
+
+    SQLError(0, 0, 0, state, &nativeCode, message, sizeof(message), &messageLen);
+
+    SQLError(0, 0, stmt, 0, &nativeCode, message, sizeof(message), &messageLen);
+
+    SQLError(0, 0, stmt, state, 0, message, sizeof(message), &messageLen);
+
+    SQLError(0, 0, stmt, state, &nativeCode, 0, sizeof(message), &messageLen);
+
+    SQLError(0, 0, stmt, state, &nativeCode, message, 0, &messageLen);
+
+    SQLError(0, 0, stmt, state, &nativeCode, message, sizeof(message), 0);
+
+    SQLError(0, 0, stmt, 0, 0, 0, 0, 0);
+
+    SQLError(0, 0, 0, 0, 0, 0, 0, 0);
+}
+
 BOOST_AUTO_TEST_SUITE_END()

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/include/ignite/odbc.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc.h b/modules/platforms/cpp/odbc/include/ignite/odbc.h
index ec0861c..345cdb8 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc.h
@@ -255,6 +255,16 @@ namespace ignite
     SQLRETURN SQLParamData(SQLHSTMT stmt, SQLPOINTER* value);
 
     SQLRETURN SQLPutData(SQLHSTMT stmt, SQLPOINTER data, SQLLEN strLengthOrIndicator);
+
+    SQLRETURN SQLError(SQLHENV      env,
+                       SQLHDBC      conn,
+                       SQLHSTMT     stmt,
+                       SQLCHAR*     state,
+                       SQLINTEGER*  error,
+                       SQLCHAR*     msgBuf,
+                       SQLSMALLINT  msgBufLen,
+                       SQLSMALLINT* msgResLen);
+
 } // namespace ignite
 
-#endif //_IGNITE_ODBC
+#endif //_IGNITE_ODBC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable.h b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable.h
index 909fe01..6937fcc 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable.h
@@ -48,6 +48,13 @@ namespace ignite
                 virtual const diagnostic::DiagnosticRecordStorage& GetDiagnosticRecords() const = 0;
 
                 /**
+                 * Get diagnostic record.
+                 *
+                 * @return Diagnostic record.
+                 */
+                virtual diagnostic::DiagnosticRecordStorage& GetDiagnosticRecords() = 0;
+
+                /**
                  * Add new status record.
                  *
                  * @param sqlState SQL state.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable_adapter.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable_adapter.h b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable_adapter.h
index 63d26ca..548eecd 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable_adapter.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable_adapter.h
@@ -62,6 +62,16 @@ namespace ignite
                 }
 
                 /**
+                 * Get diagnostic record.
+                 *
+                 * @return Diagnostic record.
+                 */
+                virtual diagnostic::DiagnosticRecordStorage& GetDiagnosticRecords()
+                {
+                    return diagnosticRecords;
+                }
+
+                /**
                  * Add new status record.
                  *
                  * @param sqlState SQL state.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record.h b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record.h
index 670e0aa..80d5090 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record.h
@@ -127,6 +127,19 @@ namespace ignite
                  */
                 int32_t GetColumnNumber() const;
 
+                /**
+                 * Check if the record was retrieved with the SQLError previously.
+                 *
+                 * return True if the record was retrieved with the SQLError
+                 *  previously.
+                 */
+                bool IsRetrieved() const;
+
+                /**
+                 * Mark record as retrieved with the SQLError.
+                 */
+                void MarkRetrieved();
+
             private:
                 /** SQL state diagnostic code. */
                 SqlState sqlState;
@@ -157,6 +170,12 @@ namespace ignite
                  * result set or the parameter number in the set of parameters.
                  */
                 int32_t columnNum;
+
+                /**
+                 * Flag that shows if the record was retrieved with the 
+                 * SQLError previously.
+                 */
+                bool retrieved;
             };
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record_storage.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record_storage.h b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record_storage.h
index 4cc3576..b45bb7d 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record_storage.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record_storage.h
@@ -139,6 +139,22 @@ namespace ignite
                 const DiagnosticRecord& GetStatusRecord(int32_t idx) const;
 
                 /**
+                 * Get specified status record.
+                 *
+                 * @param idx Status record index.
+                 * @return Status record instance reference.
+                 */
+                DiagnosticRecord& GetStatusRecord(int32_t idx);
+
+                /**
+                 * Get last non-retrieved status record index.
+                 *
+                 * @return Index of the last non-retrieved status record or zero
+                 *  if nothing was found.
+                 */
+                int32_t GetLastNonRetrieved() const;
+
+                /**
                  * Check if the record is in the success state.
                  *
                  * @return True if the record is in the success state.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
index bc4cdc0..e248323 100644
--- a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
+++ b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
@@ -83,8 +83,8 @@ namespace ignite
                 // Attempt to connect to an address until one succeeds
                 for (addrinfo *it = result; it != NULL; it = it->ai_next)
                 {
-                    LOG_MSG("Addr: %u.%u.%u.%u\n", it->ai_addr->sa_data[2], it->ai_addr->sa_data[3],
-                                                   it->ai_addr->sa_data[4], it->ai_addr->sa_data[5]);
+                    LOG_MSG("Addr: %u.%u.%u.%u\n", it->ai_addr->sa_data[2] & 0xFF, it->ai_addr->sa_data[3] & 0xFF,
+                                                   it->ai_addr->sa_data[4] & 0xFF, it->ai_addr->sa_data[5] & 0xFF);
 
                     // Create a SOCKET for connecting to server
                     socketHandle = socket(it->ai_family, it->ai_socktype, it->ai_protocol);

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
index 1b654d2..215d77f 100644
--- a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
+++ b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
@@ -89,7 +89,8 @@ namespace ignite
                 connectionName(),
                 serverName(),
                 rowNum(0),
-                columnNum(0)
+                columnNum(0),
+                retrieved(false)
             {
                 // No-op.
             }
@@ -102,7 +103,8 @@ namespace ignite
                 connectionName(connectionName),
                 serverName(serverName),
                 rowNum(rowNum),
-                columnNum(columnNum)
+                columnNum(columnNum),
+                retrieved(false)
             {
                 // No-op.
             }
@@ -260,6 +262,16 @@ namespace ignite
             {
                 return columnNum;
             }
+
+            bool DiagnosticRecord::IsRetrieved() const
+            {
+                return retrieved;
+            }
+
+            void DiagnosticRecord::MarkRetrieved()
+            {
+                retrieved = true;
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record_storage.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record_storage.cpp b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record_storage.cpp
index 99ef292..c075567 100644
--- a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record_storage.cpp
+++ b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record_storage.cpp
@@ -102,6 +102,24 @@ namespace ignite
                 return statusRecords[idx - 1];
             }
 
+            DiagnosticRecord& DiagnosticRecordStorage::GetStatusRecord(int32_t idx)
+            {
+                return statusRecords[idx - 1];
+            }
+
+            int32_t DiagnosticRecordStorage::GetLastNonRetrieved() const
+            {
+                for (size_t i = 0; i < statusRecords.size(); ++i)
+                {
+                    const DiagnosticRecord& record = statusRecords[i];
+
+                    if (!record.IsRetrieved())
+                        return static_cast<int32_t>(i + 1);
+                }
+
+                return 0;
+            }
+
             bool DiagnosticRecordStorage::IsSuccessful() const
             {
                 return result == SQL_RESULT_SUCCESS || 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/src/entry_points.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/entry_points.cpp b/modules/platforms/cpp/odbc/src/entry_points.cpp
index 08016cc..a85b3cf 100644
--- a/modules/platforms/cpp/odbc/src/entry_points.cpp
+++ b/modules/platforms/cpp/odbc/src/entry_points.cpp
@@ -412,6 +412,19 @@ SQLRETURN SQL_API SQLPutData(SQLHSTMT     stmt,
     return ignite::SQLPutData(stmt, data, strLengthOrIndicator);
 }
 
+SQLRETURN SQL_API SQLError(SQLHENV      env,
+                           SQLHDBC      conn,
+                           SQLHSTMT     stmt,
+                           SQLCHAR*     state,
+                           SQLINTEGER*  error,
+                           SQLCHAR*     msgBuf,
+                           SQLSMALLINT  msgBufLen,
+                           SQLSMALLINT* msgResLen)
+{
+    return ignite::SQLError(env, conn, stmt, state,
+        error, msgBuf, msgBufLen, msgResLen);
+}
+
 //
 // ==== Not implemented ====
 //
@@ -434,19 +447,6 @@ SQLRETURN SQL_API SQLColAttributes(SQLHSTMT     stmt,
     return SQL_SUCCESS;
 }
 
-SQLRETURN SQL_API SQLError(SQLHENV      env,
-                           SQLHDBC      conn,
-                           SQLHSTMT     stmt,
-                           SQLCHAR*     state,
-                           SQLINTEGER*  error,
-                           SQLCHAR*     msgBuf,
-                           SQLSMALLINT  msgBufLen,
-                           SQLSMALLINT* msgResLen)
-{
-    LOG_MSG("SQLError called\n");
-    return SQL_ERROR;
-}
-
 SQLRETURN SQL_API SQLGetCursorName(SQLHSTMT     stmt,
                                    SQLCHAR*     nameBuf,
                                    SQLSMALLINT  nameBufLen,

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/src/odbc.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/odbc.cpp b/modules/platforms/cpp/odbc/src/odbc.cpp
index 79036eb..684ed08 100644
--- a/modules/platforms/cpp/odbc/src/odbc.cpp
+++ b/modules/platforms/cpp/odbc/src/odbc.cpp
@@ -1255,4 +1255,63 @@ namespace ignite
         return statement->GetDiagnosticRecords().GetReturnCode();
     }
 
+    SQLRETURN SQLError(SQLHENV      env,
+                       SQLHDBC      conn,
+                       SQLHSTMT     stmt,
+                       SQLCHAR*     state,
+                       SQLINTEGER*  error,
+                       SQLCHAR*     msgBuf,
+                       SQLSMALLINT  msgBufLen,
+                       SQLSMALLINT* msgResLen)
+    {
+        using namespace ignite::utility;
+        using namespace ignite::odbc;
+        using namespace ignite::odbc::diagnostic;
+        using namespace ignite::odbc::type_traits;
+
+        using ignite::odbc::app::ApplicationDataBuffer;
+
+        LOG_MSG("SQLError called\n");
+
+        SQLHANDLE handle = 0;
+
+        if (env != 0)
+            handle = static_cast<SQLHANDLE>(env);
+        else if (conn != 0)
+            handle = static_cast<SQLHANDLE>(conn);
+        else if (stmt != 0)
+            handle = static_cast<SQLHANDLE>(stmt);
+        else
+            return SQL_INVALID_HANDLE;
+
+        Diagnosable *diag = reinterpret_cast<Diagnosable*>(handle);
+
+        DiagnosticRecordStorage& records = diag->GetDiagnosticRecords();
+
+        int32_t recNum = records.GetLastNonRetrieved();
+
+        if (recNum < 1 || recNum > records.GetStatusRecordsNumber())
+            return SQL_NO_DATA;
+
+        DiagnosticRecord& record = records.GetStatusRecord(recNum);
+
+        record.MarkRetrieved();
+
+        if (state)
+            CopyStringToBuffer(record.GetSqlState(), reinterpret_cast<char*>(state), 6);
+
+        if (error)
+            *error = 0;
+
+        SqlLen outResLen;
+        ApplicationDataBuffer outBuffer(IGNITE_ODBC_C_TYPE_CHAR, msgBuf, msgBufLen, &outResLen);
+
+        outBuffer.PutString(record.GetMessageText());
+
+        if (msgResLen)
+            *msgResLen = static_cast<SQLSMALLINT>(outResLen);
+
+        return SQL_SUCCESS;
+    }
+
 } // namespace ignite;


[02/10] ignite git commit: IGNITE-2355: fix test - clear client connections before and after a test.

Posted by vo...@apache.org.
IGNITE-2355: fix test - clear client connections before and after a test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e4a279e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e4a279e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e4a279e

Branch: refs/heads/ignite-4259
Commit: 6e4a279e34584881469a7d841432e6c38db2f06f
Parents: 88f38ac
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Mon Nov 21 19:15:17 2016 +0500
Committer: tledkov-gridgain <tl...@gridgain.com>
Committed: Mon Nov 21 19:15:17 2016 +0500

----------------------------------------------------------------------
 ...opClientProtocolMultipleServersSelfTest.java | 24 ++++++++++++++++++++
 1 file changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6e4a279e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
index 0e51938..cb83a73 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
@@ -23,6 +23,7 @@ import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,6 +45,8 @@ import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider;
 import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.client.GridClient;
 import org.apache.ignite.internal.client.GridServerUnreachableException;
 import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest;
 import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
@@ -76,9 +79,18 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        clearClients();
+    }
+
+    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
 
+        clearClients();
+
         super.afterTest();
     }
 
@@ -92,6 +104,18 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
     }
 
     /**
+     *
+     */
+    private void clearClients() {
+        ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = GridTestUtils.getFieldValue(
+            IgniteHadoopClientProtocolProvider.class,
+            IgniteHadoopClientProtocolProvider.class,
+            "cliMap");
+
+        cliMap.clear();
+    }
+
+    /**
      * @throws Exception If failed.
      */
     private void beforeJob() throws Exception {


[07/10] ignite git commit: IGNITE-3748 Data rebalancing of large cache can hang out.

Posted by vo...@apache.org.
IGNITE-3748 Data rebalancing of large cache can hang out.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f80bfbd1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f80bfbd1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f80bfbd1

Branch: refs/heads/ignite-4259
Commit: f80bfbd19e7870554bf3abd13bde89b0f39aaee1
Parents: d15eba4
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Nov 22 12:02:57 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 22 12:02:57 2016 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPreloader.java         | 18 ++++++++++++-
 .../ignite/internal/util/GridLogThrottle.java   | 28 +++++++++++++++-----
 2 files changed, 39 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f80bfbd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 6ed58e7..0865d9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -788,7 +789,22 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                             GridDhtLocalPartition part = partsToEvict.poll();
 
                             if (part != null)
-                                part.tryEvict();
+                                try {
+                                    part.tryEvict();
+                                }
+                                catch (Throwable ex) {
+                                    if (cctx.kernalContext().isStopping()) {
+                                        LT.warn(log, ex, "Partition eviction failed (current node is stopping).",
+                                            false,
+                                            true);
+
+                                        partsToEvict.clear();
+
+                                        return true;
+                                    }
+                                    else
+                                        LT.error(log, ex, "Partition eviction failed, this can cause grid hang.");
+                                }
                         }
                         finally {
                             if (!partsToEvict.isEmptyx())

http://git-wip-us.apache.org/repos/asf/ignite/blob/f80bfbd1/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
index ce6783a..c8ba865 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
@@ -81,12 +81,12 @@ public class GridLogThrottle {
      * @param log Logger.
      * @param e Error (optional).
      * @param msg Message.
-     * @param byMessage Errors group by message, not by tuple(error, msg).
+     * @param byMsg Errors group by message, not by tuple(error, msg).
      */
-    public static void error(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean byMessage) {
+    public static void error(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean byMsg) {
         assert !F.isEmpty(msg);
 
-        log(log, e, msg, null, LogLevel.ERROR, false, byMessage);
+        log(log, e, msg, null, LogLevel.ERROR, false, byMsg);
     }
 
     /**
@@ -109,6 +109,22 @@ public class GridLogThrottle {
      * @param e Error (optional).
      * @param msg Message.
      * @param quite Print warning anyway.
+     * @param byMsg Errors group by message, not by tuple(error, msg).
+     */
+    public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean quite, boolean byMsg) {
+        assert !F.isEmpty(msg);
+
+        log(log, e, msg, null, LogLevel.WARN, quite, byMsg);
+    }
+
+
+    /**
+     * Logs warning if needed.
+     *
+     * @param log Logger.
+     * @param e Error (optional).
+     * @param msg Message.
+     * @param quite Print warning anyway.
      */
     public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean quite) {
         assert !F.isEmpty(msg);
@@ -168,15 +184,15 @@ public class GridLogThrottle {
      * @param longMsg Long message (or just message).
      * @param shortMsg Short message for quite logging.
      * @param level Level where messages should appear.
-     * @param byMessage Errors group by message, not by tuple(error, msg).
+     * @param byMsg Errors group by message, not by tuple(error, msg).
      */
     @SuppressWarnings({"RedundantTypeArguments"})
     private static void log(@Nullable IgniteLogger log, @Nullable Throwable e, String longMsg, @Nullable String shortMsg,
-        LogLevel level, boolean quiet, boolean byMessage) {
+        LogLevel level, boolean quiet, boolean byMsg) {
         assert !F.isEmpty(longMsg);
 
         IgniteBiTuple<Class<? extends Throwable>, String> tup =
-            e != null && !byMessage ? F.<Class<? extends Throwable>, String>t(e.getClass(), e.getMessage()) :
+            e != null && !byMsg ? F.<Class<? extends Throwable>, String>t(e.getClass(), e.getMessage()) :
                 F.<Class<? extends Throwable>, String>t(null, longMsg);
 
         while (true) {