You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/22 10:49:04 UTC
[01/10] ignite git commit: IGNITE-3075 Implement single key-value
pair DHT request/response for ATOMIC cache.
Repository: ignite
Updated Branches:
refs/heads/ignite-4259 [created] f789e2dd5
IGNITE-3075 Implement single key-value pair DHT request/response for ATOMIC cache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51ca24f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51ca24f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51ca24f2
Branch: refs/heads/ignite-4259
Commit: 51ca24f2db32dff9c0034603ea3abfe5ef5cd846
Parents: 88f38ac
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Mon Nov 21 16:48:44 2016 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Mon Nov 21 16:48:44 2016 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 10 +-
.../processors/cache/GridCacheIoManager.java | 25 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 57 +-
.../GridDhtAtomicAbstractUpdateRequest.java | 287 ++++++++
.../dht/atomic/GridDhtAtomicCache.java | 17 +-
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 61 ++
.../GridDhtAtomicSingleUpdateRequest.java | 678 +++++++++++++++++++
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 26 +
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 312 +++------
.../GridNearAtomicAbstractUpdateRequest.java | 5 +
.../atomic/GridNearAtomicFullUpdateRequest.java | 108 +--
.../GridNearAtomicSingleUpdateRequest.java | 5 +
.../distributed/near/GridNearAtomicCache.java | 8 +-
.../GridCacheAtomicMessageCountSelfTest.java | 6 +-
...eAtomicInvalidPartitionHandlingSelfTest.java | 2 +-
15 files changed, 1292 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index b20de68..f36191c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -67,12 +67,13 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
@@ -774,7 +775,12 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- // [-3..119] [124..127] - this
+ case -36:
+ msg = new GridDhtAtomicSingleUpdateRequest();
+
+ break;
+
+ // [-3..119] [124..127] [-36] - this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
default:
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index c5c1c60..924ce79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -45,6 +45,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
@@ -470,8 +472,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion();
- else if (cacheMsg instanceof GridDhtAtomicUpdateRequest)
- return ((GridDhtAtomicUpdateRequest)cacheMsg).futureVersion();
+ else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
+ return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
else if (cacheMsg instanceof GridDhtAtomicUpdateResponse)
return ((GridDhtAtomicUpdateResponse) cacheMsg).futureVersion();
@@ -486,8 +488,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
@Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) {
if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion();
- else if (cacheMsg instanceof GridDhtAtomicUpdateRequest)
- return ((GridDhtAtomicUpdateRequest)cacheMsg).writeVersion();
+ else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
+ return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion();
return null;
}
@@ -791,6 +793,21 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
+ case -36: {
+ GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg;
+
+ GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
+ ctx.cacheId(),
+ req.futureVersion(),
+ ctx.deploymentEnabled());
+
+ res.onError(req.classError());
+
+ sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ }
+
+ break;
+
default:
throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
+ msg + "]", msg.classError());
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 3bbc348..7e4c4e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -80,7 +81,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
/** Update request. */
- private final GridNearAtomicAbstractUpdateRequest updateReq;
+ protected final GridNearAtomicAbstractUpdateRequest updateReq;
/** Update response. */
final GridNearAtomicUpdateResponse updateRes;
@@ -90,7 +91,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** Mappings. */
@GridToStringInclude
- protected Map<UUID, GridDhtAtomicUpdateRequest> mappings;
+ protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
/** Continuous query closures. */
private Collection<CI1<Boolean>> cntQryClsrs;
@@ -188,23 +189,16 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
UUID nodeId = node.id();
if (!nodeId.equals(cctx.localNodeId())) {
- GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+ GridDhtAtomicAbstractUpdateRequest updateReq = mappings.get(nodeId);
if (updateReq == null) {
- updateReq = new GridDhtAtomicUpdateRequest(
- cctx.cacheId(),
- nodeId,
+ updateReq = createRequest(
+ node,
futVer,
writeVer,
syncMode,
topVer,
- forceTransformBackups,
- this.updateReq.subjectId(),
- this.updateReq.taskNameHash(),
- forceTransformBackups ? this.updateReq.invokeArguments() : null,
- cctx.deploymentEnabled(),
- this.updateReq.keepBinary(),
- this.updateReq.skipStore());
+ forceTransformBackups);
mappings.put(nodeId, updateReq);
}
@@ -256,7 +250,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
AffinityTopologyVersion topVer = updateReq.topologyVersion();
for (UUID nodeId : readers) {
- GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+ GridDhtAtomicAbstractUpdateRequest updateReq = mappings.get(nodeId);
if (updateReq == null) {
ClusterNode node = cctx.discovery().node(nodeId);
@@ -265,20 +259,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
if (node == null)
continue;
- updateReq = new GridDhtAtomicUpdateRequest(
- cctx.cacheId(),
- nodeId,
+ updateReq = createRequest(
+ node,
futVer,
writeVer,
syncMode,
topVer,
- forceTransformBackups,
- this.updateReq.subjectId(),
- this.updateReq.taskNameHash(),
- forceTransformBackups ? this.updateReq.invokeArguments() : null,
- cctx.deploymentEnabled(),
- this.updateReq.keepBinary(),
- this.updateReq.skipStore());
+ forceTransformBackups);
mappings.put(nodeId, updateReq);
}
@@ -336,7 +323,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
final boolean registerResponse(UUID nodeId) {
int resCnt0;
- GridDhtAtomicUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
+ GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
if (req != null) {
synchronized (this) {
@@ -365,7 +352,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
*/
final void map() {
if (!F.isEmpty(mappings)) {
- for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+ for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
try {
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
@@ -412,6 +399,24 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
/**
+ * @param node Node.
+ * @param futVer Future version.
+ * @param writeVer Update version.
+ * @param syncMode Write synchronization mode.
+ * @param topVer Topology version.
+ * @param forceTransformBackups Force transform backups flag.
+ * @return Request.
+ */
+ protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
+ ClusterNode node,
+ GridCacheVersion futVer,
+ GridCacheVersion writeVer,
+ CacheWriteSynchronizationMode syncMode,
+ @NotNull AffinityTopologyVersion topVer,
+ boolean forceTransformBackups
+ );
+
+ /**
* Callback for backup update response.
*
* @param nodeId Backup node ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
new file mode 100644
index 0000000..f0bea07
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.util.UUID;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.jetbrains.annotations.Nullable;
+
+/**
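+ * Common base for DHT atomic update request messages ({@link GridDhtAtomicUpdateRequest} and {@link GridDhtAtomicSingleUpdateRequest}).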
+ */
+public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Node ID. */
+ @GridDirectTransient
+ protected UUID nodeId;
+
+ /** On response flag. Access should be synced on future. */
+ @GridDirectTransient
+ private boolean onRes;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ protected GridDhtAtomicAbstractUpdateRequest() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID.
+ */
+ protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId) {
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Keep binary flag.
+ */
+ public abstract boolean keepBinary();
+
+ /**
+ * @return Skip write-through to a persistent storage.
+ */
+ public abstract boolean skipStore();
+
+ /**
+ * @return {@code True} if on response flag changed.
+ */
+ public boolean onResponse() {
+ return !onRes && (onRes = true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /**
+ * @return Force transform backups flag.
+ */
+ public abstract boolean forceTransformBackups();
+
+ /** {@inheritDoc} */
+ @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+ return ctx.atomicMessageLogger();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ cleanup();
+ }
+
+ /**
+ * @param key Key to add.
+ * @param val Value, {@code null} if should be removed.
+ * @param entryProcessor Entry processor.
+ * @param ttl TTL (optional).
+ * @param conflictExpireTime Conflict expire time (optional).
+ * @param conflictVer Conflict version (optional).
+ * @param addPrevVal If {@code true} adds previous value.
+ * @param partId Partition.
+ * @param prevVal Previous value.
+ * @param updateCntr Update counter.
+ */
+ public abstract void addWriteValue(KeyCacheObject key,
+ @Nullable CacheObject val,
+ EntryProcessor<Object, Object, Object> entryProcessor,
+ long ttl,
+ long conflictExpireTime,
+ @Nullable GridCacheVersion conflictVer,
+ boolean addPrevVal,
+ int partId,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateCntr
+ );
+
+ /**
+ * @param key Key to add.
+ * @param val Value, {@code null} if should be removed.
+ * @param entryProcessor Entry processor.
+ * @param ttl TTL.
+ * @param expireTime Expire time.
+ */
+ public abstract void addNearWriteValue(KeyCacheObject key,
+ @Nullable CacheObject val,
+ EntryProcessor<Object, Object, Object> entryProcessor,
+ long ttl,
+ long expireTime);
+
+ /**
+ * Cleanup values not needed after message was sent.
+ */
+ protected abstract void cleanup();
+
+ /**
+ * @return Subject ID.
+ */
+ public abstract UUID subjectId();
+
+ /**
+ * @return Task name hash.
+ */
+ public abstract int taskNameHash();
+
+ /**
+ * @return Version assigned on primary node.
+ */
+ public abstract GridCacheVersion futureVersion();
+
+ /**
+ * @return Write version.
+ */
+ public abstract GridCacheVersion writeVersion();
+
+ /**
+ * @return Cache write synchronization mode.
+ */
+ public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+
+ /**
+ * @return Keys size.
+ */
+ public abstract int size();
+
+ /**
+ * @return Near keys size.
+ */
+ public abstract int nearSize();
+
+ /**
+ * @param key Key to check.
+ * @return {@code true} if request keys contain key.
+ */
+ public abstract boolean hasKey(KeyCacheObject key);
+
+ /**
+ * @param idx Key index.
+ * @return Key.
+ */
+ public abstract KeyCacheObject key(int idx);
+
+ /**
+ * @param idx Partition index.
+ * @return Partition id.
+ */
+ public abstract int partitionId(int idx);
+
+ /**
+ * @param updCntr Update counter index (presumably the key index, as for the other accessors).
+ * @return Update counter.
+ */
+ public abstract Long updateCounter(int updCntr);
+
+ /**
+ * @param idx Near key index.
+ * @return Key.
+ */
+ public abstract KeyCacheObject nearKey(int idx);
+
+ /**
+ * @param idx Key index.
+ * @return Value.
+ */
+ @Nullable public abstract CacheObject value(int idx);
+
+ /**
+ * @param idx Key index.
+ * @return Previous value.
+ */
+ @Nullable public abstract CacheObject previousValue(int idx);
+
+ /**
+ * @param idx Key index.
+ * @return Entry processor.
+ */
+ @Nullable public abstract EntryProcessor<Object, Object, Object> entryProcessor(int idx);
+
+ /**
+ * @param idx Near key index.
+ * @return Value.
+ */
+ @Nullable public abstract CacheObject nearValue(int idx);
+
+ /**
+ * @param idx Near key index.
+ * @return Entry processor.
+ */
+ @Nullable public abstract EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx);
+
+ /**
+ * @param idx Index.
+ * @return Conflict version.
+ */
+ @Nullable public abstract GridCacheVersion conflictVersion(int idx);
+
+ /**
+ * @param idx Index.
+ * @return TTL.
+ */
+ public abstract long ttl(int idx);
+
+ /**
+ * @param idx Index.
+ * @return TTL for near cache update.
+ */
+ public abstract long nearTtl(int idx);
+
+ /**
+ * @param idx Index.
+ * @return Conflict expire time.
+ */
+ public abstract long conflictExpireTime(int idx);
+
+ /**
+ * @param idx Index.
+ * @return Expire time for near cache update.
+ */
+ public abstract long nearExpireTime(int idx);
+
+ /**
+ * @return Optional arguments for entry processor.
+ */
+ @Nullable public abstract Object[] invokeArguments();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d7eb062..2a7055d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -360,11 +360,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.io().addHandler(
ctx.cacheId(),
- GridDhtAtomicUpdateRequest.class,
- new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+ GridDhtAtomicAbstractUpdateRequest.class,
+ new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() {
@Override public void apply(
UUID nodeId,
- GridDhtAtomicUpdateRequest req
+ GridDhtAtomicAbstractUpdateRequest req
) {
processDhtAtomicUpdateRequest(
nodeId,
@@ -3100,12 +3100,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridNearAtomicAbstractUpdateFuture fut =
(GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
- if (fut != null) {
- if (fut instanceof GridNearAtomicSingleUpdateFuture)
- ((GridNearAtomicSingleUpdateFuture)fut).onResult(nodeId, res, false);
- else
- ((GridNearAtomicUpdateFuture)fut).onResult(nodeId, res, false);
- }
+ if (fut != null)
+ fut.onResult(nodeId, res, false);
+
else
U.warn(msgLog, "Failed to find near update future for update response (will ignore) " +
"[futId" + res.futureVersion() + ", node=" + nodeId + ", res=" + res + ']');
@@ -3115,7 +3112,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param nodeId Sender node ID.
* @param req Dht atomic update request.
*/
- private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicUpdateRequest req) {
+ private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Received DHT atomic update request [futId=" + req.futureVersion() +
", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index f83a7b7..656caab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -30,6 +32,8 @@ import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.NotNull;
/**
*
@@ -38,6 +42,9 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
/** */
private static final long serialVersionUID = 0L;
+ /** Minimal node version for which {@link GridDhtAtomicSingleUpdateRequest} is used. */
+ private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4");
+
/** Future keys. */
private KeyCacheObject key;
@@ -87,6 +94,49 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
}
/** {@inheritDoc} */
+ @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
+ ClusterNode node,
+ GridCacheVersion futVer,
+ GridCacheVersion writeVer,
+ CacheWriteSynchronizationMode syncMode,
+ @NotNull AffinityTopologyVersion topVer,
+ boolean forceTransformBackups
+ ) {
+ if (canUseSingleRequest(node)) {
+ assert !forceTransformBackups;
+
+ return new GridDhtAtomicSingleUpdateRequest(
+ cctx.cacheId(),
+ node.id(),
+ futVer,
+ writeVer,
+ syncMode,
+ topVer,
+ updateReq.subjectId(),
+ updateReq.taskNameHash(),
+ cctx.deploymentEnabled(),
+ updateReq.keepBinary(),
+ updateReq.skipStore());
+ }
+ else {
+ return new GridDhtAtomicUpdateRequest(
+ cctx.cacheId(),
+ node.id(),
+ futVer,
+ writeVer,
+ syncMode,
+ topVer,
+ forceTransformBackups,
+ updateReq.subjectId(),
+ updateReq.taskNameHash(),
+ forceTransformBackups ? updateReq.invokeArguments() : null,
+ cctx.deploymentEnabled(),
+ updateReq.keepBinary(),
+ updateReq.skipStore());
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
if (log.isDebugEnabled())
log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']');
@@ -114,6 +164,17 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
updateRes.addFailedKey(key, err);
}
+ /**
+ * @param node Target node.
+ * @return {@code true} if {@link GridDhtAtomicSingleUpdateRequest} can be used for the target node (node version is recent enough, no expiry policy and no conflict data).
+ */
+ private boolean canUseSingleRequest(ClusterNode node) {
+ return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 &&
+ cctx.expiry() == null &&
+ updateReq.expiry() == null &&
+ !updateReq.hasConflictData();
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtAtomicSingleUpdateFuture.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
new file mode 100644
index 0000000..a03d948
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
+
+/**
+ * DHT atomic cache backup update request carrying a single key-value pair.
+ */
+public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdateRequest {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Near cache key flag. */
+ private static final int NEAR_FLAG_MASK = 0x80;
+
+ /** Future version. */
+ protected GridCacheVersion futVer;
+
+ /** Write version. */
+ protected GridCacheVersion writeVer;
+
+ /** Write synchronization mode. */
+ protected CacheWriteSynchronizationMode syncMode;
+
+ /** Topology version. */
+ protected AffinityTopologyVersion topVer;
+
+ /** Subject ID. */
+ protected UUID subjId;
+
+ /** Task name hash. */
+ protected int taskNameHash;
+
+ /** Additional flags. */
+ protected byte flags;
+
+ /** Key to update. */
+ @GridToStringInclude
+ protected KeyCacheObject key;
+
+ /** Value to update. */
+ @GridToStringInclude
+ protected CacheObject val;
+
+ /** Previous value. */
+ @GridToStringInclude
+ protected CacheObject prevVal;
+
+ /** Update counter. */
+ protected long updateCntr;
+
+ /** Partition. */
+ protected int partId;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridDhtAtomicSingleUpdateRequest() {
+ // No-op.
+ }
+
+ /**
+ * Creates a request for a single key; the key itself is supplied later via one of the add methods.
+ *
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID.
+ * @param futVer Future version.
+ * @param writeVer Write version for cache values.
+ * @param syncMode Cache write synchronization mode.
+ * @param topVer Topology version.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param addDepInfo Deployment info.
+ * @param keepBinary Keep binary flag, stored in {@code flags}.
+ * @param skipStore Skip store flag, stored in {@code flags}.
+ */
+ GridDhtAtomicSingleUpdateRequest(
+ int cacheId,
+ UUID nodeId,
+ GridCacheVersion futVer,
+ GridCacheVersion writeVer,
+ CacheWriteSynchronizationMode syncMode,
+ @NotNull AffinityTopologyVersion topVer,
+ UUID subjId,
+ int taskNameHash,
+ boolean addDepInfo,
+ boolean keepBinary,
+ boolean skipStore
+ ) {
+ super(cacheId, nodeId);
+
+ this.addDepInfo = addDepInfo;
+ this.taskNameHash = taskNameHash;
+ this.subjId = subjId;
+ this.topVer = topVer;
+ this.syncMode = syncMode;
+ this.writeVer = writeVer;
+ this.futVer = futVer;
+
+ // Flags start cleared, so each bit can be assigned directly from its argument.
+ setFlag(skipStore, SKIP_STORE_FLAG_MASK);
+ setFlag(keepBinary, KEEP_BINARY_FLAG_MASK);
+ }
+
+ /**
+ * @param key Key to add.
+ * @param val Value, {@code null} if should be removed.
+ * @param entryProcessor Entry processor, must be {@code null} (asserted).
+ * @param ttl TTL, must be non-positive (asserted).
+ * @param conflictExpireTime Conflict expire time, must be non-positive (asserted).
+ * @param conflictVer Conflict version, must be {@code null} (asserted).
+ * @param addPrevVal If {@code true} adds previous value.
+ * @param partId Partition.
+ * @param prevVal Previous value, stored only when {@code addPrevVal} is {@code true}.
+ * @param updateCntr Update counter, stored only when not {@code null}.
+ */
+ @Override public void addWriteValue(KeyCacheObject key,
+ @Nullable CacheObject val,
+ EntryProcessor<Object, Object, Object> entryProcessor,
+ long ttl,
+ long conflictExpireTime,
+ @Nullable GridCacheVersion conflictVer,
+ boolean addPrevVal,
+ int partId,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateCntr
+ ) {
+ assert entryProcessor == null;
+ assert ttl <= 0 : ttl;
+ assert conflictExpireTime <= 0 : conflictExpireTime;
+ assert conflictVer == null : conflictVer;
+
+ near(false);
+
+ this.key = key;
+ this.partId = partId;
+ this.val = val;
+
+ if (addPrevVal)
+ this.prevVal = prevVal;
+
+ if (updateCntr != null)
+ this.updateCntr = updateCntr;
+ }
+
+ /**
+ * @param key Key to add; it is marked as a near cache key.
+ * @param val Value, {@code null} if should be removed.
+ * @param entryProcessor Entry processor, must be {@code null} (asserted).
+ * @param ttl TTL, must be non-positive (asserted).
+ * @param expireTime Expire time, not read by this implementation.
+ */
+ @Override public void addNearWriteValue(KeyCacheObject key,
+ @Nullable CacheObject val,
+ EntryProcessor<Object, Object, Object> entryProcessor,
+ long ttl,
+ long expireTime) {
+ assert entryProcessor == null;
+ assert ttl <= 0 : ttl;
+
+ near(true);
+
+ this.key = key;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean forceTransformBackups() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return key != null && !near() ? 1 : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int nearSize() {
+ return key != null && near() ? 1 : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasKey(KeyCacheObject key) {
+ return !near() && F.eq(this.key, key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean skipStore() {
+ return isFlag(SKIP_STORE_FLAG_MASK);
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject key(int idx) {
+ assert idx == 0 : idx;
+
+ return near() ? null : key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitionId(int idx) {
+ assert idx == 0 : idx;
+
+ return partId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Long updateCounter(int updCntr) {
+ assert updCntr == 0 : updCntr;
+
+ return updateCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject nearKey(int idx) {
+ assert idx == 0 : idx;
+
+ return near() ? key : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public CacheObject value(int idx) {
+ assert idx == 0 : idx;
+
+ return near() ? null : val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion writeVersion() {
+ return writeVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID subjectId() {
+ return subjId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
+ return syncMode;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public CacheObject previousValue(int idx) {
+ assert idx == 0 : idx;
+
+ return prevVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public CacheObject nearValue(int idx) {
+ assert idx == 0 : idx;
+
+ return near() ? val : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+ assert idx == 0 : idx;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) {
+ assert idx == 0 : idx;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
+ assert idx == 0 : idx;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long ttl(int idx) {
+ assert idx == 0 : idx;
+
+ return CU.TTL_NOT_CHANGED;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long nearTtl(int idx) {
+ assert idx == 0 : idx;
+
+ return CU.TTL_NOT_CHANGED;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long conflictExpireTime(int idx) {
+ assert idx == 0 : idx;
+
+ return CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long nearExpireTime(int idx) {
+ assert idx == 0 : idx;
+
+ return CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public Object[] invokeArguments() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean keepBinary() {
+ return isFlag(KEEP_BINARY_FLAG_MASK);
+ }
+
+ /**
+ * @return {@code true} if the near cache key flag is set in {@code flags}.
+ */
+ private boolean near() {
+ return isFlag(NEAR_FLAG_MASK);
+ }
+
+ /**
+ * @param near {@code true} to set the near cache key flag in {@code flags}, {@code false} to clear it.
+ */
+ private void near(boolean near) {
+ setFlag(near, NEAR_FLAG_MASK);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ prepareMarshalObject(key, cctx);
+
+ prepareMarshalObject(val, cctx);
+
+ prepareMarshalObject(prevVal, cctx);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ finishUnmarshalObject(key, cctx, ldr);
+
+ finishUnmarshalObject(val, cctx, ldr);
+
+ finishUnmarshalObject(prevVal, cctx, ldr);
+ // NOTE(review): key is dereferenced without a null check, and partId is assigned only in addWriteValue - confirm both are intended for near keys.
+ key.partition(partId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+ // This class's fields start at state 3; no case ends with a break, so execution falls through to each following field.
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMessage("futVer", futVer))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeMessage("key", key))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeInt("partId", partId))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeMessage("prevVal", prevVal))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeUuid("subjId", subjId))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeLong("updateCntr", updateCntr))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeMessage("val", val))
+ return false;
+
+ writer.incrementState();
+
+ case 14:
+ if (!writer.writeMessage("writeVer", writeVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+ // Fields are read in the same order writeTo emits them, starting at state 3; cases fall through to the next field.
+ switch (reader.state()) {
+ case 3:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ futVer = reader.readMessage("futVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ key = reader.readMessage("key");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ partId = reader.readInt("partId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ prevVal = reader.readMessage("prevVal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ byte syncModeOrd;
+
+ syncModeOrd = reader.readByte("syncMode");
+
+ if (!reader.isLastRead())
+ return false;
+
+ syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+ reader.incrementState();
+
+ case 10:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ updateCntr = reader.readLong("updateCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ val = reader.readMessage("val");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
+ writeVer = reader.readMessage("writeVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtAtomicSingleUpdateRequest.class);
+ }
+
+ /**
+ * @param obj CacheObject to marshal
+ * @param ctx context
+ * @throws IgniteCheckedException if error
+ */
+ private void prepareMarshalObject(CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException {
+ if (obj != null)
+ obj.prepareMarshal(ctx.cacheObjectContext());
+ }
+
+ /**
+ * @param obj CacheObject to unmarshal, ignored when {@code null}.
+ * @param ctx Cache context whose cache object context is passed to the object.
+ * @param ldr Class loader.
+ * @throws IgniteCheckedException If unmarshalling fails.
+ */
+ private void finishUnmarshalObject(@Nullable CacheObject obj, GridCacheContext ctx,
+ ClassLoader ldr) throws IgniteCheckedException {
+ if (obj != null)
+ obj.finishUnmarshal(ctx.cacheObjectContext(), ldr);
+ }
+
+ /**
+ * Cleanup values not needed after message was sent.
+ */
+ @Override protected void cleanup() {
+ val = null;
+ prevVal = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -36;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 15;
+ }
+
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return {@code true} if any bit of {@code mask} is set in {@code flags}.
+ */
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtAtomicSingleUpdateRequest.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 864aadd..dd1f1c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -23,7 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -33,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
/**
* DHT atomic cache backup update future.
@@ -118,6 +121,29 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
}
/** {@inheritDoc} */
+ @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(ClusterNode node,
+ GridCacheVersion futVer,
+ GridCacheVersion writeVer,
+ CacheWriteSynchronizationMode syncMode,
+ @NotNull AffinityTopologyVersion topVer,
+ boolean forceTransformBackups) {
+ return new GridDhtAtomicUpdateRequest(
+ cctx.cacheId(),
+ node.id(),
+ futVer,
+ writeVer,
+ syncMode,
+ topVer,
+ forceTransformBackups,
+ updateReq.subjectId(),
+ updateReq.taskNameHash(),
+ forceTransformBackups ? updateReq.invokeArguments() : null,
+ cctx.deploymentEnabled(),
+ updateReq.keepBinary(),
+ updateReq.skipStore());
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtAtomicUpdateFuture.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 55f7560..f2fbb0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -20,25 +20,22 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -52,17 +49,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_ST
/**
* Lite dht cache backup update request.
*/
-public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateRequest {
/** */
private static final long serialVersionUID = 0L;
- /** Message index. */
- public static final int CACHE_MSG_IDX = nextIndexId();
-
- /** Node ID. */
- @GridDirectTransient
- private UUID nodeId;
-
/** Future version. */
private GridCacheVersion futVer;
@@ -151,10 +141,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** Partition. */
private GridLongList updateCntrs;
- /** On response flag. Access should be synced on future. */
- @GridDirectTransient
- private boolean onRes;
-
/** */
@GridDirectTransient
private List<Integer> partIds;
@@ -162,9 +148,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** Keep binary flag. */
private boolean keepBinary;
- /**
- * Additional flags.
- */
+ /** Additional flags. */
private byte flags;
/**
@@ -204,10 +188,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
boolean keepBinary,
boolean skipStore
) {
- assert invokeArgs == null || forceTransformBackups;
+ super(cacheId, nodeId);
- this.cacheId = cacheId;
- this.nodeId = nodeId;
this.futVer = futVer;
this.writeVer = writeVer;
this.syncMode = syncMode;
@@ -215,12 +197,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
this.forceTransformBackups = forceTransformBackups;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
+
+ assert invokeArgs == null || forceTransformBackups;
+
this.invokeArgs = invokeArgs;
this.addDepInfo = addDepInfo;
this.keepBinary = keepBinary;
- if (skipStore)
- flags = (byte)(flags | SKIP_STORE_FLAG_MASK);
+ setFlag(skipStore, SKIP_STORE_FLAG_MASK);
keys = new ArrayList<>();
partIds = new ArrayList<>();
@@ -233,26 +217,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
vals = new ArrayList<>();
}
- /**
- * @return Force transform backups flag.
- */
- public boolean forceTransformBackups() {
- return forceTransformBackups;
- }
-
- /**
- * @param key Key to add.
- * @param val Value, {@code null} if should be removed.
- * @param entryProcessor Entry processor.
- * @param ttl TTL (optional).
- * @param conflictExpireTime Conflict expire time (optional).
- * @param conflictVer Conflict version (optional).
- * @param addPrevVal If {@code true} adds previous value.
- * @param partId Partition.
- * @param prevVal Previous value.
- * @param updateCntr Update counter.
- */
- public void addWriteValue(KeyCacheObject key,
+ /** {@inheritDoc} */
+ @Override public void addWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
@@ -328,14 +294,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
conflictExpireTimes.add(conflictExpireTime);
}
- /**
- * @param key Key to add.
- * @param val Value, {@code null} if should be removed.
- * @param entryProcessor Entry processor.
- * @param ttl TTL.
- * @param expireTime Expire time.
- */
- public void addNearWriteValue(KeyCacheObject key,
+ /** {@inheritDoc} */
+ @Override public void addNearWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
@@ -387,183 +347,114 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
}
/** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /**
- * @return Node ID.
- */
- public UUID nodeId() {
- return nodeId;
+ @Override public boolean forceTransformBackups() {
+ return forceTransformBackups;
}
- /**
- * @return Subject ID.
- */
- public UUID subjectId() {
+ /** {@inheritDoc} */
+ @Override public UUID subjectId() {
return subjId;
}
- /**
- * @return Task name.
- */
- public int taskNameHash() {
+ /** {@inheritDoc} */
+ @Override public int taskNameHash() {
return taskNameHash;
}
- /**
- * @return Keys size.
- */
- public int size() {
- return keys.size();
- }
-
- /**
- * @return Keys size.
- */
- public int nearSize() {
- return nearKeys != null ? nearKeys.size() : 0;
- }
-
- /**
- * @return Version assigned on primary node.
- */
- public GridCacheVersion futureVersion() {
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion futureVersion() {
return futVer;
}
- /**
- * @return Write version.
- */
- public GridCacheVersion writeVersion() {
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion writeVersion() {
return writeVer;
}
- /**
- * @return Cache write synchronization mode.
- */
- public CacheWriteSynchronizationMode writeSynchronizationMode() {
+ /** {@inheritDoc} */
+ @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
return syncMode;
}
- /**
- * @return Topology version.
- */
+ /** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
return topVer;
}
- /**
- * @return Keys.
- */
- public Collection<KeyCacheObject> keys() {
- return keys;
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return keys.size();
}
- /**
- * @param idx Key index.
- * @return Key.
- */
- public KeyCacheObject key(int idx) {
- return keys.get(idx);
+ /** {@inheritDoc} */
+ @Override public int nearSize() {
+ return nearKeys != null ? nearKeys.size() : 0;
}
- /**
- * @param idx Partition index.
- * @return Partition id.
- */
- public int partitionId(int idx) {
- return partIds.get(idx);
+ /** {@inheritDoc} */
+ @Override public boolean hasKey(KeyCacheObject key) {
+ return F.contains(keys, key);
}
- /**
- * @return Skip write-through to a persistent storage.
- */
- public boolean skipStore() {
- return (flags & SKIP_STORE_FLAG_MASK) == SKIP_STORE_FLAG_MASK;
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject key(int idx) {
+ return keys.get(idx);
}
- /**
- * @param updCntr Update counter.
- * @return Update counter.
- */
- public Long updateCounter(int updCntr) {
+ /** {@inheritDoc} */
+ @Override public int partitionId(int idx) {
+ return partIds.get(idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Long updateCounter(int updCntr) {
if (updateCntrs != null && updCntr < updateCntrs.size())
return updateCntrs.get(updCntr);
return null;
}
- /**
- * @param idx Near key index.
- * @return Key.
- */
- public KeyCacheObject nearKey(int idx) {
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject nearKey(int idx) {
return nearKeys.get(idx);
}
- /**
- * @return Keep binary flag.
- */
- public boolean keepBinary() {
- return keepBinary;
- }
-
- /**
- * @param idx Key index.
- * @return Value.
- */
- @Nullable public CacheObject value(int idx) {
+ /** {@inheritDoc} */
+ @Override @Nullable public CacheObject value(int idx) {
if (vals != null)
return vals.get(idx);
return null;
}
- /**
- * @param idx Key index.
- * @return Value.
- */
- @Nullable public CacheObject previousValue(int idx) {
+ /** {@inheritDoc} */
+ @Override @Nullable public CacheObject previousValue(int idx) {
if (prevVals != null)
return prevVals.get(idx);
return null;
}
- /**
- * @param idx Key index.
- * @return Entry processor.
- */
- @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+ /** {@inheritDoc} */
+ @Override @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
return entryProcessors == null ? null : entryProcessors.get(idx);
}
- /**
- * @param idx Near key index.
- * @return Value.
- */
- @Nullable public CacheObject nearValue(int idx) {
+ /** {@inheritDoc} */
+ @Override @Nullable public CacheObject nearValue(int idx) {
if (nearVals != null)
return nearVals.get(idx);
return null;
}
- /**
- * @param idx Key index.
- * @return Transform closure.
- */
- @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) {
+ /** {@inheritDoc} */
+ @Override @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) {
return nearEntryProcessors == null ? null : nearEntryProcessors.get(idx);
}
- /**
- * @param idx Index.
- * @return Conflict version.
- */
- @Nullable public GridCacheVersion conflictVersion(int idx) {
+ /** {@inheritDoc} */
+ @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
if (conflictVers != null) {
assert idx >= 0 && idx < conflictVers.size();
@@ -573,11 +464,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
return null;
}
- /**
- * @param idx Index.
- * @return TTL.
- */
- public long ttl(int idx) {
+ /** {@inheritDoc} */
+ @Override public long ttl(int idx) {
if (ttls != null) {
assert idx >= 0 && idx < ttls.size();
@@ -587,11 +475,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
return CU.TTL_NOT_CHANGED;
}
- /**
- * @param idx Index.
- * @return TTL for near cache update.
- */
- public long nearTtl(int idx) {
+ /** {@inheritDoc} */
+ @Override public long nearTtl(int idx) {
if (nearTtls != null) {
assert idx >= 0 && idx < nearTtls.size();
@@ -601,11 +486,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
return CU.TTL_NOT_CHANGED;
}
- /**
- * @param idx Index.
- * @return Conflict expire time.
- */
- public long conflictExpireTime(int idx) {
+ /** {@inheritDoc} */
+ @Override public long conflictExpireTime(int idx) {
if (conflictExpireTimes != null) {
assert idx >= 0 && idx < conflictExpireTimes.size();
@@ -615,11 +497,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
return CU.EXPIRE_TIME_CALCULATE;
}
- /**
- * @param idx Index.
- * @return Expire time for near cache update.
- */
- public long nearExpireTime(int idx) {
+ /** {@inheritDoc} */
+ @Override public long nearExpireTime(int idx) {
if (nearExpireTimes != null) {
assert idx >= 0 && idx < nearExpireTimes.size();
@@ -629,17 +508,18 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
return CU.EXPIRE_TIME_CALCULATE;
}
- /**
- * @return {@code True} if on response flag changed.
- */
- public boolean onResponse() {
- return !onRes && (onRes = true);
+ /** {@inheritDoc} */
+ @Override public boolean keepBinary() {
+ return keepBinary;
}
- /**
- * @return Optional arguments for entry processor.
- */
- @Nullable public Object[] invokeArguments() {
+ /** {@inheritDoc} */
+ @Override public boolean skipStore() {
+ return isFlag(SKIP_STORE_FLAG_MASK);
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public Object[] invokeArguments() {
return invokeArgs;
}
@@ -711,16 +591,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
}
/** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
- return ctx.atomicMessageLogger();
- }
-
- /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -1083,14 +953,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
}
/** {@inheritDoc} */
- @Override public void onAckReceived() {
- cleanup();
- }
-
- /**
- * Cleanup values not needed after message was sent.
- */
- private void cleanup() {
+ @Override protected void cleanup() {
nearVals = null;
prevVals = null;
}
@@ -1105,6 +968,27 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
return 26;
}
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtAtomicUpdateRequest.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index bee2ecd..bae9e3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -223,4 +223,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
* @return Key.
*/
public abstract KeyCacheObject key(int idx);
+
+ /**
+ * @return {@code True} if request does not have conflict data.
+ */
+ public abstract boolean hasConflictData();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index b733d7b..c785828 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -68,24 +69,49 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** Future version. */
private GridCacheVersion futVer;
- /** Fast map flag. */
- private boolean fastMap;
-
/** Update version. Set to non-null if fastMap is {@code true}. */
private GridCacheVersion updateVer;
/** Topology version. */
private AffinityTopologyVersion topVer;
- /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
- private boolean topLocked;
-
/** Write synchronization mode. */
private CacheWriteSynchronizationMode syncMode;
/** Update operation. */
private GridCacheOperation op;
+ /** Subject ID. */
+ protected UUID subjId;
+
+ /** Task name hash. */
+ protected int taskNameHash;
+
+ /** */
+ @GridDirectTransient
+ private GridNearAtomicUpdateResponse res;
+
+ /** Fast map flag. */
+ protected boolean fastMap;
+
+ /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+ protected boolean topLocked;
+
+ /** Flag indicating whether request contains primary keys. */
+ protected boolean hasPrimary;
+
+ /** Skip write-through to a persistent storage. */
+ protected boolean skipStore;
+
+ /** */
+ protected boolean clientReq;
+
+ /** Keep binary flag. */
+ protected boolean keepBinary;
+
+ /** Return value flag. */
+ protected boolean retval;
+
/** Keys to update. */
@GridToStringInclude
@GridDirectCollection(KeyCacheObject.class)
@@ -107,13 +133,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@GridDirectCollection(byte[].class)
private List<byte[]> entryProcessorsBytes;
- /** Optional arguments for entry processor. */
- @GridDirectTransient
- private Object[] invokeArgs;
-
- /** Entry processor arguments bytes. */
- private byte[][] invokeArgsBytes;
-
/** Conflict versions. */
@GridDirectCollection(GridCacheVersion.class)
private List<GridCacheVersion> conflictVers;
@@ -124,8 +143,12 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** Conflict expire times. */
private GridLongList conflictExpireTimes;
- /** Return value flag. */
- private boolean retval;
+ /** Optional arguments for entry processor. */
+ @GridDirectTransient
+ private Object[] invokeArgs;
+
+ /** Entry processor arguments bytes. */
+ private byte[][] invokeArgsBytes;
/** Expiry policy. */
@GridDirectTransient
@@ -137,28 +160,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** Filter. */
private CacheEntryPredicate[] filter;
- /** Flag indicating whether request contains primary keys. */
- private boolean hasPrimary;
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** Skip write-through to a persistent storage. */
- private boolean skipStore;
-
- /** */
- private boolean clientReq;
-
- /** Keep binary flag. */
- private boolean keepBinary;
-
- /** */
- @GridDirectTransient
- private GridNearAtomicUpdateResponse res;
-
/** Maximum possible size of inner collections. */
@GridDirectTransient
private int initSize;
@@ -523,7 +524,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
/** {@inheritDoc} */
- @Nullable @Override public CacheEntryPredicate[] filter() {
+ @Override @Nullable public CacheEntryPredicate[] filter() {
return filter;
}
@@ -533,11 +534,19 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
/** {@inheritDoc} */
+ @Override public boolean hasConflictData() {
+ return F.size(conflictVers) > 0 || conflictTtls != null || conflictExpireTimes != null;
+ }
+
+ /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
GridCacheContext cctx = ctx.cacheContext(cacheId);
+ if (expiryPlc != null && expiryPlcBytes == null)
+ expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+
prepareMarshalCacheObjects(keys, cctx);
if (filter != null) {
@@ -555,9 +564,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
filter = null;
}
- if (expiryPlc != null && expiryPlcBytes == null)
- expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
-
if (op == TRANSFORM) {
// force addition of deployment info for entry processors if P2P is enabled globally.
if (!addDepInfo && ctx.deploymentEnabled())
@@ -579,8 +585,18 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
GridCacheContext cctx = ctx.cacheContext(cacheId);
+ if (expiryPlcBytes != null && expiryPlc == null)
+ expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
finishUnmarshalCacheObjects(keys, cctx, ldr);
+ if (filter != null) {
+ for (CacheEntryPredicate p : filter) {
+ if (p != null)
+ p.finishUnmarshal(cctx, ldr);
+ }
+ }
+
if (op == TRANSFORM) {
if (entryProcessors == null)
entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
@@ -591,16 +607,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
else
finishUnmarshalCacheObjects(vals, cctx, ldr);
- if (filter != null) {
- for (CacheEntryPredicate p : filter) {
- if (p != null)
- p.finishUnmarshal(cctx, ldr);
- }
- }
-
- if (expiryPlcBytes != null && expiryPlc == null)
- expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
-
if (partIds != null && !partIds.isEmpty()) {
assert partIds.size() == keys.size();
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 211b472..f3b9726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -226,6 +226,11 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
return CU.EXPIRE_TIME_CALCULATE;
}
+ /** {@inheritDoc} */
+ @Override public boolean hasConflictData() {
+ return false;
+ }
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index b5b2c72..a8219b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -41,8 +41,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
@@ -302,15 +302,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
*/
public void processDhtAtomicUpdateRequest(
UUID nodeId,
- GridDhtAtomicUpdateRequest req,
+ GridDhtAtomicAbstractUpdateRequest req,
GridDhtAtomicUpdateResponse res
) {
GridCacheVersion ver = req.writeVersion();
assert ver != null;
- Collection<KeyCacheObject> backupKeys = req.keys();
-
boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null;
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
@@ -329,7 +327,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
break;
}
- if (F.contains(backupKeys, key)) { // Reader became backup.
+ if (req.hasKey(key)) { // Reader became backup.
if (entry.markObsolete(ver))
removeEntry(entry);
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index a6d612a..e8c5db1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -141,6 +142,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
commSpi.registerMessage(GridNearAtomicSingleUpdateRequest.class);
commSpi.registerMessage(GridNearAtomicFullUpdateRequest.class);
commSpi.registerMessage(GridDhtAtomicUpdateRequest.class);
+ commSpi.registerMessage(GridDhtAtomicSingleUpdateRequest.class);
int putCnt = 15;
@@ -171,7 +173,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
assertEquals(expNearCnt, commSpi.messageCount(GridNearAtomicFullUpdateRequest.class));
assertEquals(expNearSingleCnt, commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class));
- assertEquals(expDhtCnt, commSpi.messageCount(GridDhtAtomicUpdateRequest.class));
+ assertEquals(expDhtCnt, commSpi.messageCount(GridDhtAtomicSingleUpdateRequest.class));
if (writeOrderMode == CLOCK) {
for (int i = 1; i < 4; i++) {
@@ -179,7 +181,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
assertEquals(0, commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class));
assertEquals(0, commSpi.messageCount(GridNearAtomicFullUpdateRequest.class));
- assertEquals(0, commSpi.messageCount(GridDhtAtomicUpdateRequest.class));
+ assertEquals(0, commSpi.messageCount(GridDhtAtomicSingleUpdateRequest.class));
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 0899423..644e310 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -478,7 +478,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
Object origMsg = msg.message();
return delay &&
- ((origMsg instanceof GridNearAtomicAbstractUpdateRequest) || (origMsg instanceof GridDhtAtomicUpdateRequest));
+ ((origMsg instanceof GridNearAtomicAbstractUpdateRequest) || (origMsg instanceof GridDhtAtomicAbstractUpdateRequest));
}
}
}
\ No newline at end of file
[05/10] ignite git commit: IGNITE-3075 Fixed condition for 'single'
request creation
Posted by vo...@apache.org.
IGNITE-3075 Fixed condition for 'single' request creation
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d32fa21b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d32fa21b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d32fa21b
Branch: refs/heads/ignite-4259
Commit: d32fa21b673814b060d2362f06ff44838e9c2cdc
Parents: f2dc1d7
Author: sboikov <sb...@gridgain.com>
Authored: Tue Nov 22 11:33:55 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Nov 22 11:33:55 2016 +0300
----------------------------------------------------------------------
.../GridDhtAtomicAbstractUpdateFuture.java | 19 ++++++++-----
.../dht/atomic/GridDhtAtomicCache.java | 2 +-
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 30 +++++++++++++-------
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 10 +++++--
.../GridNearAtomicAbstractUpdateRequest.java | 5 ----
.../atomic/GridNearAtomicFullUpdateRequest.java | 5 ----
.../GridNearAtomicSingleUpdateRequest.java | 9 ------
...CacheLoadingConcurrentGridStartSelfTest.java | 6 +++-
8 files changed, 44 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 7e4c4e0..361fbe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -86,9 +86,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** Update response. */
final GridNearAtomicUpdateResponse updateRes;
- /** Force transform backup flag. */
- private boolean forceTransformBackups;
-
/** Mappings. */
@GridToStringInclude
protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
@@ -198,7 +195,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
writeVer,
syncMode,
topVer,
- forceTransformBackups);
+ ttl,
+ conflictExpireTime,
+ conflictVer);
mappings.put(nodeId, updateReq);
}
@@ -265,7 +264,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
writeVer,
syncMode,
topVer,
- forceTransformBackups);
+ ttl,
+ expireTime,
+ null);
mappings.put(nodeId, updateReq);
}
@@ -404,7 +405,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @param writeVer Update version.
* @param syncMode Write synchronization mode.
* @param topVer Topology version.
- * @param forceTransformBackups Force transform backups flag.
+ * @param ttl TTL.
+ * @param conflictExpireTime Conflict expire time.
+ * @param conflictVer Conflict version.
* @return Request.
*/
protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
@@ -413,7 +416,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
- boolean forceTransformBackups
+ long ttl,
+ long conflictExpireTime,
+ @Nullable GridCacheVersion conflictVer
);
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2a7055d..940c74e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2369,7 +2369,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
- boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(),
+ boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.partition(),
req.topologyVersion());
Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 656caab..20d6e90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -30,10 +30,12 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
*
@@ -100,11 +102,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
- boolean forceTransformBackups
+ long ttl,
+ long conflictExpireTime,
+ @Nullable GridCacheVersion conflictVer
) {
- if (canUseSingleRequest(node)) {
- assert !forceTransformBackups;
-
+ if (canUseSingleRequest(node, ttl, conflictExpireTime, conflictVer)) {
return new GridDhtAtomicSingleUpdateRequest(
cctx.cacheId(),
node.id(),
@@ -126,10 +128,10 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
writeVer,
syncMode,
topVer,
- forceTransformBackups,
+ false,
updateReq.subjectId(),
updateReq.taskNameHash(),
- forceTransformBackups ? updateReq.invokeArguments() : null,
+ null,
cctx.deploymentEnabled(),
updateReq.keepBinary(),
updateReq.skipStore());
@@ -166,13 +168,19 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
/**
* @param node Target node
- * @return {@code true} if target node supports {@link GridNearAtomicSingleUpdateRequest}
+ * @param ttl TTL.
+ * @param conflictExpireTime Conflict expire time.
+ * @param conflictVer Conflict version.
+ * @return {@code True} if it is possible to use {@link GridDhtAtomicSingleUpdateRequest}.
*/
- private boolean canUseSingleRequest(ClusterNode node) {
+ private boolean canUseSingleRequest(ClusterNode node,
+ long ttl,
+ long conflictExpireTime,
+ @Nullable GridCacheVersion conflictVer) {
return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 &&
- cctx.expiry() == null &&
- updateReq.expiry() == null &&
- !updateReq.hasConflictData();
+ (ttl == CU.TTL_NOT_CHANGED) &&
+ (conflictExpireTime == CU.EXPIRE_TIME_CALCULATE) &&
+ conflictVer == null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index dd1f1c4..efb35c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* DHT atomic cache backup update future.
@@ -126,7 +127,10 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
- boolean forceTransformBackups) {
+ long ttl,
+ long conflictExpireTime,
+ @Nullable GridCacheVersion conflictVer
+ ) {
return new GridDhtAtomicUpdateRequest(
cctx.cacheId(),
node.id(),
@@ -134,10 +138,10 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
writeVer,
syncMode,
topVer,
- forceTransformBackups,
+ false,
updateReq.subjectId(),
updateReq.taskNameHash(),
- forceTransformBackups ? updateReq.invokeArguments() : null,
+ null,
cctx.deploymentEnabled(),
updateReq.keepBinary(),
updateReq.skipStore());
http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index bae9e3a..bee2ecd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -223,9 +223,4 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
* @return Key.
*/
public abstract KeyCacheObject key(int idx);
-
- /**
- * @return {@code True} if request does not have conflict data.
- */
- public abstract boolean hasConflictData();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index c785828..1b11688 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -534,11 +534,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
/** {@inheritDoc} */
- @Override public boolean hasConflictData() {
- return F.size(conflictVers) > 0 || conflictTtls != null || conflictExpireTimes != null;
- }
-
- /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index f3b9726..1c1addd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -227,15 +227,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
}
/** {@inheritDoc} */
- @Override public boolean hasConflictData() {
- return false;
- }
-
- /**
- * {@inheritDoc}
- *
- * @param ctx
- */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
index 0801691..ce64e1d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -77,6 +78,8 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setCacheMode(PARTITIONED);
@@ -95,7 +98,7 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
else
cfg.setCacheConfiguration(ccfg);
- if (!configured)
+ if (!configured) {
ccfg.setNodeFilter(new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode node) {
String name = node.attribute(ATTR_GRID_NAME).toString();
@@ -103,6 +106,7 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
return !getTestGridName(0).equals(name);
}
});
+ }
return cfg;
}
[03/10] ignite git commit: Merge remote-tracking branch
'community/ignite-1.7.4' into ignite-1.7.4
Posted by vo...@apache.org.
Merge remote-tracking branch 'community/ignite-1.7.4' into ignite-1.7.4
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/551f90db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/551f90db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/551f90db
Branch: refs/heads/ignite-4259
Commit: 551f90dbeebcad35a0e3aac07229fb67578f2ab7
Parents: 6e4a279 51ca24f
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Mon Nov 21 19:16:49 2016 +0500
Committer: tledkov-gridgain <tl...@gridgain.com>
Committed: Mon Nov 21 19:16:49 2016 +0500
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 10 +-
.../processors/cache/GridCacheIoManager.java | 25 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 57 +-
.../GridDhtAtomicAbstractUpdateRequest.java | 287 ++++++++
.../dht/atomic/GridDhtAtomicCache.java | 17 +-
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 61 ++
.../GridDhtAtomicSingleUpdateRequest.java | 678 +++++++++++++++++++
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 26 +
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 312 +++------
.../GridNearAtomicAbstractUpdateRequest.java | 5 +
.../atomic/GridNearAtomicFullUpdateRequest.java | 108 +--
.../GridNearAtomicSingleUpdateRequest.java | 5 +
.../distributed/near/GridNearAtomicCache.java | 8 +-
.../GridCacheAtomicMessageCountSelfTest.java | 6 +-
...eAtomicInvalidPartitionHandlingSelfTest.java | 2 +-
15 files changed, 1292 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
[10/10] ignite git commit: Fixed startup issue.
Posted by vo...@apache.org.
Fixed startup issue.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f789e2dd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f789e2dd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f789e2dd
Branch: refs/heads/ignite-4259
Commit: f789e2dd57706659a3d0f1afa333ab2ab4ca69e3
Parents: fc9ee6a
Author: devozerov <vo...@gridgain.com>
Authored: Tue Nov 22 13:48:23 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 22 13:48:23 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/processors/query/GridQueryProcessor.java | 4 ++--
.../processors/query/h2/GridH2IndexingGeoSelfTest.java | 7 +++++++
2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f789e2dd/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 8befa0e..a372891 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -256,12 +256,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (binaryEnabled && !keyOrValMustDeserialize) {
// Safe to check null.
- if (SQL_TYPES.contains(valCls))
+ if (SQL_TYPES.contains(valCls) || isGeometryClass(valCls))
desc.valueClass(valCls);
else
desc.valueClass(Object.class);
- if (SQL_TYPES.contains(keyCls))
+ if (SQL_TYPES.contains(keyCls) || isGeometryClass(keyCls))
desc.keyClass(keyCls);
else
desc.keyClass(Object.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f789e2dd/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
index b73a5c0..90370b4 100644
--- a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
+++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java
@@ -35,15 +35,22 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.config.GridTestProperties;
/**
*
*/
public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest {
+ // TODO: To be removed.
+ static {
+ GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
+ }
+
/** */
private static final int CNT = 100;
[06/10] ignite git commit: IGNITE-4225 DataStreamer can hang on
changing topology
Posted by vo...@apache.org.
IGNITE-4225 DataStreamer can hang on changing topology
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d15eba4b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d15eba4b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d15eba4b
Branch: refs/heads/ignite-4259
Commit: d15eba4becf7515b512c1032b193ce75e1589177
Parents: d32fa21
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Nov 22 11:56:20 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 22 11:56:20 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/processors/datastreamer/DataStreamerImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d15eba4b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 443783b..bb9ffdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1919,7 +1919,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
AffinityTopologyVersion topVer = cctx.isLocal() ?
cctx.affinity().affinityTopologyVersion() :
- cctx.topology().topologyVersion();
+ cctx.shared().exchange().readyAffinityVersion();
GridCacheVersion ver = cctx.versions().isolatedStreamerVersion();
[09/10] ignite git commit: Merge remote-tracking branch
'upstream/ignite-1.7.4' into ignite-1.7.4
Posted by vo...@apache.org.
Merge remote-tracking branch 'upstream/ignite-1.7.4' into ignite-1.7.4
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc9ee6a7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc9ee6a7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc9ee6a7
Branch: refs/heads/ignite-4259
Commit: fc9ee6a74fe0bf413ab0643d2776a1a43e6dd5d2
Parents: bc695f8 f80bfbd
Author: devozerov <vo...@gridgain.com>
Authored: Tue Nov 22 12:05:32 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 22 12:05:32 2016 +0300
----------------------------------------------------------------------
.../GridDhtAtomicAbstractUpdateFuture.java | 19 ++++++++-----
.../dht/atomic/GridDhtAtomicCache.java | 2 +-
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 30 +++++++++++++-------
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 10 +++++--
.../GridNearAtomicAbstractUpdateRequest.java | 5 ----
.../atomic/GridNearAtomicFullUpdateRequest.java | 5 ----
.../GridNearAtomicSingleUpdateRequest.java | 9 ------
.../dht/preloader/GridDhtPreloader.java | 18 +++++++++++-
.../datastreamer/DataStreamerImpl.java | 2 +-
.../ignite/internal/util/GridLogThrottle.java | 28 ++++++++++++++----
...CacheLoadingConcurrentGridStartSelfTest.java | 6 +++-
11 files changed, 84 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
[04/10] ignite git commit: Merged ignite-1.6.11 into ignite-1.7.4.
Posted by vo...@apache.org.
Merged ignite-1.6.11 into ignite-1.7.4.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f2dc1d71
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f2dc1d71
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f2dc1d71
Branch: refs/heads/ignite-4259
Commit: f2dc1d71705b86428a04a69c4f2d4ee3a82ed1bd
Parents: 551f90d 6e36a79
Author: sboikov <sb...@gridgain.com>
Authored: Mon Nov 21 18:12:27 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Nov 21 18:12:27 2016 +0300
----------------------------------------------------------------------
bin/ignite.bat | 5 +
.../org/apache/ignite/IgniteDataStreamer.java | 2 +-
.../apache/ignite/IgniteSystemProperties.java | 13 +
.../internal/ComputeTaskInternalFuture.java | 11 +
.../internal/binary/BinaryClassDescriptor.java | 37 +-
.../ignite/internal/binary/BinaryContext.java | 13 +-
.../internal/binary/BinaryObjectExImpl.java | 57 +-
.../internal/binary/BinaryObjectImpl.java | 23 +
.../binary/BinaryObjectOffheapImpl.java | 24 +-
.../ignite/internal/binary/BinaryUtils.java | 4 +
.../binary/builder/BinaryObjectBuilderImpl.java | 6 +-
.../processors/affinity/AffinityAssignment.java | 88 +++
.../affinity/GridAffinityAssignment.java | 8 +-
.../affinity/GridAffinityAssignmentCache.java | 35 +-
.../affinity/GridAffinityProcessor.java | 89 ++-
.../processors/affinity/GridAffinityUtils.java | 8 +-
.../affinity/HistoryAffinityAssignment.java | 169 ++++++
.../cache/CacheAffinitySharedManager.java | 57 +-
.../cache/DynamicCacheChangeBatch.java | 7 +
.../cache/GridCacheAffinityManager.java | 6 +-
.../processors/cache/GridCacheContext.java | 8 +
.../processors/cache/GridCacheMapEntry.java | 5 +-
.../processors/cache/GridCacheMvccManager.java | 77 +++
.../GridCachePartitionExchangeManager.java | 299 +++++++--
.../processors/cache/GridCacheProcessor.java | 5 +-
.../cache/GridCacheSharedContext.java | 1 +
.../processors/cache/GridCacheUtils.java | 67 --
.../dht/GridClientPartitionTopology.java | 33 +-
.../dht/GridDhtPartitionTopology.java | 3 +-
.../dht/GridDhtPartitionTopologyImpl.java | 31 +-
.../dht/preloader/GridDhtPartitionFullMap.java | 18 +
.../dht/preloader/GridDhtPartitionMap2.java | 53 +-
.../GridDhtPartitionsAbstractMessage.java | 40 +-
.../GridDhtPartitionsExchangeFuture.java | 84 +--
.../preloader/GridDhtPartitionsFullMessage.java | 150 ++++-
.../GridDhtPartitionsSingleMessage.java | 132 +++-
.../GridDhtPartitionsSingleRequest.java | 4 +-
.../dht/preloader/GridDhtPreloader.java | 4 +-
.../query/GridCacheQueryMetricsAdapter.java | 2 +-
.../continuous/GridContinuousProcessor.java | 4 +-
.../datastreamer/DataStreamProcessor.java | 104 +++-
.../datastreamer/DataStreamerImpl.java | 607 ++++++++++++++-----
.../internal/processors/igfs/IgfsProcessor.java | 15 +
.../ignite/internal/util/GridLogThrottle.java | 29 +-
.../ignite/internal/util/IgniteUtils.java | 111 +++-
.../internal/util/future/GridFutureAdapter.java | 12 +-
.../util/offheap/unsafe/GridUnsafeMemory.java | 33 +-
.../visor/misc/VisorResolveHostNameTask.java | 4 +-
.../security/SecurityBasicPermissionSet.java | 107 ++++
.../security/SecurityPermissionSetBuilder.java | 222 +++++++
.../ignite/spi/discovery/tcp/ClientImpl.java | 38 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 510 +++++++++++++---
.../spi/discovery/tcp/TcpDiscoveryImpl.java | 59 +-
.../messages/TcpDiscoveryAbstractMessage.java | 7 +
.../messages/TcpDiscoveryClientAckResponse.java | 5 +
.../TcpDiscoveryClientHeartbeatMessage.java | 7 +-
.../TcpDiscoveryConnectionCheckMessage.java | 5 +
.../messages/TcpDiscoveryHeartbeatMessage.java | 5 +
.../TcpDiscoveryNodeAddFinishedMessage.java | 11 +
.../messages/TcpDiscoveryNodeAddedMessage.java | 33 +-
.../binary/BinaryMarshallerSelfTest.java | 343 ++++++++++-
...CacheExchangeMessageDuplicatedStateTest.java | 393 ++++++++++++
.../cache/IgniteCacheDynamicStopSelfTest.java | 48 +-
.../cache/IgniteCachePeekModesAbstractTest.java | 2 +-
...CacheLoadingConcurrentGridStartSelfTest.java | 251 +++++++-
...ncurrentGridStartSelfTestAllowOverwrite.java | 30 +
.../distributed/IgniteCacheGetRestartTest.java | 3 +
...cingDelayedPartitionMapExchangeSelfTest.java | 8 +-
.../GridCacheRebalancingSyncSelfTest.java | 18 +-
.../GridCacheSyncReplicatedPreloadSelfTest.java | 3 -
.../IgniteCacheSyncRebalanceModeSelfTest.java | 2 +-
...ContinuousQueryFailoverAbstractSelfTest.java | 2 +-
...ComputeJobExecutionErrorToLogManualTest.java | 88 +++
.../IgniteNoCustomEventsOnNodeStart.java | 7 +
.../DataStreamProcessorSelfTest.java | 4 +-
.../datastreamer/DataStreamerImplSelfTest.java | 170 ++++--
.../DataStreamerMultiThreadedSelfTest.java | 2 -
.../datastreamer/DataStreamerTimeoutTest.java | 92 ++-
.../igfs/IgfsProcessorValidationSelfTest.java | 30 +
...IpcEndpointRegistrationAbstractSelfTest.java | 76 ++-
...dpointRegistrationOnLinuxAndMacSelfTest.java | 11 +-
.../SecurityPermissionSetBuilderTest.java | 110 ++++
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 134 +++-
.../junits/common/GridCommonAbstractTest.java | 25 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 3 +
.../testsuites/IgniteCacheTestSuite2.java | 5 +
...opClientProtocolMultipleServersSelfTest.java | 102 ++--
.../query/h2/GridH2ResultSetIterator.java | 62 +-
.../processors/query/h2/IgniteH2Indexing.java | 4 +-
.../query/h2/opt/GridH2ValueCacheObject.java | 10 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 34 +-
.../query/h2/twostep/GridMergeIndex.java | 49 +-
.../h2/twostep/GridReduceQueryExecutor.java | 2 +-
.../cache/CacheSqlQueryValueCopySelfTest.java | 226 +++++++
.../cache/IgniteCacheOffheapEvictQueryTest.java | 7 +
.../IgniteCacheQueryMultiThreadedSelfTest.java | 59 ++
...lientQueryReplicatedNodeRestartSelfTest.java | 7 +
...butedQueryStopOnCancelOrTimeoutSelfTest.java | 7 +
.../query/IgniteSqlSplitterSelfTest.java | 2 +
.../query/h2/sql/GridQueryParsingTest.java | 11 +-
.../IgniteCacheQuerySelfTestSuite2.java | 2 +
modules/platforms/cpp/DEVNOTES.txt | 22 +-
modules/platforms/cpp/README.txt | 10 +-
modules/platforms/cpp/binary/Makefile.am | 4 +-
.../cpp/binary/project/vs/binary.vcxproj | 2 -
.../cpp/common/project/vs/common.vcxproj | 1 -
modules/platforms/cpp/core/Makefile.am | 4 +-
.../platforms/cpp/core/project/vs/core.vcxproj | 2 -
modules/platforms/cpp/examples/README.txt | 9 +-
.../cpp/examples/odbc-example/Makefile.am | 4 +-
.../cpp/examples/putget-example/Makefile.am | 4 +-
.../cpp/examples/query-example/Makefile.am | 4 +-
modules/platforms/cpp/ignite/Makefile.am | 4 +-
.../cpp/ignite/project/vs/ignite.vcxproj | 4 +-
modules/platforms/cpp/jni/Makefile.am | 4 +-
.../platforms/cpp/jni/project/vs/jni.vcxproj | 1 -
.../platforms/cpp/odbc-test/include/test_type.h | 42 +-
.../cpp/odbc-test/src/api_robustness_test.cpp | 63 ++
.../cpp/odbc-test/src/queries_test.cpp | 153 ++++-
.../cpp/odbc-test/src/sql_outer_join_test.cpp | 2 +-
modules/platforms/cpp/odbc/Makefile.am | 4 +-
modules/platforms/cpp/odbc/README.txt | 23 +-
.../cpp/odbc/include/ignite/odbc/statement.h | 42 ++
.../cpp/odbc/install/ignite-odbc-amd64.wxs | 114 ++++
.../cpp/odbc/install/ignite-odbc-x86.wxs | 114 ++++
.../platforms/cpp/odbc/project/vs/odbc.vcxproj | 4 +-
.../odbc/src/app/application_data_buffer.cpp | 34 +-
modules/platforms/cpp/odbc/src/odbc.cpp | 116 +---
modules/platforms/cpp/odbc/src/statement.cpp | 151 +++++
.../Dataload/DataStreamerTestTopologyChange.cs | 27 +-
.../Apache.Ignite.Core/Impl/PlatformTarget.cs | 2 +-
.../src/test/config/incorrect-store-cache.xml | 2 +
.../src/test/config/jdbc-pojo-store-builtin.xml | 3 +
.../src/test/config/jdbc-pojo-store-obj.xml | 3 +
modules/spring/src/test/config/node.xml | 2 +
modules/spring/src/test/config/node1.xml | 2 +
.../test/config/pojo-incorrect-store-cache.xml | 2 +
modules/spring/src/test/config/store-cache.xml | 2 +
modules/spring/src/test/config/store-cache1.xml | 2 +
.../IgniteStartFromStreamConfigurationTest.java | 18 +-
pom.xml | 8 +
141 files changed, 5973 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 558a654,f3751ac..ab573bd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -1475,10 -1507,25 +1478,25 @@@ class GridDhtPartitionTopologyImpl impl
lock.readLock().lock();
try {
- Map<Integer, Long> res = new HashMap<>(cntrMap);
+ Map<Integer, Long> res;
+
+ if (skipZeros) {
+ res = U.newHashMap(cntrMap.size());
+
+ for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+ Long cntr = e.getValue();
+
+ if (ZERO.equals(cntr))
+ continue;
+
+ res.put(e.getKey(), cntr);
+ }
+ }
+ else
+ res = new HashMap<>(cntrMap);
- for (int i = 0; i < locParts.length; i++) {
- GridDhtLocalPartition part = locParts[i];
+ for (int i = 0; i < locParts.length(); i++) {
+ GridDhtLocalPartition part = locParts.get(i);
if (part == null)
continue;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
index cb83a73,d6e0743..0805be1
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
@@@ -86,14 -88,25 +86,27 @@@ public class HadoopClientProtocolMultip
}
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
- clearConnectionMap();
+ clearClients();
+
+ super.afterTest();
}
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void clearConnectionMap() throws IgniteCheckedException {
+ ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap =
+ GridTestUtils.getFieldValue(IgniteHadoopClientProtocolProvider.class, "cliMap");
+
+ for(IgniteInternalFuture<GridClient> fut : cliMap.values())
+ fut.get().close();
+
+ cliMap.clear();
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 0314b3d,7e4d5b6..ac1a6a6
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@@ -56,9 -52,7 +56,10 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+ import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@@ -933,9 -757,12 +934,12 @@@ public class GridMapQueryExecutor
private int page;
/** */
- private final int rowCount;
+ private final int rowCnt;
/** */
+ private boolean cpNeeded;
+
+ /** */
private volatile boolean closed;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
index a92bf2b,072a081..80c4a08
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
@@@ -96,9 -95,16 +96,16 @@@ public class IgniteCacheDistributedQuer
g.cache(null).removeAll();
}
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
/** */
public void testRemoteQueryExecutionTimeout() throws Exception {
- testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 500, TimeUnit.MILLISECONDS, true);
+ testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true);
}
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 50f2ef0,56658df..e72c9cb
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@@ -30,12 -28,9 +30,14 @@@ import javax.cache.CacheException
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+ import org.apache.ignite.cache.affinity.Affinity;
+ import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/platforms/cpp/examples/README.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/platforms/cpp/odbc-test/src/queries_test.cpp
----------------------------------------------------------------------
diff --cc modules/platforms/cpp/odbc-test/src/queries_test.cpp
index 21edf4e,a7fc7a9..db9dafb
--- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp
@@@ -314,14 -294,9 +316,14 @@@ BOOST_AUTO_TEST_CASE(TestConnectionProt
Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache;PROTOCOL_VERSION=1.6.0");
}
+BOOST_AUTO_TEST_CASE(TestConnectionProtocolVersion_1_8_0)
+{
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache;PROTOCOL_VERSION=1.8.0");
+}
+
BOOST_AUTO_TEST_CASE(TestTwoRowsInt8)
{
- CheckTwoRowsInt<int8_t>(SQL_C_STINYINT);
+ CheckTwoRowsInt<signed char>(SQL_C_STINYINT);
}
BOOST_AUTO_TEST_CASE(TestTwoRowsUint8)
@@@ -693,131 -674,115 +701,242 @@@ BOOST_AUTO_TEST_CASE(TestDataAtExecutio
BOOST_CHECK(ret == SQL_NO_DATA);
}
+ BOOST_AUTO_TEST_CASE(TestNullFields)
+ {
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache");
+
+ SQLRETURN ret;
+
+ TestType in(1, 2, 3, 4, "5", 6.0f, 7.0, true, Guid(8, 9), BinaryUtils::MakeDateGmt(1987, 6, 5),
+ BinaryUtils::MakeTimestampGmt(1998, 12, 27, 1, 2, 3, 456));
+
+ TestType inNull;
+
+ inNull.allNulls = true;
+
+ testCache.Put(1, in);
+ testCache.Put(2, inNull);
+ testCache.Put(3, in);
+
+ const size_t columnsCnt = 10;
+
+ SQLLEN columnLens[columnsCnt] = { 0 };
+
+ int8_t i8Column;
+ int16_t i16Column;
+ int32_t i32Column;
+ int64_t i64Column;
+ char strColumn[ODBC_BUFFER_SIZE];
+ float floatColumn;
+ double doubleColumn;
+ bool boolColumn;
+ SQL_DATE_STRUCT dateColumn;
+ SQL_TIMESTAMP_STRUCT timestampColumn;
+
+ // Binding columns.
+ ret = SQLBindCol(stmt, 1, SQL_C_STINYINT, &i8Column, 0, &columnLens[0]);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ ret = SQLBindCol(stmt, 2, SQL_C_SSHORT, &i16Column, 0, &columnLens[1]);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ ret = SQLBindCol(stmt, 3, SQL_C_SLONG, &i32Column, 0, &columnLens[2]);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ ret = SQLBindCol(stmt, 4, SQL_C_SBIGINT, &i64Column, 0, &columnLens[3]);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ ret = SQLBindCol(stmt, 5, SQL_C_CHAR, &strColumn, ODBC_BUFFER_SIZE, &columnLens[4]);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ ret = SQLBindCol(stmt, 6, SQL_C_FLOAT, &floatColumn, 0, &columnLens[5]);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ ret = SQLBindCol(stmt, 7, SQL_C_DOUBLE, &doubleColumn, 0, &columnLens[6]);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ ret = SQLBindCol(stmt, 8, SQL_C_BIT, &boolColumn, 0, &columnLens[7]);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ ret = SQLBindCol(stmt, 9, SQL_C_DATE, &dateColumn, 0, &columnLens[8]);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ ret = SQLBindCol(stmt, 10, SQL_C_TIMESTAMP, &timestampColumn, 0, &columnLens[9]);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ SQLCHAR request[] = "SELECT i8Field, i16Field, i32Field, i64Field, strField, "
+ "floatField, doubleField, boolField, dateField, timestampField FROM TestType ORDER BY _key";
+
+ ret = SQLExecDirect(stmt, request, SQL_NTS);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ // Fetching the first non-null row.
+ ret = SQLFetch(stmt);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ // Checking that columns are not null.
+ for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
+ BOOST_CHECK_NE(columnLens[i], SQL_NULL_DATA);
+
+ // Fetching null row.
+ ret = SQLFetch(stmt);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ // Checking that columns are null.
+ for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
+ BOOST_CHECK_EQUAL(columnLens[i], SQL_NULL_DATA);
+
+ // Fetching the last non-null row.
+ ret = SQLFetch(stmt);
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ // Checking that columns are not null.
+ for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
+ BOOST_CHECK_NE(columnLens[i], SQL_NULL_DATA);
+
+ ret = SQLFetch(stmt);
+ BOOST_CHECK(ret == SQL_NO_DATA);
+ }
+
++
+BOOST_AUTO_TEST_CASE(TestDistributedJoins)
+{
+ // Starting additional node.
+ Ignite node1 = StartAdditionalNode("Node1");
+ Ignite node2 = StartAdditionalNode("Node2");
+
+ const int entriesNum = 1000;
+
+ // Filling cache with data.
+ for (int i = 0; i < entriesNum; ++i)
+ {
+ TestType entry;
+
+ entry.i32Field = i;
+ entry.i64Field = entriesNum - i - 1;
+
+ testCache.Put(i, entry);
+ }
+
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache");
+
+ SQLRETURN ret;
+
+ const size_t columnsCnt = 2;
+
+ SQLBIGINT columns[columnsCnt] = { 0 };
+
+ // Binding columns.
+ for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
+ {
+ ret = SQLBindCol(stmt, i + 1, SQL_C_SLONG, &columns[i], 0, 0);
+
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ }
+
+ SQLCHAR request[] =
+ "SELECT T0.i32Field, T1.i64Field FROM TestType AS T0 "
+ "INNER JOIN TestType AS T1 "
+ "ON (T0.i32Field = T1.i64Field)";
+
+ ret = SQLExecDirect(stmt, request, SQL_NTS);
+
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ int rowsNum = CountRows(stmt);
+
+ BOOST_CHECK_GT(rowsNum, 0);
+ BOOST_CHECK_LT(rowsNum, entriesNum);
+
+ Disconnect();
+
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache;DISTRIBUTED_JOINS=true;");
+
+ // Binding columns.
+ for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
+ {
+ ret = SQLBindCol(stmt, i + 1, SQL_C_SLONG, &columns[i], 0, 0);
+
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ }
+
+ ret = SQLExecDirect(stmt, request, SQL_NTS);
+
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ rowsNum = CountRows(stmt);
+
+ BOOST_CHECK_EQUAL(rowsNum, entriesNum);
+}
+
+BOOST_AUTO_TEST_CASE(TestDistributedJoinsWithOldVersion)
+{
+ // Starting additional node.
+ Ignite node1 = StartAdditionalNode("Node1");
+ Ignite node2 = StartAdditionalNode("Node2");
+
+ const int entriesNum = 1000;
+
+ // Filling cache with data.
+ for (int i = 0; i < entriesNum; ++i)
+ {
+ TestType entry;
+
+ entry.i32Field = i;
+ entry.i64Field = entriesNum - i - 1;
+
+ testCache.Put(i, entry);
+ }
+
+ Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;CACHE=cache;DISTRIBUTED_JOINS=true;PROTOCOL_VERSION=1.6.0");
+
+ SQLRETURN ret;
+
+ const size_t columnsCnt = 2;
+
+ SQLBIGINT columns[columnsCnt] = { 0 };
+
+ // Binding columns.
+ for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
+ {
+ ret = SQLBindCol(stmt, i + 1, SQL_C_SLONG, &columns[i], 0, 0);
+
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+ }
+
+ SQLCHAR request[] =
+ "SELECT T0.i32Field, T1.i64Field FROM TestType AS T0 "
+ "INNER JOIN TestType AS T1 "
+ "ON (T0.i32Field = T1.i64Field)";
+
+ ret = SQLExecDirect(stmt, request, SQL_NTS);
+
+ if (!SQL_SUCCEEDED(ret))
+ BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt));
+
+ int rowsNum = CountRows(stmt);
+
+ BOOST_CHECK_GT(rowsNum, 0);
+ BOOST_CHECK_LT(rowsNum, entriesNum);
+}
+
-
BOOST_AUTO_TEST_SUITE_END()
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/platforms/cpp/odbc/src/odbc.cpp
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --cc modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index bafc759,f7906ff..f4a07f6
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@@ -483,82 -483,7 +483,82 @@@ namespace Apache.Ignite.Core.Imp
}
}
}
-
+
+ /// <summary>
+ /// Perform out-in operation with a single stream.
+ /// </summary>
+ /// <typeparam name="TR">The type of the result.</typeparam>
+ /// <param name="type">Operation type.</param>
+ /// <param name="outAction">Out action.</param>
+ /// <param name="inAction">In action.</param>
+ /// <param name="inErrorAction">The action to read an error.</param>
+ /// <returns>
+ /// Result.
+ /// </returns>
+ protected TR DoOutInOpX<TR>(int type, Action<BinaryWriter> outAction, Func<IBinaryStream, long, TR> inAction,
+ Func<IBinaryStream, Exception> inErrorAction)
+ {
+ Debug.Assert(inErrorAction != null);
+
+ using (var stream = IgniteManager.Memory.Allocate().GetStream())
+ {
+ var writer = _marsh.StartMarshal(stream);
+
+ outAction(writer);
+
+ FinishMarshal(writer);
+
+ var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput());
+
+ if (res != Error && inAction == null)
+ return default(TR); // quick path for void operations
+
+ stream.SynchronizeInput();
+
+ stream.Seek(0, SeekOrigin.Begin);
+
+ if (res != Error)
+ return inAction != null ? inAction(stream, res) : default(TR);
+
+ throw inErrorAction(stream);
+ }
+ }
+
+ /// <summary>
+ /// Perform out-in operation with a single stream.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="outAction">Out action.</param>
+ /// <param name="inErrorAction">The action to read an error.</param>
+ /// <returns>
+ /// Result.
+ /// </returns>
- protected bool DoOutInOpX(int type, Action<BinaryWriter> outAction,
++ protected bool DoOutInOpX(int type, Action<BinaryWriter> outAction,
+ Func<IBinaryStream, Exception> inErrorAction)
+ {
+ Debug.Assert(inErrorAction != null);
+
+ using (var stream = IgniteManager.Memory.Allocate().GetStream())
+ {
+ var writer = _marsh.StartMarshal(stream);
+
+ outAction(writer);
+
+ FinishMarshal(writer);
+
+ var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput());
+
+ if (res != Error)
+ return res == True;
+
+ stream.SynchronizeInput();
+
+ stream.Seek(0, SeekOrigin.Begin);
+
+ throw inErrorAction(stream);
+ }
+ }
+
/// <summary>
/// Perform out-in operation.
/// </summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2dc1d71/pom.xml
----------------------------------------------------------------------
[08/10] ignite git commit: IGNITE-4227: ODBC: Implemented SQLError.
This closes #1237.
Posted by vo...@apache.org.
IGNITE-4227: ODBC: Implemented SQLError. This closes #1237.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc695f8e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc695f8e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc695f8e
Branch: refs/heads/ignite-4259
Commit: bc695f8e3306c6d74d4fe53d9a98adedd43ad8f0
Parents: f2dc1d7
Author: Igor Sapego <is...@gridgain.com>
Authored: Tue Nov 22 12:05:15 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 22 12:05:15 2016 +0300
----------------------------------------------------------------------
.../cpp/odbc-test/src/api_robustness_test.cpp | 45 +++++++++++++++
.../platforms/cpp/odbc/include/ignite/odbc.h | 12 +++-
.../ignite/odbc/diagnostic/diagnosable.h | 7 +++
.../odbc/diagnostic/diagnosable_adapter.h | 10 ++++
.../ignite/odbc/diagnostic/diagnostic_record.h | 19 +++++++
.../odbc/diagnostic/diagnostic_record_storage.h | 16 ++++++
.../odbc/os/win/src/system/socket_client.cpp | 4 +-
.../odbc/src/diagnostic/diagnostic_record.cpp | 16 +++++-
.../diagnostic/diagnostic_record_storage.cpp | 18 ++++++
modules/platforms/cpp/odbc/src/entry_points.cpp | 26 ++++-----
modules/platforms/cpp/odbc/src/odbc.cpp | 59 ++++++++++++++++++++
11 files changed, 214 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc-test/src/api_robustness_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc-test/src/api_robustness_test.cpp b/modules/platforms/cpp/odbc-test/src/api_robustness_test.cpp
index fbd5f12..13a5ea6 100644
--- a/modules/platforms/cpp/odbc-test/src/api_robustness_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/api_robustness_test.cpp
@@ -1066,4 +1066,49 @@ BOOST_AUTO_TEST_CASE(TestFetchScrollFirst)
CheckFetchScrollUnsupportedOrientation(SQL_FETCH_FIRST);
}
+BOOST_AUTO_TEST_CASE(TestSQLError)
+{
+ // There are no checks because we do not really care what the result of these
+ // calls is, as long as they do not cause a segmentation fault.
+
+ Connect("DRIVER={Apache Ignite};address=127.0.0.1:11110;cache=cache");
+
+ SQLCHAR state[6] = { 0 };
+ SQLINTEGER nativeCode = 0;
+ SQLCHAR message[ODBC_BUFFER_SIZE] = { 0 };
+ SQLSMALLINT messageLen = 0;
+
+ // Everything is ok.
+ SQLRETURN ret = SQLError(env, 0, 0, state, &nativeCode, message, sizeof(message), &messageLen);
+
+ if (ret != SQL_SUCCESS && ret != SQL_NO_DATA)
+ BOOST_FAIL("Unexpected error");
+
+ ret = SQLError(0, dbc, 0, state, &nativeCode, message, sizeof(message), &messageLen);
+
+ if (ret != SQL_SUCCESS && ret != SQL_NO_DATA)
+ BOOST_FAIL("Unexpected error");
+
+ ret = SQLError(0, 0, stmt, state, &nativeCode, message, sizeof(message), &messageLen);
+
+ if (ret != SQL_SUCCESS && ret != SQL_NO_DATA)
+ BOOST_FAIL("Unexpected error");
+
+ SQLError(0, 0, 0, state, &nativeCode, message, sizeof(message), &messageLen);
+
+ SQLError(0, 0, stmt, 0, &nativeCode, message, sizeof(message), &messageLen);
+
+ SQLError(0, 0, stmt, state, 0, message, sizeof(message), &messageLen);
+
+ SQLError(0, 0, stmt, state, &nativeCode, 0, sizeof(message), &messageLen);
+
+ SQLError(0, 0, stmt, state, &nativeCode, message, 0, &messageLen);
+
+ SQLError(0, 0, stmt, state, &nativeCode, message, sizeof(message), 0);
+
+ SQLError(0, 0, stmt, 0, 0, 0, 0, 0);
+
+ SQLError(0, 0, 0, 0, 0, 0, 0, 0);
+}
+
BOOST_AUTO_TEST_SUITE_END()
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/include/ignite/odbc.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc.h b/modules/platforms/cpp/odbc/include/ignite/odbc.h
index ec0861c..345cdb8 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc.h
@@ -255,6 +255,16 @@ namespace ignite
SQLRETURN SQLParamData(SQLHSTMT stmt, SQLPOINTER* value);
SQLRETURN SQLPutData(SQLHSTMT stmt, SQLPOINTER data, SQLLEN strLengthOrIndicator);
+
+ SQLRETURN SQLError(SQLHENV env,
+ SQLHDBC conn,
+ SQLHSTMT stmt,
+ SQLCHAR* state,
+ SQLINTEGER* error,
+ SQLCHAR* msgBuf,
+ SQLSMALLINT msgBufLen,
+ SQLSMALLINT* msgResLen);
+
} // namespace ignite
-#endif //_IGNITE_ODBC
+#endif //_IGNITE_ODBC
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable.h b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable.h
index 909fe01..6937fcc 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable.h
@@ -48,6 +48,13 @@ namespace ignite
virtual const diagnostic::DiagnosticRecordStorage& GetDiagnosticRecords() const = 0;
/**
+ * Get diagnostic record.
+ *
+ * @return Diagnostic record.
+ */
+ virtual diagnostic::DiagnosticRecordStorage& GetDiagnosticRecords() = 0;
+
+ /**
* Add new status record.
*
* @param sqlState SQL state.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable_adapter.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable_adapter.h b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable_adapter.h
index 63d26ca..548eecd 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable_adapter.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnosable_adapter.h
@@ -62,6 +62,16 @@ namespace ignite
}
/**
+ * Get diagnostic record.
+ *
+ * @return Diagnostic record.
+ */
+ virtual diagnostic::DiagnosticRecordStorage& GetDiagnosticRecords()
+ {
+ return diagnosticRecords;
+ }
+
+ /**
* Add new status record.
*
* @param sqlState SQL state.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record.h b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record.h
index 670e0aa..80d5090 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record.h
@@ -127,6 +127,19 @@ namespace ignite
*/
int32_t GetColumnNumber() const;
+ /**
+ * Check if the record was retrieved with the SQLError previously.
+ *
+ * @return True if the record was retrieved with the SQLError
+ * previously.
+ */
+ bool IsRetrieved() const;
+
+ /**
+ * Mark record as retrieved with the SQLError.
+ */
+ void MarkRetrieved();
+
private:
/** SQL state diagnostic code. */
SqlState sqlState;
@@ -157,6 +170,12 @@ namespace ignite
* result set or the parameter number in the set of parameters.
*/
int32_t columnNum;
+
+ /**
+ * Flag that shows if the record was retrieved with the
+ * SQLError previously.
+ */
+ bool retrieved;
};
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record_storage.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record_storage.h b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record_storage.h
index 4cc3576..b45bb7d 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record_storage.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/diagnostic/diagnostic_record_storage.h
@@ -139,6 +139,22 @@ namespace ignite
const DiagnosticRecord& GetStatusRecord(int32_t idx) const;
/**
+ * Get specified status record.
+ *
+ * @param idx Status record index.
+ * @return Status record instance reference.
+ */
+ DiagnosticRecord& GetStatusRecord(int32_t idx);
+
+ /**
+ * Get last non-retrieved status record index.
+ *
+ * @return Index of the last non-retrieved status record or zero
+ * if nothing was found.
+ */
+ int32_t GetLastNonRetrieved() const;
+
+ /**
* Check if the record is in the success state.
*
* @return True if the record is in the success state.
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
index bc4cdc0..e248323 100644
--- a/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
+++ b/modules/platforms/cpp/odbc/os/win/src/system/socket_client.cpp
@@ -83,8 +83,8 @@ namespace ignite
// Attempt to connect to an address until one succeeds
for (addrinfo *it = result; it != NULL; it = it->ai_next)
{
- LOG_MSG("Addr: %u.%u.%u.%u\n", it->ai_addr->sa_data[2], it->ai_addr->sa_data[3],
- it->ai_addr->sa_data[4], it->ai_addr->sa_data[5]);
+ LOG_MSG("Addr: %u.%u.%u.%u\n", it->ai_addr->sa_data[2] & 0xFF, it->ai_addr->sa_data[3] & 0xFF,
+ it->ai_addr->sa_data[4] & 0xFF, it->ai_addr->sa_data[5] & 0xFF);
// Create a SOCKET for connecting to server
socketHandle = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
index 1b654d2..215d77f 100644
--- a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
+++ b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record.cpp
@@ -89,7 +89,8 @@ namespace ignite
connectionName(),
serverName(),
rowNum(0),
- columnNum(0)
+ columnNum(0),
+ retrieved(false)
{
// No-op.
}
@@ -102,7 +103,8 @@ namespace ignite
connectionName(connectionName),
serverName(serverName),
rowNum(rowNum),
- columnNum(columnNum)
+ columnNum(columnNum),
+ retrieved(false)
{
// No-op.
}
@@ -260,6 +262,16 @@ namespace ignite
{
return columnNum;
}
+
+ bool DiagnosticRecord::IsRetrieved() const
+ {
+ return retrieved;
+ }
+
+ void DiagnosticRecord::MarkRetrieved()
+ {
+ retrieved = true;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record_storage.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record_storage.cpp b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record_storage.cpp
index 99ef292..c075567 100644
--- a/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record_storage.cpp
+++ b/modules/platforms/cpp/odbc/src/diagnostic/diagnostic_record_storage.cpp
@@ -102,6 +102,24 @@ namespace ignite
return statusRecords[idx - 1];
}
+ DiagnosticRecord& DiagnosticRecordStorage::GetStatusRecord(int32_t idx)
+ {
+ return statusRecords[idx - 1];
+ }
+
+ int32_t DiagnosticRecordStorage::GetLastNonRetrieved() const
+ {
+ for (size_t i = 0; i < statusRecords.size(); ++i)
+ {
+ const DiagnosticRecord& record = statusRecords[i];
+
+ if (!record.IsRetrieved())
+ return static_cast<int32_t>(i + 1);
+ }
+
+ return 0;
+ }
+
bool DiagnosticRecordStorage::IsSuccessful() const
{
return result == SQL_RESULT_SUCCESS ||
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/src/entry_points.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/entry_points.cpp b/modules/platforms/cpp/odbc/src/entry_points.cpp
index 08016cc..a85b3cf 100644
--- a/modules/platforms/cpp/odbc/src/entry_points.cpp
+++ b/modules/platforms/cpp/odbc/src/entry_points.cpp
@@ -412,6 +412,19 @@ SQLRETURN SQL_API SQLPutData(SQLHSTMT stmt,
return ignite::SQLPutData(stmt, data, strLengthOrIndicator);
}
+SQLRETURN SQL_API SQLError(SQLHENV env,
+ SQLHDBC conn,
+ SQLHSTMT stmt,
+ SQLCHAR* state,
+ SQLINTEGER* error,
+ SQLCHAR* msgBuf,
+ SQLSMALLINT msgBufLen,
+ SQLSMALLINT* msgResLen)
+{
+ return ignite::SQLError(env, conn, stmt, state,
+ error, msgBuf, msgBufLen, msgResLen);
+}
+
//
// ==== Not implemented ====
//
@@ -434,19 +447,6 @@ SQLRETURN SQL_API SQLColAttributes(SQLHSTMT stmt,
return SQL_SUCCESS;
}
-SQLRETURN SQL_API SQLError(SQLHENV env,
- SQLHDBC conn,
- SQLHSTMT stmt,
- SQLCHAR* state,
- SQLINTEGER* error,
- SQLCHAR* msgBuf,
- SQLSMALLINT msgBufLen,
- SQLSMALLINT* msgResLen)
-{
- LOG_MSG("SQLError called\n");
- return SQL_ERROR;
-}
-
SQLRETURN SQL_API SQLGetCursorName(SQLHSTMT stmt,
SQLCHAR* nameBuf,
SQLSMALLINT nameBufLen,
http://git-wip-us.apache.org/repos/asf/ignite/blob/bc695f8e/modules/platforms/cpp/odbc/src/odbc.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/odbc.cpp b/modules/platforms/cpp/odbc/src/odbc.cpp
index 79036eb..684ed08 100644
--- a/modules/platforms/cpp/odbc/src/odbc.cpp
+++ b/modules/platforms/cpp/odbc/src/odbc.cpp
@@ -1255,4 +1255,63 @@ namespace ignite
return statement->GetDiagnosticRecords().GetReturnCode();
}
+ SQLRETURN SQLError(SQLHENV env,
+ SQLHDBC conn,
+ SQLHSTMT stmt,
+ SQLCHAR* state,
+ SQLINTEGER* error,
+ SQLCHAR* msgBuf,
+ SQLSMALLINT msgBufLen,
+ SQLSMALLINT* msgResLen)
+ {
+ using namespace ignite::utility;
+ using namespace ignite::odbc;
+ using namespace ignite::odbc::diagnostic;
+ using namespace ignite::odbc::type_traits;
+
+ using ignite::odbc::app::ApplicationDataBuffer;
+
+ LOG_MSG("SQLError called\n");
+
+ SQLHANDLE handle = 0;
+
+ if (env != 0)
+ handle = static_cast<SQLHANDLE>(env);
+ else if (conn != 0)
+ handle = static_cast<SQLHANDLE>(conn);
+ else if (stmt != 0)
+ handle = static_cast<SQLHANDLE>(stmt);
+ else
+ return SQL_INVALID_HANDLE;
+
+ Diagnosable *diag = reinterpret_cast<Diagnosable*>(handle);
+
+ DiagnosticRecordStorage& records = diag->GetDiagnosticRecords();
+
+ int32_t recNum = records.GetLastNonRetrieved();
+
+ if (recNum < 1 || recNum > records.GetStatusRecordsNumber())
+ return SQL_NO_DATA;
+
+ DiagnosticRecord& record = records.GetStatusRecord(recNum);
+
+ record.MarkRetrieved();
+
+ if (state)
+ CopyStringToBuffer(record.GetSqlState(), reinterpret_cast<char*>(state), 6);
+
+ if (error)
+ *error = 0;
+
+ SqlLen outResLen;
+ ApplicationDataBuffer outBuffer(IGNITE_ODBC_C_TYPE_CHAR, msgBuf, msgBufLen, &outResLen);
+
+ outBuffer.PutString(record.GetMessageText());
+
+ if (msgResLen)
+ *msgResLen = static_cast<SQLSMALLINT>(outResLen);
+
+ return SQL_SUCCESS;
+ }
+
} // namespace ignite;
[02/10] ignite git commit: IGNITE-2355: fix test - clear client
connections before and after a test.
Posted by vo...@apache.org.
IGNITE-2355: fix test - clear client connections before and after a test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e4a279e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e4a279e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e4a279e
Branch: refs/heads/ignite-4259
Commit: 6e4a279e34584881469a7d841432e6c38db2f06f
Parents: 88f38ac
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Mon Nov 21 19:15:17 2016 +0500
Committer: tledkov-gridgain <tl...@gridgain.com>
Committed: Mon Nov 21 19:15:17 2016 +0500
----------------------------------------------------------------------
...opClientProtocolMultipleServersSelfTest.java | 24 ++++++++++++++++++++
1 file changed, 24 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e4a279e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
index 0e51938..cb83a73 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java
@@ -23,6 +23,7 @@ import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,6 +45,8 @@ import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider;
import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridServerUnreachableException;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
@@ -76,9 +79,18 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
}
/** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ clearClients();
+ }
+
+ /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
+ clearClients();
+
super.afterTest();
}
@@ -92,6 +104,18 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS
}
/**
+ *
+ */
+ private void clearClients() {
+ ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = GridTestUtils.getFieldValue(
+ IgniteHadoopClientProtocolProvider.class,
+ IgniteHadoopClientProtocolProvider.class,
+ "cliMap");
+
+ cliMap.clear();
+ }
+
+ /**
* @throws Exception If failed.
*/
private void beforeJob() throws Exception {
[07/10] ignite git commit: IGNITE-3748 Data rebalancing of large
cache can hang out.
Posted by vo...@apache.org.
IGNITE-3748 Data rebalancing of large cache can hang out.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f80bfbd1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f80bfbd1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f80bfbd1
Branch: refs/heads/ignite-4259
Commit: f80bfbd19e7870554bf3abd13bde89b0f39aaee1
Parents: d15eba4
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Nov 22 12:02:57 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Nov 22 12:02:57 2016 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPreloader.java | 18 ++++++++++++-
.../ignite/internal/util/GridLogThrottle.java | 28 +++++++++++++++-----
2 files changed, 39 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f80bfbd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 6ed58e7..0865d9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.GPC;
+import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgnitePredicate;
@@ -788,7 +789,22 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
GridDhtLocalPartition part = partsToEvict.poll();
if (part != null)
- part.tryEvict();
+ try {
+ part.tryEvict();
+ }
+ catch (Throwable ex) {
+ if (cctx.kernalContext().isStopping()) {
+ LT.warn(log, ex, "Partition eviction failed (current node is stopping).",
+ false,
+ true);
+
+ partsToEvict.clear();
+
+ return true;
+ }
+ else
+ LT.error(log, ex, "Partition eviction failed, this can cause grid hang.");
+ }
}
finally {
if (!partsToEvict.isEmptyx())
http://git-wip-us.apache.org/repos/asf/ignite/blob/f80bfbd1/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
index ce6783a..c8ba865 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
@@ -81,12 +81,12 @@ public class GridLogThrottle {
* @param log Logger.
* @param e Error (optional).
* @param msg Message.
- * @param byMessage Errors group by message, not by tuple(error, msg).
+ * @param byMsg Errors group by message, not by tuple(error, msg).
*/
- public static void error(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean byMessage) {
+ public static void error(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean byMsg) {
assert !F.isEmpty(msg);
- log(log, e, msg, null, LogLevel.ERROR, false, byMessage);
+ log(log, e, msg, null, LogLevel.ERROR, false, byMsg);
}
/**
@@ -109,6 +109,22 @@ public class GridLogThrottle {
* @param e Error (optional).
* @param msg Message.
* @param quite Print warning anyway.
+ * @param byMsg Errors group by message, not by tuple(error, msg).
+ */
+ public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean quite, boolean byMsg) {
+ assert !F.isEmpty(msg);
+
+ log(log, e, msg, null, LogLevel.WARN, quite, byMsg);
+ }
+
+
+ /**
+ * Logs warning if needed.
+ *
+ * @param log Logger.
+ * @param e Error (optional).
+ * @param msg Message.
+ * @param quite Print warning anyway.
*/
public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean quite) {
assert !F.isEmpty(msg);
@@ -168,15 +184,15 @@ public class GridLogThrottle {
* @param longMsg Long message (or just message).
* @param shortMsg Short message for quite logging.
* @param level Level where messages should appear.
- * @param byMessage Errors group by message, not by tuple(error, msg).
+ * @param byMsg Errors group by message, not by tuple(error, msg).
*/
@SuppressWarnings({"RedundantTypeArguments"})
private static void log(@Nullable IgniteLogger log, @Nullable Throwable e, String longMsg, @Nullable String shortMsg,
- LogLevel level, boolean quiet, boolean byMessage) {
+ LogLevel level, boolean quiet, boolean byMsg) {
assert !F.isEmpty(longMsg);
IgniteBiTuple<Class<? extends Throwable>, String> tup =
- e != null && !byMessage ? F.<Class<? extends Throwable>, String>t(e.getClass(), e.getMessage()) :
+ e != null && !byMsg ? F.<Class<? extends Throwable>, String>t(e.getClass(), e.getMessage()) :
F.<Class<? extends Throwable>, String>t(null, longMsg);
while (true) {