You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/13 10:27:20 UTC

[4/8] ignite git commit: IGNITE-6024: SQL: Implemented "skipReducerOnUpdate" flag. This closes #2488.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
new file mode 100644
index 0000000..e40bc2d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
+
+/**
+ * Request for DML operation on remote node.
+ */
+public class GridH2DmlRequest implements Message, GridCacheQueryMarshallable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Request id. */
+    @GridToStringInclude
+    private long reqId;
+
+    /** Cache identifiers. */
+    @GridToStringInclude
+    @GridDirectCollection(Integer.class)
+    private List<Integer> caches;
+
+    /** Topology version. */
+    @GridToStringInclude
+    private AffinityTopologyVersion topVer;
+
+    /** Query partitions. */
+    @GridToStringInclude
+    private int[] qryParts;
+
+    /** Page size. */
+    private int pageSize;
+
+    /** Query. */
+    @GridToStringInclude
+    private String qry;
+
+    /** Flags. */
+    private byte flags;
+
+    /** Timeout. */
+    private int timeout;
+
+    /** Query parameters. */
+    @GridToStringInclude(sensitive = true)
+    @GridDirectTransient
+    private Object[] params;
+
+    /** Query parameters as bytes. */
+    private byte[] paramsBytes;
+
+    /** Schema name. */
+    @GridToStringInclude
+    private String schemaName;
+
+    /**
+     * Required by {@link Externalizable}
+     */
+    public GridH2DmlRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param req Request.
+     */
+    public GridH2DmlRequest(GridH2DmlRequest req) {
+        reqId = req.reqId;
+        caches = req.caches;
+        topVer = req.topVer;
+        qryParts = req.qryParts;
+        pageSize = req.pageSize;
+        qry = req.qry;
+        flags = req.flags;
+        timeout = req.timeout;
+        params = req.params;
+        paramsBytes = req.paramsBytes;
+        schemaName = req.schemaName;
+    }
+
+    /**
+     * @return Parameters.
+     */
+    public Object[] parameters() {
+        return params;
+    }
+
+    /**
+     * @param params Parameters.
+     * @return {@code this}.
+     */
+    public GridH2DmlRequest parameters(Object[] params) {
+        if (params == null)
+            params = EMPTY_PARAMS;
+
+        this.params = params;
+
+        return this;
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @return {@code this}.
+     */
+    public GridH2DmlRequest requestId(long reqId) {
+        this.reqId = reqId;
+
+        return this;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @param caches Caches.
+     * @return {@code this}.
+     */
+    public GridH2DmlRequest caches(List<Integer> caches) {
+        this.caches = caches;
+
+        return this;
+    }
+
+    /**
+     * @return Caches.
+     */
+    public List<Integer> caches() {
+        return caches;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return {@code this}.
+     */
+    public GridH2DmlRequest topologyVersion(AffinityTopologyVersion topVer) {
+        this.topVer = topVer;
+
+        return this;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Query partitions.
+     */
+    public int[] queryPartitions() {
+        return qryParts;
+    }
+
+    /**
+     * @param qryParts Query partitions.
+     * @return {@code this}.
+     */
+    public GridH2DmlRequest queryPartitions(int[] qryParts) {
+        this.qryParts = qryParts;
+
+        return this;
+    }
+
+    /**
+     * @param pageSize Page size.
+     * @return {@code this}.
+     */
+    public GridH2DmlRequest pageSize(int pageSize) {
+        this.pageSize = pageSize;
+
+        return this;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @param qry SQL Query.
+     * @return {@code this}.
+     */
+    public GridH2DmlRequest query(String qry) {
+        this.qry = qry;
+
+        return this;
+    }
+
+    /**
+     * @return SQL Query.
+     */
+    public String query() {
+        return qry;
+    }
+
+    /**
+     * @param flags Flags.
+     * @return {@code this}.
+     */
+    public GridH2DmlRequest flags(int flags) {
+        assert flags >= 0 && flags <= 255: flags;
+
+        this.flags = (byte)flags;
+
+        return this;
+    }
+
+    /**
+     * @param flags Flags to check.
+     * @return {@code true} If all the requested flags are set to {@code true}.
+     */
+    public boolean isFlagSet(int flags) {
+        return (this.flags & flags) == flags;
+    }
+
+    /**
+     * @return Timeout.
+     */
+    public int timeout() {
+        return timeout;
+    }
+
+    /**
+     * @param timeout New timeout.
+     * @return {@code this}.
+     */
+    public GridH2DmlRequest timeout(int timeout) {
+        this.timeout = timeout;
+
+        return this;
+    }
+
+    /**
+     * @return Schema name.
+     */
+    public String schemaName() {
+        return schemaName;
+    }
+
+    /**
+     * @param schemaName Schema name.
+     * @return {@code this}.
+     */
+    public GridH2DmlRequest schemaName(String schemaName) {
+        this.schemaName = schemaName;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void marshall(Marshaller m) {
+        if (paramsBytes != null)
+            return;
+
+        assert params != null;
+
+        try {
+            paramsBytes = U.marshal(m, params);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("IfMayBeConditional")
+    @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
+        if (params != null)
+            return;
+
+        assert paramsBytes != null;
+
+        try {
+            final ClassLoader ldr = U.resolveClassLoader(ctx.config());
+
+            if (m instanceof BinaryMarshaller)
+                // To avoid deserializing of enum types.
+                params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr);
+            else
+                params = U.unmarshal(m, paramsBytes, ldr);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeCollection("caches", caches, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeInt("pageSize", pageSize))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeByteArray("paramsBytes", paramsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeString("qry", qry))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeIntArray("qryParts", qryParts))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeString("schemaName", schemaName))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeInt("timeout", timeout))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                caches = reader.readCollection("caches", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                pageSize = reader.readInt("pageSize");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                paramsBytes = reader.readByteArray("paramsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                qry = reader.readString("qry");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                qryParts = reader.readIntArray("qryParts");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                schemaName = reader.readString("schemaName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                timeout = reader.readInt("timeout");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridH2DmlRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return -55;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 10;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridH2DmlRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java
new file mode 100644
index 0000000..808ff9e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Response to remote DML request.
+ */
+public class GridH2DmlResponse implements Message, GridCacheQueryMarshallable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Request id. */
+    @GridToStringInclude
+    private long reqId;
+
+    /** Number of updated rows. */
+    @GridToStringInclude
+    private long updCnt;
+
+    /** Error message. */
+    @GridToStringInclude
+    private String err;
+
+    /** Keys that failed. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Object[] errKeys;
+
+    /** Keys that failed (after marshalling). */
+    private byte[] errKeysBytes;
+
+    /**
+     * Default constructor.
+     */
+    public GridH2DmlResponse() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param reqId Request id.
+     * @param updCnt Updated row number.
+     * @param errKeys Erroneous keys.
+     * @param error Error message.
+     */
+    public GridH2DmlResponse(long reqId, long updCnt, Object[] errKeys, String error) {
+        this.reqId = reqId;
+        this.updCnt = updCnt;
+        this.errKeys = errKeys;
+        this.err = error;
+    }
+
+    /**
+     * @return Request id.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Update counter.
+     */
+    public long updateCounter() {
+        return updCnt;
+    }
+
+    /**
+     * @return Error keys.
+     */
+    public Object[] errorKeys() {
+        return errKeys;
+    }
+
+    /**
+     * @return Error message.
+     */
+    public String error() {
+        return err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void marshall(Marshaller m) {
+        if (errKeysBytes != null || errKeys == null)
+            return;
+
+        try {
+            errKeysBytes = U.marshal(m, errKeys);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("IfMayBeConditional")
+    @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
+        if (errKeys != null || errKeysBytes == null)
+            return;
+
+        try {
+            final ClassLoader ldr = U.resolveClassLoader(ctx.config());
+
+            if (m instanceof BinaryMarshaller)
+                // To avoid deserializing of enum types.
+                errKeys = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(errKeysBytes, ldr);
+            else
+                errKeys = U.unmarshal(m, errKeysBytes, ldr);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridH2DmlResponse.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeString("err", err))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeByteArray("errKeysBytes", errKeysBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeLong("updCnt", updCnt))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                err = reader.readString("err");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                errKeysBytes = reader.readByteArray("errKeysBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                updCnt = reader.readLong("updCnt");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridH2DmlResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return -56;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    @Override public void onAckReceived() {
+        // No-op
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index 18b1afb..3c13392 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -112,6 +112,12 @@ public class GridH2ValueMessageFactory implements MessageFactory {
 
             case -54:
                 return new QueryTable();
+
+            case -55:
+                return new GridH2DmlRequest();
+
+            case -56:
+                return new GridH2DmlResponse();
         }
 
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java
new file mode 100644
index 0000000..e5efc06
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java
@@ -0,0 +1,783 @@
+package org.apache.ignite.internal.processors.query;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for {@link SqlFieldsQueryEx#skipReducerOnUpdate} flag.
+ */
+public class IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static int NODE_COUNT = 4;
+
+    /** */
+    private static String NODE_CLIENT = "client";
+
+    /** */
+    private static String CACHE_ACCOUNT = "acc";
+
+    /** */
+    private static String CACHE_REPORT = "rep";
+
+    /** */
+    private static String CACHE_STOCK = "stock";
+
+    /** */
+    private static String CACHE_TRADE = "trade";
+
+    /** */
+    private static String CACHE_LIST = "list";
+
+    /** */
+    private static IgniteEx client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        c.setDiscoverySpi(disco);
+
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        ccfgs.add(buildCacheConfiguration(CACHE_ACCOUNT));
+        ccfgs.add(buildCacheConfiguration(CACHE_STOCK));
+        ccfgs.add(buildCacheConfiguration(CACHE_TRADE));
+        ccfgs.add(buildCacheConfiguration(CACHE_REPORT));
+        ccfgs.add(buildCacheConfiguration(CACHE_LIST));
+
+        c.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+        if (gridName.equals(NODE_CLIENT))
+            c.setClientMode(true);
+
+        return c;
+    }
+
+    /**
+     * Creates a cache configuration.
+     *
+     * @param name Name of the cache.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration buildCacheConfiguration(String name) {
+        if (name.equals(CACHE_ACCOUNT)) {
+            CacheConfiguration ccfg = new CacheConfiguration(CACHE_ACCOUNT);
+
+            ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+            QueryEntity entity = new QueryEntity(Integer.class, Account.class);
+
+            ccfg.setQueryEntities(Collections.singletonList(entity));
+
+            return ccfg;
+        }
+        if (name.equals(CACHE_STOCK)) {
+            CacheConfiguration ccfg = new CacheConfiguration(CACHE_STOCK);
+
+            ccfg.setCacheMode(CacheMode.REPLICATED);
+
+            QueryEntity entity = new QueryEntity(Integer.class, Stock.class);
+
+            ccfg.setQueryEntities(Collections.singletonList(entity));
+
+            return ccfg;
+        }
+        if (name.equals(CACHE_TRADE)) {
+            CacheConfiguration ccfg = new CacheConfiguration(CACHE_TRADE);
+
+            ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+            QueryEntity entity = new QueryEntity(Integer.class, Trade.class);
+
+            ccfg.setQueryEntities(Collections.singletonList(entity));
+
+            return ccfg;
+        }
+        if (name.equals(CACHE_REPORT)) {
+            CacheConfiguration ccfg = new CacheConfiguration(CACHE_REPORT);
+
+            ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+            QueryEntity entity = new QueryEntity(Integer.class, Report.class);
+
+            ccfg.setQueryEntities(Collections.singletonList(entity));
+
+            return ccfg;
+        }
+        if (name.equals(CACHE_LIST)) {
+            CacheConfiguration ccfg = new CacheConfiguration(CACHE_LIST);
+
+            ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+            QueryEntity entity = new QueryEntity(Integer.class, String.class);
+
+            ccfg.setQueryEntities(Collections.singletonList(entity));
+
+            return ccfg;
+        }
+
+        assert false;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODE_COUNT);
+
+        client = (IgniteEx)startGrid(NODE_CLIENT);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        awaitPartitionMapExchange();
+
+        client.cache(CACHE_ACCOUNT).clear();
+        client.cache(CACHE_STOCK).clear();
+        client.cache(CACHE_TRADE).clear();
+        client.cache(CACHE_REPORT).clear();
+        client.cache(CACHE_LIST).clear();
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testUpdate() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+        String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE depo > 0";
+
+        checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts, new SqlFieldsQueryEx(text, false).setArgs(10));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testUpdateFastKey() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+        String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE _key = ?";
+
+        checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+            new SqlFieldsQueryEx(text, false).setArgs(10, 1));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testUpdateLimit() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+        String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE sn >= ? AND sn < ? LIMIT ?";
+
+        checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+            new SqlFieldsQueryEx(text, false).setArgs(10, 0, 10, 10));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testUpdateWhereSubquery() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, -100);
+
+        Map<Integer, Trade> trades = getTrades(100, 2);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+        String text = "UPDATE \"trade\".Trade t SET qty = ? " +
+            "WHERE accountId IN (SELECT p._key FROM \"acc\".Account p WHERE depo < ?)";
+
+        checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+            new SqlFieldsQueryEx(text, false).setArgs(0, 0));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testUpdateSetSubquery() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+        Map<Integer, Trade> trades = getTrades(100, 2);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+        String text = "UPDATE \"trade\".Trade t SET qty = " +
+            "(SELECT a.depo/t.price FROM \"acc\".Account a WHERE t.accountId = a._key)";
+
+        checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+            new SqlFieldsQueryEx(text, false));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testUpdateSetTableSubquery() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+        Map<Integer, Trade> trades = getTrades(100, 2);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+        String text = "UPDATE \"trade\".Trade t SET (qty) = " +
+            "(SELECT a.depo/t.price FROM \"acc\".Account a WHERE t.accountId = a._key)";
+
+        checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+            new SqlFieldsQueryEx(text, false));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testInsertValues() throws Exception {
+        String text = "INSERT INTO \"acc\".Account (_key, name, sn, depo)" +
+            " VALUES (?, ?, ?, ?), (?, ?, ?, ?)";
+
+        checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), null,
+            new SqlFieldsQueryEx(text, false).setArgs(1, "John Marry", 11111, 100, 2, "Marry John", 11112, 200));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testInsertFromSelect() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+        String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+            "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a";
+
+        checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+            new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testInsertFromSelectOrderBy() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+        String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+            "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a " +
+            "ORDER BY a.sn DESC";
+
+        checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+            new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testInsertFromSelectUnion() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(20, 1, 1000);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+        String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+            "SELECT a._key, a._key, 0, a.depo, 1 FROM \"acc\".Account a " +
+            "UNION " +
+            "SELECT 101 + a2._key, a2._key, 1, a2.depo, 1 FROM \"acc\".Account a2";
+
+        checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+            new SqlFieldsQueryEx(text, false));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testInsertFromSelectGroupBy() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+        Map<Integer, Trade> trades = getTrades(100, 2);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+        client.cache(CACHE_TRADE).putAll(trades);
+
+        String text = "INSERT INTO \"rep\".Report (_key, accountId, spends, count) " +
+            "SELECT accountId, accountId, SUM(qty * price), COUNT(*) " +
+            "FROM \"trade\".Trade " +
+            "GROUP BY accountId";
+
+        checkUpdate(client.<Integer, Report>cache(CACHE_REPORT), null,
+            new SqlFieldsQueryEx(text, false));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testInsertFromSelectDistinct() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 2, 100);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+        String text = "INSERT INTO \"list\".String (_key, _val) " +
+            "SELECT DISTINCT sn, name FROM \"acc\".Account ";
+
+        checkUpdate(client.<Integer, String>cache(CACHE_LIST), null,
+            new SqlFieldsQueryEx(text, false));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testInsertFromSelectJoin() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+        Map<Integer, Stock> stocks = getStocks(5);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+        client.cache(CACHE_STOCK).putAll(stocks);
+
+        String text = "INSERT INTO \"trade\".Trade(_key, accountId, stockId, qty, price) " +
+            "SELECT 5*a._key + s._key, a._key, s._key, ?, a.depo/? " +
+            "FROM \"acc\".Account a JOIN \"stock\".Stock s ON 1=1";
+
+        checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+            new SqlFieldsQueryEx(text, false).setArgs(10, 10));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testDelete() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+        String text = "DELETE FROM \"acc\".Account WHERE sn > ?";
+
+        checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+            new SqlFieldsQueryEx(text, false).setArgs(10));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testDeleteTop() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+        String text = "DELETE TOP ? FROM \"acc\".Account WHERE sn < ?";
+
+        checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+            new SqlFieldsQueryEx(text, false).setArgs(10, 10));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testDeleteWhereSubquery() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(20, 1, 100);
+        Map<Integer, Trade> trades = getTrades(10, 2);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+        client.cache(CACHE_TRADE).putAll(trades);
+
+        String text = "DELETE FROM \"acc\".Account " +
+            "WHERE _key IN (SELECT t.accountId FROM \"trade\".Trade t)";
+
+        checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+            new SqlFieldsQueryEx(text, false));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testMergeValues() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(1, 1, 100);
+
+        String text = "MERGE INTO \"acc\".Account (_key, name, sn, depo)" +
+            " VALUES (?, ?, ?, ?), (?, ?, ?, ?)";
+
+        checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+            new SqlFieldsQueryEx(text, false).setArgs(0, "John Marry", 11111, 100, 1, "Marry John", 11112, 200));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testMergeFromSelectJoin() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+        Map<Integer, Stock> stocks = getStocks(5);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+        client.cache(CACHE_STOCK).putAll(stocks);
+
+        Map<Integer, Trade> trades = new HashMap<>();
+
+        trades.put(5, new Trade(1, 1, 1, 1));
+
+        String text = "MERGE INTO \"trade\".Trade(_key, accountId, stockId, qty, price) " +
+            "SELECT 5*a._key + s._key, a._key, s._key, ?, a.depo/? " +
+            "FROM \"acc\".Account a JOIN \"stock\".Stock s ON 1=1";
+
+        checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+            new SqlFieldsQueryEx(text, false).setArgs(10, 10));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testMergeFromSelectOrderBy() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+        Map<Integer, Trade> trades = new HashMap<>();
+
+        trades.put(5, new Trade(1, 1, 1, 1));
+
+        String text = "MERGE INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+            "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a " +
+            "ORDER BY a.sn DESC";
+
+        checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+            new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10));
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testMergeFromSelectGroupBy() throws Exception {
+        Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+        Map<Integer, Trade> trades = getTrades(100, 2);
+
+        client.cache(CACHE_ACCOUNT).putAll(accounts);
+        client.cache(CACHE_TRADE).putAll(trades);
+
+        Map<Integer, Report> reports = new HashMap<>();
+
+        reports.put(5, new Report(5, 1, 1));
+
+        String text = "MERGE INTO \"rep\".Report (_key, accountId, spends, count) " +
+            "SELECT accountId, accountId, SUM(qty * price), COUNT(*) " +
+            "FROM \"trade\".Trade " +
+            "GROUP BY accountId";
+
+        checkUpdate(client.<Integer, Report>cache(CACHE_REPORT), reports,
+            new SqlFieldsQueryEx(text, false));
+    }
+
+    /**
+     * Constructs multiple Account objects.
+     *
+     * @param num Number of accounts.
+     * @param numCopy Number of copies.
+     * @param depo Deposit amount.
+     * @return Map of accounts.
+     */
+    private Map<Integer, Account> getAccounts(int num, int numCopy, int depo) {
+        Map<Integer, Account> res = new HashMap<>();
+
+        int count = 0;
+
+        for (int i = 0; i < num; ++i) {
+            String name = "John doe #" + i;
+
+            for (int j = 0; j < numCopy; ++j)
+                res.put(count++, new Account(name, i, depo));
+        }
+
+        return res;
+    }
+
+    /**
+     * Constructs multiple Stock objects.
+     *
+     * @param num Number of stocks.
+     * @return Map of Stock objects.
+     */
+    private Map<Integer, Stock> getStocks(int num) {
+        Map<Integer, Stock> res = new HashMap<>();
+
+        for (int i = 0; i < num; ++i)
+            res.put(i, new Stock("T" + i, "Stock #" + i));
+
+        return res;
+    }
+
+    /**
+     * Constructs multiple Trade objects.
+     *
+     * @param numAccounts Number of accounts.
+     * @param numStocks Number of stocks.
+     * @return Map of Trade objects.
+     */
+    private Map<Integer, Trade> getTrades(int numAccounts, int numStocks) {
+        Map<Integer, Trade> res = new HashMap<>();
+
+        int count = 0;
+
+        for (int i = 0; i < numAccounts; ++i) {
+            for (int j = 0; j < numStocks; ++j) {
+                res.put(count++, new Trade(i, j, 100, 100));
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Executes provided sql update with skipReducerOnUpdate flag on and off and checks results are the same.
+     *
+     * @param cache Cache.
+     * @param initial Initial content of the cache.
+     * @param qry Query to execute.
+     * @param <K> Key type.
+     * @param <V> Value type.
+     */
+    private <K, V> void checkUpdate(IgniteCache<K, V> cache, Map<K, V> initial, SqlFieldsQueryEx qry) {
+        cache.clear();
+
+        if (!F.isEmpty(initial))
+            cache.putAll(initial);
+
+        List<List<?>> updRes = cache.query(qry.setSkipReducerOnUpdate(true)).getAll();
+
+        Map<K, V> result = new HashMap<>(cache.size());
+
+        for (Cache.Entry<K, V> e : cache)
+            result.put(e.getKey(), e.getValue());
+
+        cache.clear();
+
+        if (!F.isEmpty(initial))
+            cache.putAll(initial);
+
+        List<List<?>> updRes2 = cache.query(qry.setSkipReducerOnUpdate(false)).getAll();
+
+        assertTrue(((Number)updRes.get(0).get(0)).intValue() > 0);
+
+        assertEquals(((Number)updRes.get(0).get(0)).intValue(), ((Number)updRes2.get(0).get(0)).intValue());
+
+        assertEquals(result.size(), cache.size());
+
+        for (Cache.Entry<K, V> e : cache)
+            assertEquals(e.getValue(), result.get(e.getKey()));
+    }
+
+    /** */
+    public class Account {
+        /** */
+        @QuerySqlField
+        String name;
+
+        /** */
+        @QuerySqlField
+        int sn;
+
+        /** */
+        @QuerySqlField
+        int depo;
+
+        /**
+         * Constructor.
+         *
+         * @param name Name.
+         * @param sn ID.
+         * @param depo Deposit amount.
+         */
+        Account(String name, int sn, int depo) {
+            this.name = name;
+            this.sn = sn;
+            this.depo = depo;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return (name == null ? 0 : name.hashCode()) ^ sn ^ depo;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (obj == null)
+                return false;
+
+            if (!obj.getClass().equals(Account.class))
+                return false;
+
+            Account other = (Account)obj;
+
+            return F.eq(name, other.name) && sn == other.sn && depo == other.depo;
+        }
+    }
+
+    /** */
+    public class Stock {
+        /** */
+        @QuerySqlField
+        String ticker;
+
+        /** */
+        @QuerySqlField
+        String name;
+
+        /**
+         * Constructor.
+         *
+         * @param ticker Short name.
+         * @param name Name.
+         */
+        Stock(String ticker, String name) {
+            this.ticker = ticker;
+            this.name = name;
+        }
+    }
+
+    /** */
+    public class Trade {
+        /** */
+        @QuerySqlField
+        int accountId;
+
+        /** */
+        @QuerySqlField
+        int stockId;
+
+        /** */
+        @QuerySqlField
+        int qty;
+
+        /** */
+        @QuerySqlField
+        int price;
+
+        /**
+         * Constructor.
+         *
+         * @param accountId Account id.
+         * @param stockId Stock id.
+         * @param qty Quantity.
+         * @param price Price.
+         */
+        Trade(int accountId, int stockId, int qty, int price) {
+            this.accountId = accountId;
+            this.stockId = stockId;
+            this.qty = qty;
+            this.price = price;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return accountId ^ stockId ^ qty ^ price;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (obj == null)
+                return false;
+
+            if (!obj.getClass().equals(Trade.class))
+                return false;
+
+            Trade other = (Trade)obj;
+
+            return accountId == other.accountId && stockId == other.stockId &&
+                qty == other.qty && price == other.price;
+        }
+
+    }
+
+    /** */
+    public class Report {
+        /** */
+        @QuerySqlField
+        int accountId;
+
+        /** */
+        @QuerySqlField
+        int spends;
+
+        /** */
+        @QuerySqlField
+        int count;
+
+        /**
+         * Constructor.
+         *
+         * @param accountId Account id.
+         * @param spends Spends.
+         * @param count Count.
+         */
+        Report(int accountId, int spends, int count) {
+            this.accountId = accountId;
+            this.spends = spends;
+            this.count = count;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return accountId ^ spends ^ count;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (obj == null)
+                return false;
+
+            if (!obj.getClass().equals(Report.class))
+                return false;
+
+            Report other = (Report)obj;
+
+            return accountId == other.accountId && spends == other.spends &&
+                count == other.count;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
new file mode 100644
index 0000000..a2a6bf8
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheQueryExecutedEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ThreadLocalRandom8;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+
+/**
+ * Tests for distributed DML.
+ */
+@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+public class IgniteSqlSkipReducerOnUpdateDmlSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static int NODE_COUNT = 4;
+
+    /** */
+    private static String NODE_CLIENT = "client";
+
+    /** */
+    private static String CACHE_ORG = "org";
+
+    /** */
+    private static String CACHE_PERSON = "person";
+
+    /** */
+    private static String CACHE_POSITION = "pos";
+
+    /** */
+    private static Ignite client;
+
+    /** */
+    private static CountDownLatch latch;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        c.setDiscoverySpi(disco);
+
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        ccfgs.add(buildCacheConfiguration(CACHE_ORG));
+        ccfgs.add(buildCacheConfiguration(CACHE_PERSON));
+        ccfgs.add(buildCacheConfiguration(CACHE_POSITION));
+
+        c.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+        c.setLongQueryWarningTimeout(10000);
+
+        if (gridName.equals(NODE_CLIENT))
+            c.setClientMode(true);
+
+        return c;
+    }
+
+    /**
+     * Creates cache configuration.
+     *
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration buildCacheConfiguration(String name) {
+        if (name.equals(CACHE_ORG)) {
+            CacheConfiguration ccfg = new CacheConfiguration(CACHE_ORG);
+
+            ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+            QueryEntity entity = new QueryEntity(Integer.class, Organization.class);
+
+            ccfg.setQueryEntities(Collections.singletonList(entity));
+
+            ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+
+            return ccfg;
+        }
+        if (name.equals(CACHE_PERSON)) {
+            CacheConfiguration ccfg = new CacheConfiguration(CACHE_PERSON);
+
+            ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+            QueryEntity entity = new QueryEntity(PersonKey.class, Person.class);
+
+            ccfg.setQueryEntities(Collections.singletonList(entity));
+
+            ccfg.setKeyConfiguration(new CacheKeyConfiguration(PersonKey.class));
+
+            ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+
+            return ccfg;
+        }
+        if (name.equals(CACHE_POSITION)) {
+            CacheConfiguration ccfg = new CacheConfiguration(CACHE_POSITION);
+
+            ccfg.setCacheMode(CacheMode.REPLICATED);
+
+            QueryEntity entity = new QueryEntity(Integer.class, Position.class);
+
+            ccfg.setQueryEntities(Collections.singletonList(entity));
+
+            ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+
+            return ccfg;
+        }
+
+        assert false;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODE_COUNT);
+
+        client = startGrid(NODE_CLIENT);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        checkNoLeaks();
+
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        // Stop additional node that is started in one of the test.
+        stopGrid(NODE_COUNT + 1);
+
+        awaitPartitionMapExchange();
+
+        client.cache(CACHE_PERSON).clear();
+        client.cache(CACHE_ORG).clear();
+        client.cache(CACHE_POSITION).clear();
+    }
+
+    /**
+     *
+     * @throws Exception if failed.
+     */
+    public void testSimpleUpdateDistributedReplicated() throws Exception {
+        fillCaches();
+
+        IgniteCache<Integer, Position> cache = grid(NODE_CLIENT).cache(CACHE_POSITION);
+
+        Position p = cache.get(1);
+
+        List<List<?>> r = cache.query(new SqlFieldsQueryEx("UPDATE Position p SET name = CONCAT('A ', name)", false)
+            .setSkipReducerOnUpdate(true)).getAll();
+
+        assertEquals((long)cache.size(), r.get(0).get(0));
+
+        assertEquals(cache.get(1).name, "A " + p.name);
+    }
+
+    /**
+     *
+     * @throws Exception if failed.
+     */
+    public void testSimpleUpdateDistributedPartitioned() throws Exception {
+        fillCaches();
+
+        IgniteCache<PersonKey, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON);
+
+        List<List<?>> r = cache.query(new SqlFieldsQueryEx(
+            "UPDATE Person SET position = CASEWHEN(position = 1, 1, position - 1)", false)
+            .setSkipReducerOnUpdate(true)).getAll();
+
+        assertEquals((long)cache.size(), r.get(0).get(0));
+    }
+
+    /**
+     *
+     * @throws Exception if failed.
+     */
+    public void testDistributedUpdateFailedKeys() throws Exception {
+        // UPDATE can produce failed keys due to concurrent modification
+        fillCaches();
+
+        final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() {
+                return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET rate = Modify(_key, rate - 1)", false)
+                    .setSkipReducerOnUpdate(true));
+            }
+        }, CacheException.class, "Failed to update some keys because they had been modified concurrently");
+    }
+
+    /**
+     *
+     * @throws Exception if failed.
+     */
+    public void testDistributedUpdateFail() throws Exception {
+        fillCaches();
+
+        final IgniteCache cache = grid(NODE_CLIENT).cache(CACHE_PERSON);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() {
+                return cache.query(new SqlFieldsQueryEx("UPDATE Person SET name = Fail(name)", false)
+                    .setSkipReducerOnUpdate(true));
+            }
+        }, CacheException.class, "Failed to execute SQL query");
+    }
+
+    /**
+     *
+     * @throws Exception if failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void testQueryParallelism() throws Exception {
+        String cacheName = CACHE_ORG + "x4";
+
+        CacheConfiguration cfg = buildCacheConfiguration(CACHE_ORG)
+            .setQueryParallelism(4)
+            .setName(cacheName);
+
+        IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).createCache(cfg);
+
+        for (int i = 0; i < 1024; i++)
+            cache.put(i, new Organization("Acme Inc #" + i, 0));
+
+        List<List<?>> r = cache.query(new SqlFieldsQueryEx("UPDATE \"" + cacheName +
+            "\".Organization o SET name = UPPER(name)", false).setSkipReducerOnUpdate(true)).getAll();
+
+        assertEquals((long)cache.size(), r.get(0).get(0));
+    }
+
+    /**
+     *
+     * @throws Exception if failed.
+     */
+    public void testEvents() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(NODE_COUNT);
+
+        final IgnitePredicate<Event> pred = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                assert evt instanceof CacheQueryExecutedEvent;
+
+                CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
+
+                assertNotNull(qe.clause());
+
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        for (int idx = 0; idx < NODE_COUNT; idx++)
+            grid(idx).events().localListen(pred, EVT_CACHE_QUERY_EXECUTED);
+
+        IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+        for (int i = 0; i < 1024; i++)
+            cache.put(i, new Organization("Acme Inc #" + i, 0));
+
+        cache.query(new SqlFieldsQueryEx("UPDATE \"org\".Organization o SET name = UPPER(name)", false)
+            .setSkipReducerOnUpdate(true)).getAll();
+
+        assertTrue(latch.await(5000, MILLISECONDS));
+
+        for (int idx = 0; idx < NODE_COUNT; idx++)
+            grid(idx).events().stopLocalListen(pred);
+    }
+
+    /**
+     *
+     * @throws Exception if failed.
+     */
+    public void testSpecificPartitionsUpdate() throws Exception {
+        fillCaches();
+
+        Affinity aff = grid(NODE_CLIENT).affinity(CACHE_PERSON);
+
+        int numParts = aff.partitions();
+        int parts[] = new int[numParts / 2];
+
+        for (int idx = 0; idx < numParts / 2; idx++)
+            parts[idx] = idx * 2;
+
+        IgniteCache<PersonKey, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON);
+
+        // UPDATE over even partitions
+        cache.query(new SqlFieldsQueryEx("UPDATE Person SET position = 0", false)
+                .setSkipReducerOnUpdate(true)
+                .setPartitions(parts));
+
+        List<List<?>> rows = cache.query(new SqlFieldsQuery("SELECT _key, position FROM Person")).getAll();
+
+        for (List<?> row : rows) {
+            PersonKey personKey = (PersonKey)row.get(0);
+            int pos = ((Number)row.get(1)).intValue();
+            int part = aff.partition(personKey);
+
+            assertTrue((part % 2 == 0) ^ (pos != 0));
+        }
+    }
+
+    /**
+     *
+     * @throws Exception if failed.
+     */
+    public void testCancel() throws Exception {
+        latch = new CountDownLatch(NODE_COUNT + 1);
+
+        fillCaches();
+
+        final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() {
+                return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET name = WAIT(name)", false)
+                    .setSkipReducerOnUpdate(true));
+            }
+        });
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                Collection<GridRunningQueryInfo> qCol =
+                    grid(NODE_CLIENT).context().query().runningQueries(0);
+
+                if (qCol.isEmpty())
+                    return false;
+
+                for (GridRunningQueryInfo queryInfo : qCol)
+                    queryInfo.cancel();
+
+                return true;
+            }
+        }, 5000);
+
+        latch.await(5000, MILLISECONDS);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws IgniteCheckedException {
+                return fut.get();
+            }
+        }, IgniteCheckedException.class, "Future was cancelled");
+    }
+
+    /**
+     *
+     * @throws Exception if failed.
+     */
+    public void testNodeStopDuringUpdate() throws Exception {
+        startGrid(NODE_COUNT + 1);
+
+        awaitPartitionMapExchange();
+
+        fillCaches();
+
+        latch = new CountDownLatch(NODE_COUNT + 1 + 1);
+
+        final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() {
+                return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET name = WAIT(name)", false)
+                    .setSkipReducerOnUpdate(true));
+            }
+        });
+
+        final CountDownLatch finalLatch = latch;
+
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return finalLatch.getCount() == 1;
+            }
+        }, 5000));
+
+        latch.countDown();
+
+        stopGrid(NODE_COUNT + 1);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws IgniteCheckedException {
+                return fut.get();
+            }
+        }, IgniteCheckedException.class, "Update failed because map node left topology");
+    }
+
+    /**
+     * Ensure there are no leaks in data structures associated with distributed dml execution.
+     */
+    private void checkNoLeaks() {
+        GridQueryProcessor qryProc = grid(NODE_CLIENT).context().query();
+
+        IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProc, GridQueryProcessor.class, "idx");
+
+        GridReduceQueryExecutor rdcQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "rdcQryExec");
+
+        Map updRuns = GridTestUtils.getFieldValue(rdcQryExec, GridReduceQueryExecutor.class, "updRuns");
+
+        assertEquals(0, updRuns.size());
+
+        for (int idx = 0; idx < NODE_COUNT; idx++) {
+            qryProc = grid(idx).context().query();
+
+            h2Idx = GridTestUtils.getFieldValue(qryProc, GridQueryProcessor.class, "idx");
+
+            GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec");
+
+            Map qryRess = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "qryRess");
+
+            for (Object obj : qryRess.values()) {
+                Map updCancels = GridTestUtils.getFieldValue(obj, "updCancels");
+
+                assertEquals(0, updCancels.size());
+            }
+        }
+    }
+
+    /**
+     * Fills caches with initial data.
+     */
+    private void fillCaches() {
+        Ignite client = grid(NODE_CLIENT);
+
+        IgniteCache<Integer, Position> posCache = client.cache(CACHE_POSITION);
+
+        // Generate positions
+        Position[] positions = new Position[] {
+            new Position(1, "High Ranking Officer", 1),
+            new Position(2, "Administrative worker", 3),
+            new Position(3, "Worker", 7),
+            new Position(4, "Security", 2),
+            new Position(5, "Cleaner", 1)
+        };
+
+        for (Position pos: positions)
+            posCache.put(pos.id, pos);
+
+        // Generate organizations
+        String[] forms = new String[] {" Inc", " Co", " AG", " Industries"};
+        String[] orgNames = new String[] {"Acme", "Sierra", "Mesa", "Umbrella", "Robotics"};
+        String[] names = new String[] {"Mary", "John", "William", "Tom", "Basil", "Ann", "Peter"};
+
+        IgniteCache<PersonKey, Person> personCache = client.cache(CACHE_PERSON);
+
+        IgniteCache<Integer, Organization> orgCache = client.cache(CACHE_ORG);
+
+        int orgId = 0;
+        int personId = 0;
+
+        for (String orgName : produceCombination(orgNames, orgNames, forms)) {
+            Organization org = new Organization(orgName, 1 + orgId);
+
+            orgCache.put(++orgId, org);
+
+            // Generate persons
+
+            List<String> personNames = produceCombination(names, names, new String[]{"s"});
+
+            int positionId = 0;
+            int posCounter = 0;
+
+            for (String name : personNames) {
+                PersonKey pKey = new PersonKey(orgId, ++personId);
+
+                if (positions[positionId].rate < posCounter++) {
+                    posCounter = 0;
+                    positionId = (positionId + 1) % positions.length;
+                }
+
+                Person person = new Person(name, positions[positionId].id, org.rate * positions[positionId].rate);
+
+                personCache.put(pKey, person);
+            }
+        }
+    }
+
+    /**
+     * Produces all possible combinations.
+     *
+     * @param a First array.
+     * @param b Second array.
+     * @param ends Endings array.
+     * @return Result.
+     */
+    private List<String> produceCombination(String[] a, String[] b, String[] ends) {
+        List<String> res = new ArrayList<>();
+
+        for (String s1 : a) {
+            for (String s2 : b) {
+                if (!s1.equals(s2)) {
+                    String end = ends[ThreadLocalRandom8.current().nextInt(ends.length)];
+
+                    res.add(s1 + " " + s2 + end);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /** */
+    private static class Organization {
+        /** */
+        @QuerySqlField
+        String name;
+
+        /** */
+        @QuerySqlField
+        int rate;
+
+        /** */
+        @QuerySqlField
+        Date updated;
+
+        /**
+         * Constructor.
+         *
+         * @param name Organization name.
+         * @param rate Rate.
+         */
+        public Organization(String name, int rate) {
+            this.name = name;
+            this.rate = rate;
+            this.updated = new Date(System.currentTimeMillis());
+        }
+    }
+
+    /** */
+    public static class PersonKey {
+        /** */
+        @AffinityKeyMapped
+        @QuerySqlField
+        private Integer orgId;
+
+        /** */
+        @QuerySqlField
+        private Integer id;
+
+        /**
+         * Constructor.
+         *
+         * @param orgId Organization id.
+         * @param id Person id.
+         */
+        PersonKey(int orgId, int id) {
+            this.orgId = orgId;
+            this.id = id;
+        }
+    }
+
+    /** */
+    public static class Person {
+        /** */
+        @QuerySqlField
+        String name;
+
+        /** */
+        @QuerySqlField
+        int position;
+
+        /** */
+        @QuerySqlField
+        int amount;
+        /** */
+        @QuerySqlField
+        Date updated;
+
+        /**
+         * Constructor.
+         *
+         * @param name Name.
+         * @param position Position.
+         * @param amount Amount.
+         */
+        private Person(String name, int position, int amount) {
+            this.name = name;
+            this.position = position;
+            this.amount = amount;
+
+            this.updated = new Date(System.currentTimeMillis());
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return (name==null? 0: name.hashCode()) ^ position ^ amount ^ (updated == null ? 0 : updated.hashCode());
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (obj == null)
+                return false;
+
+            if (!obj.getClass().equals(Person.class))
+                return false;
+
+            Person other = (Person)obj;
+
+            return F.eq(name, other.name) && position == other.position &&
+                amount == other.amount && F.eq(updated, other.updated);
+        }
+    }
+
+    /** */
+    private static class Position {
+        /** */
+        @QuerySqlField
+        int id;
+
+        /** */
+        @QuerySqlField
+        String name;
+
+        /** */
+        @QuerySqlField
+        int rate;
+
+        /**
+         * Constructor.
+         *
+         * @param id Id.
+         * @param name Name.
+         * @param rate Rate.
+         */
+        public Position(int id, String name, int rate) {
+            this.id = id;
+            this.name = name;
+            this.rate = rate;
+        }
+    }
+
+    /**
+     * SQL function that always fails.
+     *
+     * @param param Arbitrary parameter.
+     * @return Result.
+     */
+    @QuerySqlFunction
+    public static String Fail(String param) {
+        throw new IgniteSQLException("Fail() called");
+    }
+
+    /**
+     * SQL function that waits for condition.
+     *
+     * @param param Arbitrary parameter.
+     * @return Result.
+     */
+    @QuerySqlFunction
+    public static String Wait(String param) {
+        try {
+            if (latch.getCount() > 0) {
+                latch.countDown();
+
+                latch.await(5000, MILLISECONDS);
+            }
+            else
+                Thread.sleep(100);
+        }
+        catch (InterruptedException ignore) {
+            // No-op
+        }
+        return param;
+    }
+
+    /**
+     * SQL function that makes a concurrent modification.
+     *
+     * @param id Id.
+     * @param rate Rate.
+     * @return Result.
+     */
+    @QuerySqlFunction
+    public static int Modify(final int id, final int rate) {
+        try {
+            GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() {
+                    IgniteCache cache = client.cache(CACHE_ORG);
+
+                    cache.put(id, new Organization("Acme Inc #" + id, rate + 1));
+
+                    return null;
+                }
+            }).get();
+        }
+        catch (Exception e) {
+            // No-op
+        }
+
+        return rate - 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index c49649b..83b4689 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -123,9 +123,11 @@ import org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDe
 import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
 import org.apache.ignite.internal.processors.client.ClientConnectorConfigurationValidationSelfTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlParameterizedQueryTest;
 import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
 import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlKeyValueFieldsTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlNotNullConstraintTest;
@@ -243,6 +245,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheInsertSqlQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheUpdateSqlQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheDeleteSqlQuerySelfTest.class);
+        suite.addTestSuite(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+        suite.addTestSuite(IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.class);
 
         suite.addTestSuite(IgniteBinaryObjectQueryArgumentsTest.class);
         suite.addTestSuite(IgniteBinaryObjectLocalQueryArgumentsTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/configuration_test.cpp b/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
index 7da6757..3165c4d 100644
--- a/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
@@ -43,6 +43,7 @@ namespace
     const bool testReplicatedOnly = true;
     const bool testCollocated = true;
     const bool testLazy = true;
+    const bool testSkipReducerOnUpdate = true;
 
     const std::string testAddress = testServerHost + ':' + ignite::common::LexicalCast<std::string>(testServerPort);
 }
@@ -132,6 +133,7 @@ void CheckConnectionConfig(const Configuration& cfg)
     BOOST_CHECK_EQUAL(cfg.IsReplicatedOnly(), testReplicatedOnly);
     BOOST_CHECK_EQUAL(cfg.IsCollocated(), testCollocated);
     BOOST_CHECK_EQUAL(cfg.IsLazy(), testLazy);
+    BOOST_CHECK_EQUAL(cfg.IsSkipReducerOnUpdate(), testSkipReducerOnUpdate);
 
     std::stringstream constructor;
 
@@ -143,7 +145,8 @@ void CheckConnectionConfig(const Configuration& cfg)
                 << "lazy=" << BoolToStr(testLazy) << ';'
                 << "page_size=" << testPageSize << ';'
                 << "replicated_only=" << BoolToStr(testReplicatedOnly) << ';'
-                << "schema=" << testSchemaName << ';';
+                << "schema=" << testSchemaName << ';'
+                << "skip_reducer_on_update=" << BoolToStr(testReplicatedOnly) << ';';
 
     const std::string& expectedStr = constructor.str();
 
@@ -164,6 +167,7 @@ void CheckDsnConfig(const Configuration& cfg)
     BOOST_CHECK_EQUAL(cfg.IsReplicatedOnly(), false);
     BOOST_CHECK_EQUAL(cfg.IsCollocated(), false);
     BOOST_CHECK_EQUAL(cfg.IsLazy(), false);
+    BOOST_CHECK_EQUAL(cfg.IsSkipReducerOnUpdate(), false);
 }
 
 BOOST_AUTO_TEST_SUITE(ConfigurationTestSuite)
@@ -180,6 +184,8 @@ BOOST_AUTO_TEST_CASE(CheckTestValuesNotEquealDefault)
     BOOST_CHECK_NE(testEnforceJoinOrder, Configuration::DefaultValue::enforceJoinOrder);
     BOOST_CHECK_NE(testReplicatedOnly, Configuration::DefaultValue::replicatedOnly);
     BOOST_CHECK_NE(testCollocated, Configuration::DefaultValue::collocated);
+    BOOST_CHECK_NE(testLazy, Configuration::DefaultValue::lazy);
+    BOOST_CHECK_NE(testSkipReducerOnUpdate, Configuration::DefaultValue::skipReducerOnUpdate);
 }
 
 BOOST_AUTO_TEST_CASE(TestConnectStringUppercase)
@@ -196,7 +202,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringUppercase)
                 << "COLLOCATED=" << BoolToStr(testCollocated, false) << ';'
                 << "REPLICATED_ONLY=" << BoolToStr(testReplicatedOnly, false) << ';'
                 << "PAGE_SIZE=" << testPageSize << ';'
-                << "SCHEMA=" << testSchemaName;
+                << "SCHEMA=" << testSchemaName << ';'
+                << "SKIP_REDUCER_ON_UPDATE=" << BoolToStr(testSkipReducerOnUpdate, false);
 
     const std::string& connectStr = constructor.str();
 
@@ -219,7 +226,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringLowercase)
                 << "enforce_join_order=" << BoolToStr(testEnforceJoinOrder) << ';'
                 << "replicated_only=" << BoolToStr(testReplicatedOnly) << ';'
                 << "collocated=" << BoolToStr(testCollocated) << ';'
-                << "schema=" << testSchemaName;
+                << "schema=" << testSchemaName << ';'
+                << "skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate);
 
     const std::string& connectStr = constructor.str();
 
@@ -242,7 +250,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringZeroTerminated)
                 << "collocated=" << BoolToStr(testCollocated) << ';'
                 << "distributed_joins=" << BoolToStr(testDistributedJoins) << ';'
                 << "enforce_join_order=" << BoolToStr(testEnforceJoinOrder) << ';'
-                << "schema=" << testSchemaName;
+                << "schema=" << testSchemaName << ';'
+                << "skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate);
 
     const std::string& connectStr = constructor.str();
 
@@ -265,7 +274,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringMixed)
                 << "Enforce_Join_Order=" << BoolToStr(testEnforceJoinOrder) << ';'
                 << "Replicated_Only=" << BoolToStr(testReplicatedOnly, false) << ';'
                 << "Collocated=" << BoolToStr(testCollocated) << ';'
-                << "Schema=" << testSchemaName;
+                << "Schema=" << testSchemaName << ';'
+                << "Skip_Reducer_On_Update=" << BoolToStr(testSkipReducerOnUpdate);
 
     const std::string& connectStr = constructor.str();
 
@@ -288,7 +298,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringWhitepaces)
                 << "COLLOCATED    =" << BoolToStr(testCollocated, false) << "  ;"
                 << "  REPLICATED_ONLY=   " << BoolToStr(testReplicatedOnly, false) << ';'
                 << "ENFORCE_JOIN_ORDER=   " << BoolToStr(testEnforceJoinOrder, false) << "  ;"
-                << "SCHEMA = \n\r" << testSchemaName;
+                << "SCHEMA = \n\r" << testSchemaName << ';'
+                << " skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate, false);
 
     const std::string& connectStr = constructor.str();
 
@@ -358,6 +369,7 @@ BOOST_AUTO_TEST_CASE(TestConnectStringInvalidBoolKeys)
     keys.insert("replicated_only");
     keys.insert("collocated");
     keys.insert("lazy");
+    keys.insert("skip_reducer_on_update");
 
     for (Set::const_iterator it = keys.begin(); it != keys.end(); ++it)
     {
@@ -385,6 +397,7 @@ BOOST_AUTO_TEST_CASE(TestConnectStringValidBoolKeys)
     keys.insert("replicated_only");
     keys.insert("collocated");
     keys.insert("lazy");
+    keys.insert("skip_reducer_on_update");
 
     for (Set::const_iterator it = keys.begin(); it != keys.end(); ++it)
     {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc-test/src/queries_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/queries_test.cpp b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
index 4c7e402..707669d 100644
--- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
@@ -755,6 +755,14 @@ BOOST_AUTO_TEST_CASE(TestConnectionProtocolVersion_2_1_5)
     InsertTestBatch(11, 20, 9);
 }
 
+BOOST_AUTO_TEST_CASE(TestConnectionProtocolVersion_2_3_0)
+{
+    Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache;PROTOCOL_VERSION=2.3.0");
+
+    InsertTestStrings(10, false);
+    InsertTestBatch(11, 20, 9);
+}
+
 BOOST_AUTO_TEST_CASE(TestTwoRowsInt8)
 {
     CheckTwoRowsInt<signed char>(SQL_C_STINYINT);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h b/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
index 2b1ec52..419a65e 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
@@ -82,6 +82,9 @@ namespace ignite
 
                     /** Connection attribute keyword for lazy attribute. */
                     static const std::string lazy;
+
+                    /** Connection attribute keyword for skipReducerOnUpdate attribute. */
+                    static const std::string skipReducerOnUpdate;
                 };
 
                 /** Default values for configuration. */
@@ -125,6 +128,9 @@ namespace ignite
 
                     /** Default value for lazy attribute. */
                     static const bool lazy;
+
+                    /** Default value for skipReducerOnUpdate attribute. */
+                    static const bool skipReducerOnUpdate;
                 };
 
                 /**
@@ -384,6 +390,26 @@ namespace ignite
                 }
 
                 /**
+                 * Check update on server flag.
+                 *
+                 * @return True if update on server.
+                 */
+                bool IsSkipReducerOnUpdate() const
+                {
+                    return GetBoolValue(Key::skipReducerOnUpdate, DefaultValue::skipReducerOnUpdate);
+                }
+
+                /**
+                 * Set update on server.
+                 *
+                 * @param val Value to set.
+                 */
+                void SetSkipReducerOnUpdate(bool val)
+                {
+                    SetBoolValue(Key::skipReducerOnUpdate, val);
+                }
+
+                /**
                  * Get protocol version.
                  *
                  * @return Protocol version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
index 91a808c..dda0ba9 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
@@ -79,9 +79,10 @@ namespace ignite
              * @param replicatedOnly Replicated only flag.
              * @param collocated Collocated flag.
              * @param lazy Lazy flag.
+             * @param skipReducerOnUpdate Skip reducer on update.
              */
             HandshakeRequest(const ProtocolVersion& version, bool distributedJoins, bool enforceJoinOrder,
-                bool replicatedOnly, bool collocated, bool lazy);
+                bool replicatedOnly, bool collocated, bool lazy, bool skipReducerOnUpdate);
 
             /**
              * Destructor.
@@ -112,6 +113,9 @@ namespace ignite
 
             /** Lazy flag. */
             bool lazy;
+
+            /** Skip reducer on update flag. */
+            bool skipReducerOnUpdate;
         };
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h b/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
index c36d5dd..e6088a7 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
@@ -34,6 +34,7 @@ namespace ignite
             /** Current protocol version. */
             static const ProtocolVersion VERSION_2_1_0;
             static const ProtocolVersion VERSION_2_1_5;
+            static const ProtocolVersion VERSION_2_3_0;
 
             typedef std::set<ProtocolVersion> VersionSet;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h b/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
index 2974b67..90286b9 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
@@ -55,6 +55,7 @@ namespace ignite
                             REPLICATED_ONLY_CHECK_BOX,
                             COLLOCATED_CHECK_BOX,
                             LAZY_CHECK_BOX,
+                            SKIP_REDUCER_ON_UPDATE_CHECK_BOX,
                             PROTOCOL_VERSION_LABEL,
                             PROTOCOL_VERSION_COMBO_BOX,
                             OK_BUTTON,
@@ -149,6 +150,9 @@ namespace ignite
                     /** Lazy CheckBox. */
                     std::auto_ptr<Window> lazyCheckBox;
 
+                    /** Update on server CheckBox. */
+                    std::auto_ptr<Window> skipReducerOnUpdateCheckBox;
+
                     /** Protocol version edit field. */
                     std::auto_ptr<Window> protocolVersionLabel;