You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/13 10:23:12 UTC
[4/7] ignite git commit: IGNITE-6024: SQL: Implemented
"skipReducerOnUpdate" flag. This closes #2488.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
new file mode 100644
index 0000000..e40bc2d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
+
+/**
+ * Request for DML operation on remote node.
+ */
+public class GridH2DmlRequest implements Message, GridCacheQueryMarshallable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Request id. */
+ @GridToStringInclude
+ private long reqId;
+
+ /** Cache identifiers. */
+ @GridToStringInclude
+ @GridDirectCollection(Integer.class)
+ private List<Integer> caches;
+
+ /** Topology version. */
+ @GridToStringInclude
+ private AffinityTopologyVersion topVer;
+
+ /** Query partitions. */
+ @GridToStringInclude
+ private int[] qryParts;
+
+ /** Page size. */
+ private int pageSize;
+
+ /** Query. */
+ @GridToStringInclude
+ private String qry;
+
+ /** Flags. */
+ private byte flags;
+
+ /** Timeout. */
+ private int timeout;
+
+ /** Query parameters. */
+ @GridToStringInclude(sensitive = true)
+ @GridDirectTransient
+ private Object[] params;
+
+ /** Query parameters as bytes. */
+ private byte[] paramsBytes;
+
+ /** Schema name. */
+ @GridToStringInclude
+ private String schemaName;
+
+ /**
+ * Required by {@link Externalizable}
+ */
+ public GridH2DmlRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param req Request.
+ */
+ public GridH2DmlRequest(GridH2DmlRequest req) {
+ reqId = req.reqId;
+ caches = req.caches;
+ topVer = req.topVer;
+ qryParts = req.qryParts;
+ pageSize = req.pageSize;
+ qry = req.qry;
+ flags = req.flags;
+ timeout = req.timeout;
+ params = req.params;
+ paramsBytes = req.paramsBytes;
+ schemaName = req.schemaName;
+ }
+
+ /**
+ * @return Parameters.
+ */
+ public Object[] parameters() {
+ return params;
+ }
+
+ /**
+ * @param params Parameters.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest parameters(Object[] params) {
+ if (params == null)
+ params = EMPTY_PARAMS;
+
+ this.params = params;
+
+ return this;
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest requestId(long reqId) {
+ this.reqId = reqId;
+
+ return this;
+ }
+
+ /**
+ * @return Request ID.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /**
+ * @param caches Caches.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest caches(List<Integer> caches) {
+ this.caches = caches;
+
+ return this;
+ }
+
+ /**
+ * @return Caches.
+ */
+ public List<Integer> caches() {
+ return caches;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+
+ return this;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Query partitions.
+ */
+ public int[] queryPartitions() {
+ return qryParts;
+ }
+
+ /**
+ * @param qryParts Query partitions.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest queryPartitions(int[] qryParts) {
+ this.qryParts = qryParts;
+
+ return this;
+ }
+
+ /**
+ * @param pageSize Page size.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest pageSize(int pageSize) {
+ this.pageSize = pageSize;
+
+ return this;
+ }
+
+ /**
+ * @return Page size.
+ */
+ public int pageSize() {
+ return pageSize;
+ }
+
+ /**
+ * @param qry SQL Query.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest query(String qry) {
+ this.qry = qry;
+
+ return this;
+ }
+
+ /**
+ * @return SQL Query.
+ */
+ public String query() {
+ return qry;
+ }
+
+ /**
+ * @param flags Flags.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest flags(int flags) {
+ assert flags >= 0 && flags <= 255: flags;
+
+ this.flags = (byte)flags;
+
+ return this;
+ }
+
+ /**
+ * @param flags Flags to check.
+ * @return {@code true} If all the requested flags are set to {@code true}.
+ */
+ public boolean isFlagSet(int flags) {
+ return (this.flags & flags) == flags;
+ }
+
+ /**
+ * @return Timeout.
+ */
+ public int timeout() {
+ return timeout;
+ }
+
+ /**
+ * @param timeout New timeout.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest timeout(int timeout) {
+ this.timeout = timeout;
+
+ return this;
+ }
+
+ /**
+ * @return Schema name.
+ */
+ public String schemaName() {
+ return schemaName;
+ }
+
+ /**
+ * @param schemaName Schema name.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest schemaName(String schemaName) {
+ this.schemaName = schemaName;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void marshall(Marshaller m) {
+ if (paramsBytes != null)
+ return;
+
+ assert params != null;
+
+ try {
+ paramsBytes = U.marshal(m, params);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("IfMayBeConditional")
+ @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
+ if (params != null)
+ return;
+
+ assert paramsBytes != null;
+
+ try {
+ final ClassLoader ldr = U.resolveClassLoader(ctx.config());
+
+ if (m instanceof BinaryMarshaller)
+ // To avoid deserializing of enum types.
+ params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr);
+ else
+ params = U.unmarshal(m, paramsBytes, ldr);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeCollection("caches", caches, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeInt("pageSize", pageSize))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeByteArray("paramsBytes", paramsBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeString("qry", qry))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeIntArray("qryParts", qryParts))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeLong("reqId", reqId))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeString("schemaName", schemaName))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeInt("timeout", timeout))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ caches = reader.readCollection("caches", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ pageSize = reader.readInt("pageSize");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ paramsBytes = reader.readByteArray("paramsBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ qry = reader.readString("qry");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ qryParts = reader.readIntArray("qryParts");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ reqId = reader.readLong("reqId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ schemaName = reader.readString("schemaName");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ timeout = reader.readInt("timeout");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridH2DmlRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return -55;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 10;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridH2DmlRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java
new file mode 100644
index 0000000..808ff9e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Response to remote DML request.
+ */
+public class GridH2DmlResponse implements Message, GridCacheQueryMarshallable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Request id. */
+ @GridToStringInclude
+ private long reqId;
+
+ /** Number of updated rows. */
+ @GridToStringInclude
+ private long updCnt;
+
+ /** Error message. */
+ @GridToStringInclude
+ private String err;
+
+ /** Keys that failed. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Object[] errKeys;
+
+ /** Keys that failed (after marshalling). */
+ private byte[] errKeysBytes;
+
+ /**
+ * Default constructor.
+ */
+ public GridH2DmlResponse() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param reqId Request id.
+ * @param updCnt Updated row number.
+ * @param errKeys Erroneous keys.
+ * @param error Error message.
+ */
+ public GridH2DmlResponse(long reqId, long updCnt, Object[] errKeys, String error) {
+ this.reqId = reqId;
+ this.updCnt = updCnt;
+ this.errKeys = errKeys;
+ this.err = error;
+ }
+
+ /**
+ * @return Request id.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /**
+ * @return Update counter.
+ */
+ public long updateCounter() {
+ return updCnt;
+ }
+
+ /**
+ * @return Error keys.
+ */
+ public Object[] errorKeys() {
+ return errKeys;
+ }
+
+ /**
+ * @return Error message.
+ */
+ public String error() {
+ return err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void marshall(Marshaller m) {
+ if (errKeysBytes != null || errKeys == null)
+ return;
+
+ try {
+ errKeysBytes = U.marshal(m, errKeys);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("IfMayBeConditional")
+ @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
+ if (errKeys != null || errKeysBytes == null)
+ return;
+
+ try {
+ final ClassLoader ldr = U.resolveClassLoader(ctx.config());
+
+ if (m instanceof BinaryMarshaller)
+ // To avoid deserializing of enum types.
+ errKeys = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(errKeysBytes, ldr);
+ else
+ errKeys = U.unmarshal(m, errKeysBytes, ldr);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridH2DmlResponse.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeString("err", err))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeByteArray("errKeysBytes", errKeysBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeLong("reqId", reqId))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeLong("updCnt", updCnt))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ err = reader.readString("err");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ errKeysBytes = reader.readByteArray("errKeysBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ reqId = reader.readLong("reqId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ updCnt = reader.readLong("updCnt");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridH2DmlResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return -56;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+
+ @Override public void onAckReceived() {
+ // No-op
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index 18b1afb..3c13392 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -112,6 +112,12 @@ public class GridH2ValueMessageFactory implements MessageFactory {
case -54:
return new QueryTable();
+
+ case -55:
+ return new GridH2DmlRequest();
+
+ case -56:
+ return new GridH2DmlResponse();
}
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java
new file mode 100644
index 0000000..e5efc06
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java
@@ -0,0 +1,783 @@
+package org.apache.ignite.internal.processors.query;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for {@link SqlFieldsQueryEx#skipReducerOnUpdate} flag.
+ */
+public class IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static int NODE_COUNT = 4;
+
+ /** */
+ private static String NODE_CLIENT = "client";
+
+ /** */
+ private static String CACHE_ACCOUNT = "acc";
+
+ /** */
+ private static String CACHE_REPORT = "rep";
+
+ /** */
+ private static String CACHE_STOCK = "stock";
+
+ /** */
+ private static String CACHE_TRADE = "trade";
+
+ /** */
+ private static String CACHE_LIST = "list";
+
+ /** */
+ private static IgniteEx client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ c.setDiscoverySpi(disco);
+
+ List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+ ccfgs.add(buildCacheConfiguration(CACHE_ACCOUNT));
+ ccfgs.add(buildCacheConfiguration(CACHE_STOCK));
+ ccfgs.add(buildCacheConfiguration(CACHE_TRADE));
+ ccfgs.add(buildCacheConfiguration(CACHE_REPORT));
+ ccfgs.add(buildCacheConfiguration(CACHE_LIST));
+
+ c.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+ if (gridName.equals(NODE_CLIENT))
+ c.setClientMode(true);
+
+ return c;
+ }
+
+ /**
+ * Creates a cache configuration.
+ *
+ * @param name Name of the cache.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration buildCacheConfiguration(String name) {
+ if (name.equals(CACHE_ACCOUNT)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_ACCOUNT);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Account.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_STOCK)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_STOCK);
+
+ ccfg.setCacheMode(CacheMode.REPLICATED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Stock.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_TRADE)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_TRADE);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Trade.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_REPORT)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_REPORT);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Report.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_LIST)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_LIST);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, String.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ return ccfg;
+ }
+
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(NODE_COUNT);
+
+ client = (IgniteEx)startGrid(NODE_CLIENT);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ awaitPartitionMapExchange();
+
+ client.cache(CACHE_ACCOUNT).clear();
+ client.cache(CACHE_STOCK).clear();
+ client.cache(CACHE_TRADE).clear();
+ client.cache(CACHE_REPORT).clear();
+ client.cache(CACHE_LIST).clear();
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdate() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+ String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE depo > 0";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts, new SqlFieldsQueryEx(text, false).setArgs(10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateFastKey() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+ String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE _key = ?";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false).setArgs(10, 1));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateLimit() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+ String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE sn >= ? AND sn < ? LIMIT ?";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false).setArgs(10, 0, 10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateWhereSubquery() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, -100);
+
+ Map<Integer, Trade> trades = getTrades(100, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "UPDATE \"trade\".Trade t SET qty = ? " +
+ "WHERE accountId IN (SELECT p._key FROM \"acc\".Account p WHERE depo < ?)";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+ new SqlFieldsQueryEx(text, false).setArgs(0, 0));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateSetSubquery() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+ Map<Integer, Trade> trades = getTrades(100, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "UPDATE \"trade\".Trade t SET qty = " +
+ "(SELECT a.depo/t.price FROM \"acc\".Account a WHERE t.accountId = a._key)";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdateSetTableSubquery() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+ Map<Integer, Trade> trades = getTrades(100, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "UPDATE \"trade\".Trade t SET (qty) = " +
+ "(SELECT a.depo/t.price FROM \"acc\".Account a WHERE t.accountId = a._key)";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertValues() throws Exception {
+ String text = "INSERT INTO \"acc\".Account (_key, name, sn, depo)" +
+ " VALUES (?, ?, ?, ?), (?, ?, ?, ?)";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), null,
+ new SqlFieldsQueryEx(text, false).setArgs(1, "John Marry", 11111, 100, 2, "Marry John", 11112, 200));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelect() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+ "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+ new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelectOrderBy() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+ "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a " +
+ "ORDER BY a.sn DESC";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+ new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelectUnion() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(20, 1, 1000);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+ "SELECT a._key, a._key, 0, a.depo, 1 FROM \"acc\".Account a " +
+ "UNION " +
+ "SELECT 101 + a2._key, a2._key, 1, a2.depo, 1 FROM \"acc\".Account a2";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelectGroupBy() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+ Map<Integer, Trade> trades = getTrades(100, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+ client.cache(CACHE_TRADE).putAll(trades);
+
+ String text = "INSERT INTO \"rep\".Report (_key, accountId, spends, count) " +
+ "SELECT accountId, accountId, SUM(qty * price), COUNT(*) " +
+ "FROM \"trade\".Trade " +
+ "GROUP BY accountId";
+
+ checkUpdate(client.<Integer, Report>cache(CACHE_REPORT), null,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelectDistinct() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 2, 100);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "INSERT INTO \"list\".String (_key, _val) " +
+ "SELECT DISTINCT sn, name FROM \"acc\".Account ";
+
+ checkUpdate(client.<Integer, String>cache(CACHE_LIST), null,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testInsertFromSelectJoin() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+ Map<Integer, Stock> stocks = getStocks(5);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+ client.cache(CACHE_STOCK).putAll(stocks);
+
+ String text = "INSERT INTO \"trade\".Trade(_key, accountId, stockId, qty, price) " +
+ "SELECT 5*a._key + s._key, a._key, s._key, ?, a.depo/? " +
+ "FROM \"acc\".Account a JOIN \"stock\".Stock s ON 1=1";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null,
+ new SqlFieldsQueryEx(text, false).setArgs(10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testDelete() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "DELETE FROM \"acc\".Account WHERE sn > ?";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false).setArgs(10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testDeleteTop() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ String text = "DELETE TOP ? FROM \"acc\".Account WHERE sn < ?";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false).setArgs(10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testDeleteWhereSubquery() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(20, 1, 100);
+ Map<Integer, Trade> trades = getTrades(10, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+ client.cache(CACHE_TRADE).putAll(trades);
+
+ String text = "DELETE FROM \"acc\".Account " +
+ "WHERE _key IN (SELECT t.accountId FROM \"trade\".Trade t)";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testMergeValues() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(1, 1, 100);
+
+ String text = "MERGE INTO \"acc\".Account (_key, name, sn, depo)" +
+ " VALUES (?, ?, ?, ?), (?, ?, ?, ?)";
+
+ checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts,
+ new SqlFieldsQueryEx(text, false).setArgs(0, "John Marry", 11111, 100, 1, "Marry John", 11112, 200));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testMergeFromSelectJoin() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 100);
+ Map<Integer, Stock> stocks = getStocks(5);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+ client.cache(CACHE_STOCK).putAll(stocks);
+
+ Map<Integer, Trade> trades = new HashMap<>();
+
+ trades.put(5, new Trade(1, 1, 1, 1));
+
+ String text = "MERGE INTO \"trade\".Trade(_key, accountId, stockId, qty, price) " +
+ "SELECT 5*a._key + s._key, a._key, s._key, ?, a.depo/? " +
+ "FROM \"acc\".Account a JOIN \"stock\".Stock s ON 1=1";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+ new SqlFieldsQueryEx(text, false).setArgs(10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testMergeFromSelectOrderBy() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+
+ Map<Integer, Trade> trades = new HashMap<>();
+
+ trades.put(5, new Trade(1, 1, 1, 1));
+
+ String text = "MERGE INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " +
+ "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a " +
+ "ORDER BY a.sn DESC";
+
+ checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades,
+ new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10));
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ public void testMergeFromSelectGroupBy() throws Exception {
+ Map<Integer, Account> accounts = getAccounts(100, 1, 1000);
+ Map<Integer, Trade> trades = getTrades(100, 2);
+
+ client.cache(CACHE_ACCOUNT).putAll(accounts);
+ client.cache(CACHE_TRADE).putAll(trades);
+
+ Map<Integer, Report> reports = new HashMap<>();
+
+ reports.put(5, new Report(5, 1, 1));
+
+ String text = "MERGE INTO \"rep\".Report (_key, accountId, spends, count) " +
+ "SELECT accountId, accountId, SUM(qty * price), COUNT(*) " +
+ "FROM \"trade\".Trade " +
+ "GROUP BY accountId";
+
+ checkUpdate(client.<Integer, Report>cache(CACHE_REPORT), reports,
+ new SqlFieldsQueryEx(text, false));
+ }
+
+ /**
+ * Constructs multiple Account objects.
+ *
+ * @param num Number of accounts.
+ * @param numCopy Number of copies.
+ * @param depo Deposit amount.
+ * @return Map of accounts.
+ */
+ private Map<Integer, Account> getAccounts(int num, int numCopy, int depo) {
+ Map<Integer, Account> res = new HashMap<>();
+
+ int count = 0;
+
+ for (int i = 0; i < num; ++i) {
+ String name = "John doe #" + i;
+
+ for (int j = 0; j < numCopy; ++j)
+ res.put(count++, new Account(name, i, depo));
+ }
+
+ return res;
+ }
+
+ /**
+ * Constructs multiple Stock objects.
+ *
+ * @param num Number of stocks.
+ * @return Map of Stock objects.
+ */
+ private Map<Integer, Stock> getStocks(int num) {
+ Map<Integer, Stock> res = new HashMap<>();
+
+ for (int i = 0; i < num; ++i)
+ res.put(i, new Stock("T" + i, "Stock #" + i));
+
+ return res;
+ }
+
+ /**
+ * Constructs multiple Trade objects.
+ *
+ * @param numAccounts Number of accounts.
+ * @param numStocks Number of stocks.
+ * @return Map of Trade objects.
+ */
+ private Map<Integer, Trade> getTrades(int numAccounts, int numStocks) {
+ Map<Integer, Trade> res = new HashMap<>();
+
+ int count = 0;
+
+ for (int i = 0; i < numAccounts; ++i) {
+ for (int j = 0; j < numStocks; ++j) {
+ res.put(count++, new Trade(i, j, 100, 100));
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Executes provided sql update with skipReducerOnUpdate flag on and off and checks results are the same.
+ *
+ * @param cache Cache.
+ * @param initial Initial content of the cache.
+ * @param qry Query to execute.
+ * @param <K> Key type.
+ * @param <V> Value type.
+ */
+ private <K, V> void checkUpdate(IgniteCache<K, V> cache, Map<K, V> initial, SqlFieldsQueryEx qry) {
+ cache.clear();
+
+ if (!F.isEmpty(initial))
+ cache.putAll(initial);
+
+ List<List<?>> updRes = cache.query(qry.setSkipReducerOnUpdate(true)).getAll();
+
+ Map<K, V> result = new HashMap<>(cache.size());
+
+ for (Cache.Entry<K, V> e : cache)
+ result.put(e.getKey(), e.getValue());
+
+ cache.clear();
+
+ if (!F.isEmpty(initial))
+ cache.putAll(initial);
+
+ List<List<?>> updRes2 = cache.query(qry.setSkipReducerOnUpdate(false)).getAll();
+
+ assertTrue(((Number)updRes.get(0).get(0)).intValue() > 0);
+
+ assertEquals(((Number)updRes.get(0).get(0)).intValue(), ((Number)updRes2.get(0).get(0)).intValue());
+
+ assertEquals(result.size(), cache.size());
+
+ for (Cache.Entry<K, V> e : cache)
+ assertEquals(e.getValue(), result.get(e.getKey()));
+ }
+
+ /** */
+ public class Account {
+ /** */
+ @QuerySqlField
+ String name;
+
+ /** */
+ @QuerySqlField
+ int sn;
+
+ /** */
+ @QuerySqlField
+ int depo;
+
+ /**
+ * Constructor.
+ *
+ * @param name Name.
+ * @param sn ID.
+ * @param depo Deposit amount.
+ */
+ Account(String name, int sn, int depo) {
+ this.name = name;
+ this.sn = sn;
+ this.depo = depo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (name == null ? 0 : name.hashCode()) ^ sn ^ depo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (!obj.getClass().equals(Account.class))
+ return false;
+
+ Account other = (Account)obj;
+
+ return F.eq(name, other.name) && sn == other.sn && depo == other.depo;
+ }
+ }
+
+ /** */
+ public class Stock {
+ /** */
+ @QuerySqlField
+ String ticker;
+
+ /** */
+ @QuerySqlField
+ String name;
+
+ /**
+ * Constructor.
+ *
+ * @param ticker Short name.
+ * @param name Name.
+ */
+ Stock(String ticker, String name) {
+ this.ticker = ticker;
+ this.name = name;
+ }
+ }
+
+ /** */
+ public class Trade {
+ /** */
+ @QuerySqlField
+ int accountId;
+
+ /** */
+ @QuerySqlField
+ int stockId;
+
+ /** */
+ @QuerySqlField
+ int qty;
+
+ /** */
+ @QuerySqlField
+ int price;
+
+ /**
+ * Constructor.
+ *
+ * @param accountId Account id.
+ * @param stockId Stock id.
+ * @param qty Quantity.
+ * @param price Price.
+ */
+ Trade(int accountId, int stockId, int qty, int price) {
+ this.accountId = accountId;
+ this.stockId = stockId;
+ this.qty = qty;
+ this.price = price;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return accountId ^ stockId ^ qty ^ price;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (!obj.getClass().equals(Trade.class))
+ return false;
+
+ Trade other = (Trade)obj;
+
+ return accountId == other.accountId && stockId == other.stockId &&
+ qty == other.qty && price == other.price;
+ }
+
+ }
+
+ /** */
+ public class Report {
+ /** */
+ @QuerySqlField
+ int accountId;
+
+ /** */
+ @QuerySqlField
+ int spends;
+
+ /** */
+ @QuerySqlField
+ int count;
+
+ /**
+ * Constructor.
+ *
+ * @param accountId Account id.
+ * @param spends Spends.
+ * @param count Count.
+ */
+ Report(int accountId, int spends, int count) {
+ this.accountId = accountId;
+ this.spends = spends;
+ this.count = count;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return accountId ^ spends ^ count;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (!obj.getClass().equals(Report.class))
+ return false;
+
+ Report other = (Report)obj;
+
+ return accountId == other.accountId && spends == other.spends &&
+ count == other.count;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
new file mode 100644
index 0000000..a2a6bf8
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheQueryExecutedEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ThreadLocalRandom8;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+
+/**
+ * Tests for distributed DML.
+ */
+@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+public class IgniteSqlSkipReducerOnUpdateDmlSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static int NODE_COUNT = 4;
+
+ /** */
+ private static String NODE_CLIENT = "client";
+
+ /** */
+ private static String CACHE_ORG = "org";
+
+ /** */
+ private static String CACHE_PERSON = "person";
+
+ /** */
+ private static String CACHE_POSITION = "pos";
+
+ /** */
+ private static Ignite client;
+
+ /** */
+ private static CountDownLatch latch;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ c.setDiscoverySpi(disco);
+
+ List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+ ccfgs.add(buildCacheConfiguration(CACHE_ORG));
+ ccfgs.add(buildCacheConfiguration(CACHE_PERSON));
+ ccfgs.add(buildCacheConfiguration(CACHE_POSITION));
+
+ c.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+ c.setLongQueryWarningTimeout(10000);
+
+ if (gridName.equals(NODE_CLIENT))
+ c.setClientMode(true);
+
+ return c;
+ }
+
+ /**
+ * Creates cache configuration.
+ *
+ * @param name Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration buildCacheConfiguration(String name) {
+ if (name.equals(CACHE_ORG)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_ORG);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Organization.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_PERSON)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_PERSON);
+
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+
+ QueryEntity entity = new QueryEntity(PersonKey.class, Person.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ ccfg.setKeyConfiguration(new CacheKeyConfiguration(PersonKey.class));
+
+ ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+
+ return ccfg;
+ }
+ if (name.equals(CACHE_POSITION)) {
+ CacheConfiguration ccfg = new CacheConfiguration(CACHE_POSITION);
+
+ ccfg.setCacheMode(CacheMode.REPLICATED);
+
+ QueryEntity entity = new QueryEntity(Integer.class, Position.class);
+
+ ccfg.setQueryEntities(Collections.singletonList(entity));
+
+ ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+
+ return ccfg;
+ }
+
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(NODE_COUNT);
+
+ client = startGrid(NODE_CLIENT);
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ checkNoLeaks();
+
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Stop additional node that is started in one of the test.
+ stopGrid(NODE_COUNT + 1);
+
+ awaitPartitionMapExchange();
+
+ client.cache(CACHE_PERSON).clear();
+ client.cache(CACHE_ORG).clear();
+ client.cache(CACHE_POSITION).clear();
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testSimpleUpdateDistributedReplicated() throws Exception {
+ fillCaches();
+
+ IgniteCache<Integer, Position> cache = grid(NODE_CLIENT).cache(CACHE_POSITION);
+
+ Position p = cache.get(1);
+
+ List<List<?>> r = cache.query(new SqlFieldsQueryEx("UPDATE Position p SET name = CONCAT('A ', name)", false)
+ .setSkipReducerOnUpdate(true)).getAll();
+
+ assertEquals((long)cache.size(), r.get(0).get(0));
+
+ assertEquals(cache.get(1).name, "A " + p.name);
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testSimpleUpdateDistributedPartitioned() throws Exception {
+ fillCaches();
+
+ IgniteCache<PersonKey, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON);
+
+ List<List<?>> r = cache.query(new SqlFieldsQueryEx(
+ "UPDATE Person SET position = CASEWHEN(position = 1, 1, position - 1)", false)
+ .setSkipReducerOnUpdate(true)).getAll();
+
+ assertEquals((long)cache.size(), r.get(0).get(0));
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testDistributedUpdateFailedKeys() throws Exception {
+ // UPDATE can produce failed keys due to concurrent modification
+ fillCaches();
+
+ final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() {
+ return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET rate = Modify(_key, rate - 1)", false)
+ .setSkipReducerOnUpdate(true));
+ }
+ }, CacheException.class, "Failed to update some keys because they had been modified concurrently");
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testDistributedUpdateFail() throws Exception {
+ fillCaches();
+
+ final IgniteCache cache = grid(NODE_CLIENT).cache(CACHE_PERSON);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() {
+ return cache.query(new SqlFieldsQueryEx("UPDATE Person SET name = Fail(name)", false)
+ .setSkipReducerOnUpdate(true));
+ }
+ }, CacheException.class, "Failed to execute SQL query");
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void testQueryParallelism() throws Exception {
+ String cacheName = CACHE_ORG + "x4";
+
+ CacheConfiguration cfg = buildCacheConfiguration(CACHE_ORG)
+ .setQueryParallelism(4)
+ .setName(cacheName);
+
+ IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).createCache(cfg);
+
+ for (int i = 0; i < 1024; i++)
+ cache.put(i, new Organization("Acme Inc #" + i, 0));
+
+ List<List<?>> r = cache.query(new SqlFieldsQueryEx("UPDATE \"" + cacheName +
+ "\".Organization o SET name = UPPER(name)", false).setSkipReducerOnUpdate(true)).getAll();
+
+ assertEquals((long)cache.size(), r.get(0).get(0));
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testEvents() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(NODE_COUNT);
+
+ final IgnitePredicate<Event> pred = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ assert evt instanceof CacheQueryExecutedEvent;
+
+ CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
+
+ assertNotNull(qe.clause());
+
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ for (int idx = 0; idx < NODE_COUNT; idx++)
+ grid(idx).events().localListen(pred, EVT_CACHE_QUERY_EXECUTED);
+
+ IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+ for (int i = 0; i < 1024; i++)
+ cache.put(i, new Organization("Acme Inc #" + i, 0));
+
+ cache.query(new SqlFieldsQueryEx("UPDATE \"org\".Organization o SET name = UPPER(name)", false)
+ .setSkipReducerOnUpdate(true)).getAll();
+
+ assertTrue(latch.await(5000, MILLISECONDS));
+
+ for (int idx = 0; idx < NODE_COUNT; idx++)
+ grid(idx).events().stopLocalListen(pred);
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testSpecificPartitionsUpdate() throws Exception {
+ fillCaches();
+
+ Affinity aff = grid(NODE_CLIENT).affinity(CACHE_PERSON);
+
+ int numParts = aff.partitions();
+ int parts[] = new int[numParts / 2];
+
+ for (int idx = 0; idx < numParts / 2; idx++)
+ parts[idx] = idx * 2;
+
+ IgniteCache<PersonKey, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON);
+
+ // UPDATE over even partitions
+ cache.query(new SqlFieldsQueryEx("UPDATE Person SET position = 0", false)
+ .setSkipReducerOnUpdate(true)
+ .setPartitions(parts));
+
+ List<List<?>> rows = cache.query(new SqlFieldsQuery("SELECT _key, position FROM Person")).getAll();
+
+ for (List<?> row : rows) {
+ PersonKey personKey = (PersonKey)row.get(0);
+ int pos = ((Number)row.get(1)).intValue();
+ int part = aff.partition(personKey);
+
+ assertTrue((part % 2 == 0) ^ (pos != 0));
+ }
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testCancel() throws Exception {
+ latch = new CountDownLatch(NODE_COUNT + 1);
+
+ fillCaches();
+
+ final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() {
+ return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET name = WAIT(name)", false)
+ .setSkipReducerOnUpdate(true));
+ }
+ });
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ Collection<GridRunningQueryInfo> qCol =
+ grid(NODE_CLIENT).context().query().runningQueries(0);
+
+ if (qCol.isEmpty())
+ return false;
+
+ for (GridRunningQueryInfo queryInfo : qCol)
+ queryInfo.cancel();
+
+ return true;
+ }
+ }, 5000);
+
+ latch.await(5000, MILLISECONDS);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws IgniteCheckedException {
+ return fut.get();
+ }
+ }, IgniteCheckedException.class, "Future was cancelled");
+ }
+
+ /**
+ *
+ * @throws Exception if failed.
+ */
+ public void testNodeStopDuringUpdate() throws Exception {
+ startGrid(NODE_COUNT + 1);
+
+ awaitPartitionMapExchange();
+
+ fillCaches();
+
+ latch = new CountDownLatch(NODE_COUNT + 1 + 1);
+
+ final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() {
+ return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET name = WAIT(name)", false)
+ .setSkipReducerOnUpdate(true));
+ }
+ });
+
+ final CountDownLatch finalLatch = latch;
+
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return finalLatch.getCount() == 1;
+ }
+ }, 5000));
+
+ latch.countDown();
+
+ stopGrid(NODE_COUNT + 1);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws IgniteCheckedException {
+ return fut.get();
+ }
+ }, IgniteCheckedException.class, "Update failed because map node left topology");
+ }
+
+ /**
+ * Ensure there are no leaks in data structures associated with distributed dml execution.
+ */
+ private void checkNoLeaks() {
+ GridQueryProcessor qryProc = grid(NODE_CLIENT).context().query();
+
+ IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProc, GridQueryProcessor.class, "idx");
+
+ GridReduceQueryExecutor rdcQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "rdcQryExec");
+
+ Map updRuns = GridTestUtils.getFieldValue(rdcQryExec, GridReduceQueryExecutor.class, "updRuns");
+
+ assertEquals(0, updRuns.size());
+
+ for (int idx = 0; idx < NODE_COUNT; idx++) {
+ qryProc = grid(idx).context().query();
+
+ h2Idx = GridTestUtils.getFieldValue(qryProc, GridQueryProcessor.class, "idx");
+
+ GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec");
+
+ Map qryRess = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "qryRess");
+
+ for (Object obj : qryRess.values()) {
+ Map updCancels = GridTestUtils.getFieldValue(obj, "updCancels");
+
+ assertEquals(0, updCancels.size());
+ }
+ }
+ }
+
+ /**
+ * Fills caches with initial data.
+ */
+ private void fillCaches() {
+ Ignite client = grid(NODE_CLIENT);
+
+ IgniteCache<Integer, Position> posCache = client.cache(CACHE_POSITION);
+
+ // Generate positions
+ Position[] positions = new Position[] {
+ new Position(1, "High Ranking Officer", 1),
+ new Position(2, "Administrative worker", 3),
+ new Position(3, "Worker", 7),
+ new Position(4, "Security", 2),
+ new Position(5, "Cleaner", 1)
+ };
+
+ for (Position pos: positions)
+ posCache.put(pos.id, pos);
+
+ // Generate organizations
+ String[] forms = new String[] {" Inc", " Co", " AG", " Industries"};
+ String[] orgNames = new String[] {"Acme", "Sierra", "Mesa", "Umbrella", "Robotics"};
+ String[] names = new String[] {"Mary", "John", "William", "Tom", "Basil", "Ann", "Peter"};
+
+ IgniteCache<PersonKey, Person> personCache = client.cache(CACHE_PERSON);
+
+ IgniteCache<Integer, Organization> orgCache = client.cache(CACHE_ORG);
+
+ int orgId = 0;
+ int personId = 0;
+
+ for (String orgName : produceCombination(orgNames, orgNames, forms)) {
+ Organization org = new Organization(orgName, 1 + orgId);
+
+ orgCache.put(++orgId, org);
+
+ // Generate persons
+
+ List<String> personNames = produceCombination(names, names, new String[]{"s"});
+
+ int positionId = 0;
+ int posCounter = 0;
+
+ for (String name : personNames) {
+ PersonKey pKey = new PersonKey(orgId, ++personId);
+
+ if (positions[positionId].rate < posCounter++) {
+ posCounter = 0;
+ positionId = (positionId + 1) % positions.length;
+ }
+
+ Person person = new Person(name, positions[positionId].id, org.rate * positions[positionId].rate);
+
+ personCache.put(pKey, person);
+ }
+ }
+ }
+
+ /**
+ * Produces all possible combinations.
+ *
+ * @param a First array.
+ * @param b Second array.
+ * @param ends Endings array.
+ * @return Result.
+ */
+ private List<String> produceCombination(String[] a, String[] b, String[] ends) {
+ List<String> res = new ArrayList<>();
+
+ for (String s1 : a) {
+ for (String s2 : b) {
+ if (!s1.equals(s2)) {
+ String end = ends[ThreadLocalRandom8.current().nextInt(ends.length)];
+
+ res.add(s1 + " " + s2 + end);
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /** */
+ private static class Organization {
+ /** */
+ @QuerySqlField
+ String name;
+
+ /** */
+ @QuerySqlField
+ int rate;
+
+ /** */
+ @QuerySqlField
+ Date updated;
+
+ /**
+ * Constructor.
+ *
+ * @param name Organization name.
+ * @param rate Rate.
+ */
+ public Organization(String name, int rate) {
+ this.name = name;
+ this.rate = rate;
+ this.updated = new Date(System.currentTimeMillis());
+ }
+ }
+
+ /** */
+ public static class PersonKey {
+ /** */
+ @AffinityKeyMapped
+ @QuerySqlField
+ private Integer orgId;
+
+ /** */
+ @QuerySqlField
+ private Integer id;
+
+ /**
+ * Constructor.
+ *
+ * @param orgId Organization id.
+ * @param id Person id.
+ */
+ PersonKey(int orgId, int id) {
+ this.orgId = orgId;
+ this.id = id;
+ }
+ }
+
+ /** */
+ public static class Person {
+ /** */
+ @QuerySqlField
+ String name;
+
+ /** */
+ @QuerySqlField
+ int position;
+
+ /** */
+ @QuerySqlField
+ int amount;
+ /** */
+ @QuerySqlField
+ Date updated;
+
+ /**
+ * Constructor.
+ *
+ * @param name Name.
+ * @param position Position.
+ * @param amount Amount.
+ */
+ private Person(String name, int position, int amount) {
+ this.name = name;
+ this.position = position;
+ this.amount = amount;
+
+ this.updated = new Date(System.currentTimeMillis());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (name==null? 0: name.hashCode()) ^ position ^ amount ^ (updated == null ? 0 : updated.hashCode());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (!obj.getClass().equals(Person.class))
+ return false;
+
+ Person other = (Person)obj;
+
+ return F.eq(name, other.name) && position == other.position &&
+ amount == other.amount && F.eq(updated, other.updated);
+ }
+ }
+
+ /** */
+ private static class Position {
+ /** */
+ @QuerySqlField
+ int id;
+
+ /** */
+ @QuerySqlField
+ String name;
+
+ /** */
+ @QuerySqlField
+ int rate;
+
+ /**
+ * Constructor.
+ *
+ * @param id Id.
+ * @param name Name.
+ * @param rate Rate.
+ */
+ public Position(int id, String name, int rate) {
+ this.id = id;
+ this.name = name;
+ this.rate = rate;
+ }
+ }
+
+ /**
+ * SQL function that always fails.
+ *
+ * @param param Arbitrary parameter.
+ * @return Result.
+ */
+ @QuerySqlFunction
+ public static String Fail(String param) {
+ throw new IgniteSQLException("Fail() called");
+ }
+
+ /**
+ * SQL function that waits for condition.
+ *
+ * @param param Arbitrary parameter.
+ * @return Result.
+ */
+ @QuerySqlFunction
+ public static String Wait(String param) {
+ try {
+ if (latch.getCount() > 0) {
+ latch.countDown();
+
+ latch.await(5000, MILLISECONDS);
+ }
+ else
+ Thread.sleep(100);
+ }
+ catch (InterruptedException ignore) {
+ // No-op
+ }
+ return param;
+ }
+
+ /**
+ * SQL function that makes a concurrent modification.
+ *
+ * @param id Id.
+ * @param rate Rate.
+ * @return Result.
+ */
+ @QuerySqlFunction
+ public static int Modify(final int id, final int rate) {
+ try {
+ GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() {
+ IgniteCache cache = client.cache(CACHE_ORG);
+
+ cache.put(id, new Organization("Acme Inc #" + id, rate + 1));
+
+ return null;
+ }
+ }).get();
+ }
+ catch (Exception e) {
+ // No-op
+ }
+
+ return rate - 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index c49649b..83b4689 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -123,9 +123,11 @@ import org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDe
import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest;
import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
import org.apache.ignite.internal.processors.client.ClientConnectorConfigurationValidationSelfTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlParameterizedQueryTest;
import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlSelfTest;
import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
import org.apache.ignite.internal.processors.query.IgniteSqlKeyValueFieldsTest;
import org.apache.ignite.internal.processors.query.IgniteSqlNotNullConstraintTest;
@@ -243,6 +245,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheInsertSqlQuerySelfTest.class);
suite.addTestSuite(IgniteCacheUpdateSqlQuerySelfTest.class);
suite.addTestSuite(IgniteCacheDeleteSqlQuerySelfTest.class);
+ suite.addTestSuite(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class);
+ suite.addTestSuite(IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.class);
suite.addTestSuite(IgniteBinaryObjectQueryArgumentsTest.class);
suite.addTestSuite(IgniteBinaryObjectLocalQueryArgumentsTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/configuration_test.cpp b/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
index 7da6757..3165c4d 100644
--- a/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/configuration_test.cpp
@@ -43,6 +43,7 @@ namespace
const bool testReplicatedOnly = true;
const bool testCollocated = true;
const bool testLazy = true;
+ const bool testSkipReducerOnUpdate = true;
const std::string testAddress = testServerHost + ':' + ignite::common::LexicalCast<std::string>(testServerPort);
}
@@ -132,6 +133,7 @@ void CheckConnectionConfig(const Configuration& cfg)
BOOST_CHECK_EQUAL(cfg.IsReplicatedOnly(), testReplicatedOnly);
BOOST_CHECK_EQUAL(cfg.IsCollocated(), testCollocated);
BOOST_CHECK_EQUAL(cfg.IsLazy(), testLazy);
+ BOOST_CHECK_EQUAL(cfg.IsSkipReducerOnUpdate(), testSkipReducerOnUpdate);
std::stringstream constructor;
@@ -143,7 +145,8 @@ void CheckConnectionConfig(const Configuration& cfg)
<< "lazy=" << BoolToStr(testLazy) << ';'
<< "page_size=" << testPageSize << ';'
<< "replicated_only=" << BoolToStr(testReplicatedOnly) << ';'
- << "schema=" << testSchemaName << ';';
+ << "schema=" << testSchemaName << ';'
+ << "skip_reducer_on_update=" << BoolToStr(testReplicatedOnly) << ';';
const std::string& expectedStr = constructor.str();
@@ -164,6 +167,7 @@ void CheckDsnConfig(const Configuration& cfg)
BOOST_CHECK_EQUAL(cfg.IsReplicatedOnly(), false);
BOOST_CHECK_EQUAL(cfg.IsCollocated(), false);
BOOST_CHECK_EQUAL(cfg.IsLazy(), false);
+ BOOST_CHECK_EQUAL(cfg.IsSkipReducerOnUpdate(), false);
}
BOOST_AUTO_TEST_SUITE(ConfigurationTestSuite)
@@ -180,6 +184,8 @@ BOOST_AUTO_TEST_CASE(CheckTestValuesNotEquealDefault)
BOOST_CHECK_NE(testEnforceJoinOrder, Configuration::DefaultValue::enforceJoinOrder);
BOOST_CHECK_NE(testReplicatedOnly, Configuration::DefaultValue::replicatedOnly);
BOOST_CHECK_NE(testCollocated, Configuration::DefaultValue::collocated);
+ BOOST_CHECK_NE(testLazy, Configuration::DefaultValue::lazy);
+ BOOST_CHECK_NE(testSkipReducerOnUpdate, Configuration::DefaultValue::skipReducerOnUpdate);
}
BOOST_AUTO_TEST_CASE(TestConnectStringUppercase)
@@ -196,7 +202,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringUppercase)
<< "COLLOCATED=" << BoolToStr(testCollocated, false) << ';'
<< "REPLICATED_ONLY=" << BoolToStr(testReplicatedOnly, false) << ';'
<< "PAGE_SIZE=" << testPageSize << ';'
- << "SCHEMA=" << testSchemaName;
+ << "SCHEMA=" << testSchemaName << ';'
+ << "SKIP_REDUCER_ON_UPDATE=" << BoolToStr(testSkipReducerOnUpdate, false);
const std::string& connectStr = constructor.str();
@@ -219,7 +226,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringLowercase)
<< "enforce_join_order=" << BoolToStr(testEnforceJoinOrder) << ';'
<< "replicated_only=" << BoolToStr(testReplicatedOnly) << ';'
<< "collocated=" << BoolToStr(testCollocated) << ';'
- << "schema=" << testSchemaName;
+ << "schema=" << testSchemaName << ';'
+ << "skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate);
const std::string& connectStr = constructor.str();
@@ -242,7 +250,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringZeroTerminated)
<< "collocated=" << BoolToStr(testCollocated) << ';'
<< "distributed_joins=" << BoolToStr(testDistributedJoins) << ';'
<< "enforce_join_order=" << BoolToStr(testEnforceJoinOrder) << ';'
- << "schema=" << testSchemaName;
+ << "schema=" << testSchemaName << ';'
+ << "skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate);
const std::string& connectStr = constructor.str();
@@ -265,7 +274,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringMixed)
<< "Enforce_Join_Order=" << BoolToStr(testEnforceJoinOrder) << ';'
<< "Replicated_Only=" << BoolToStr(testReplicatedOnly, false) << ';'
<< "Collocated=" << BoolToStr(testCollocated) << ';'
- << "Schema=" << testSchemaName;
+ << "Schema=" << testSchemaName << ';'
+ << "Skip_Reducer_On_Update=" << BoolToStr(testSkipReducerOnUpdate);
const std::string& connectStr = constructor.str();
@@ -288,7 +298,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringWhitepaces)
<< "COLLOCATED =" << BoolToStr(testCollocated, false) << " ;"
<< " REPLICATED_ONLY= " << BoolToStr(testReplicatedOnly, false) << ';'
<< "ENFORCE_JOIN_ORDER= " << BoolToStr(testEnforceJoinOrder, false) << " ;"
- << "SCHEMA = \n\r" << testSchemaName;
+ << "SCHEMA = \n\r" << testSchemaName << ';'
+ << " skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate, false);
const std::string& connectStr = constructor.str();
@@ -358,6 +369,7 @@ BOOST_AUTO_TEST_CASE(TestConnectStringInvalidBoolKeys)
keys.insert("replicated_only");
keys.insert("collocated");
keys.insert("lazy");
+ keys.insert("skip_reducer_on_update");
for (Set::const_iterator it = keys.begin(); it != keys.end(); ++it)
{
@@ -385,6 +397,7 @@ BOOST_AUTO_TEST_CASE(TestConnectStringValidBoolKeys)
keys.insert("replicated_only");
keys.insert("collocated");
keys.insert("lazy");
+ keys.insert("skip_reducer_on_update");
for (Set::const_iterator it = keys.begin(); it != keys.end(); ++it)
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc-test/src/queries_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/queries_test.cpp b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
index 4c7e402..707669d 100644
--- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
@@ -755,6 +755,14 @@ BOOST_AUTO_TEST_CASE(TestConnectionProtocolVersion_2_1_5)
InsertTestBatch(11, 20, 9);
}
+BOOST_AUTO_TEST_CASE(TestConnectionProtocolVersion_2_3_0)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache;PROTOCOL_VERSION=2.3.0");
+
+ InsertTestStrings(10, false);
+ InsertTestBatch(11, 20, 9);
+}
+
BOOST_AUTO_TEST_CASE(TestTwoRowsInt8)
{
CheckTwoRowsInt<signed char>(SQL_C_STINYINT);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h b/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
index 2b1ec52..419a65e 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h
@@ -82,6 +82,9 @@ namespace ignite
/** Connection attribute keyword for lazy attribute. */
static const std::string lazy;
+
+ /** Connection attribute keyword for skipReducerOnUpdate attribute. */
+ static const std::string skipReducerOnUpdate;
};
/** Default values for configuration. */
@@ -125,6 +128,9 @@ namespace ignite
/** Default value for lazy attribute. */
static const bool lazy;
+
+ /** Default value for skipReducerOnUpdate attribute. */
+ static const bool skipReducerOnUpdate;
};
/**
@@ -384,6 +390,26 @@ namespace ignite
}
/**
+ * Check update on server flag.
+ *
+ * @return True if update on server.
+ */
+ bool IsSkipReducerOnUpdate() const
+ {
+ return GetBoolValue(Key::skipReducerOnUpdate, DefaultValue::skipReducerOnUpdate);
+ }
+
+ /**
+ * Set update on server.
+ *
+ * @param val Value to set.
+ */
+ void SetSkipReducerOnUpdate(bool val)
+ {
+ SetBoolValue(Key::skipReducerOnUpdate, val);
+ }
+
+ /**
* Get protocol version.
*
* @return Protocol version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
index 91a808c..dda0ba9 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h
@@ -79,9 +79,10 @@ namespace ignite
* @param replicatedOnly Replicated only flag.
* @param collocated Collocated flag.
* @param lazy Lazy flag.
+ * @param skipReducerOnUpdate Skip reducer on update.
*/
HandshakeRequest(const ProtocolVersion& version, bool distributedJoins, bool enforceJoinOrder,
- bool replicatedOnly, bool collocated, bool lazy);
+ bool replicatedOnly, bool collocated, bool lazy, bool skipReducerOnUpdate);
/**
* Destructor.
@@ -112,6 +113,9 @@ namespace ignite
/** Lazy flag. */
bool lazy;
+
+ /** Skip reducer on update flag. */
+ bool skipReducerOnUpdate;
};
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h b/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
index c36d5dd..e6088a7 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h
@@ -34,6 +34,7 @@ namespace ignite
/** Current protocol version. */
static const ProtocolVersion VERSION_2_1_0;
static const ProtocolVersion VERSION_2_1_5;
+ static const ProtocolVersion VERSION_2_3_0;
typedef std::set<ProtocolVersion> VersionSet;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h b/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
index 2974b67..90286b9 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h
@@ -55,6 +55,7 @@ namespace ignite
REPLICATED_ONLY_CHECK_BOX,
COLLOCATED_CHECK_BOX,
LAZY_CHECK_BOX,
+ SKIP_REDUCER_ON_UPDATE_CHECK_BOX,
PROTOCOL_VERSION_LABEL,
PROTOCOL_VERSION_COMBO_BOX,
OK_BUTTON,
@@ -149,6 +150,9 @@ namespace ignite
/** Lazy CheckBox. */
std::auto_ptr<Window> lazyCheckBox;
+ /** Update on server CheckBox. */
+ std::auto_ptr<Window> skipReducerOnUpdateCheckBox;
+
/** Protocol version edit field. */
std::auto_ptr<Window> protocolVersionLabel;