You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/11/20 09:15:43 UTC
[10/29] ignite git commit: Optimization for single key cache 'get'
operation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
new file mode 100644
index 0000000..ba0081c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Response message for the single-key cache 'get' operation: carries the result, topology version, flags and error.
+ */
+public class GridNearSingleGetResponse extends GridCacheMessage implements GridCacheDeployable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Bit in {@link #flags} set when an invalid partitions error occurred, see {@link #invalidPartitions()}. */
+ public static final int INVALID_PART_FLAG_MASK = 0x1;
+
+ /** Bit in {@link #flags} set by {@link #setContainsValue()} and tested by {@link #containsValue()}. */
+ public static final int CONTAINS_VAL_FLAG_MASK = 0x2;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Result message, may be {@code null}. */
+ private Message res;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Error. */
+ @GridDirectTransient
+ private IgniteCheckedException err;
+
+ /** Serialized error. */
+ private byte[] errBytes;
+
+ /** Bit flags, see {@link #INVALID_PART_FLAG_MASK} and {@link #CONTAINS_VAL_FLAG_MASK}. */
+ private byte flags;
+
+ /**
+ * Empty constructor required for {@link Message}.
+ */
+ public GridNearSingleGetResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param futId Future ID, must not be {@code null}.
+ * @param topVer Topology version.
+ * @param res Result, may be {@code null}.
+ * @param invalidPartitions {@code True} if invalid partitions error occurred.
+ * @param addDepInfo Deployment info flag.
+ */
+ public GridNearSingleGetResponse(
+ int cacheId,
+ IgniteUuid futId,
+ AffinityTopologyVersion topVer,
+ @Nullable Message res,
+ boolean invalidPartitions,
+ boolean addDepInfo
+ ) {
+ assert futId != null;
+
+ this.cacheId = cacheId;
+ this.futId = futId;
+ this.topVer = topVer;
+ this.res = res;
+ this.addDepInfo = addDepInfo;
+
+ if (invalidPartitions)
+ flags = (byte)(flags | INVALID_PART_FLAG_MASK);
+ }
+
+ /**
+ * @param err Error to store in this response.
+ */
+ public void error(IgniteCheckedException err) {
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
+ return err;
+ }
+
+ /**
+ * @return Topology version.
+ */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return {@code True} if invalid partitions error occurred.
+ */
+ public boolean invalidPartitions() {
+ return (flags & INVALID_PART_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @return {@code True} if {@link #CONTAINS_VAL_FLAG_MASK} is set (result for a request with the {@link GridNearSingleGetRequest#skipValues()} flag set).
+ */
+ public boolean containsValue() {
+ return (flags & CONTAINS_VAL_FLAG_MASK) != 0;
+ }
+
+ /**
+ * Sets the {@link #CONTAINS_VAL_FLAG_MASK} bit in the flags.
+ */
+ public void setContainsValue() {
+ flags = (byte)(flags | CONTAINS_VAL_FLAG_MASK);
+ }
+
+ /**
+ * @return Result message, possibly {@code null}.
+ */
+ public Message result() {
+ return res;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (res != null) {
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ if (res instanceof CacheObject)
+ prepareMarshalCacheObject((CacheObject) res, cctx);
+ else if (res instanceof CacheVersionedValue)
+ ((CacheVersionedValue)res).prepareMarshal(cctx.cacheObjectContext());
+ else if (res instanceof GridCacheEntryInfo)
+ ((GridCacheEntryInfo)res).marshal(cctx);
+ }
+
+ if (err != null) // The error field is transient for direct marshalling, so it is sent as errBytes.
+ errBytes = ctx.marshaller().marshal(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (res != null) {
+ GridCacheContext cctx = ctx.cacheContext(cacheId());
+
+ if (res instanceof CacheObject)
+ ((CacheObject)res).finishUnmarshal(cctx.cacheObjectContext(), ldr);
+ else if (res instanceof CacheVersionedValue)
+ ((CacheVersionedValue)res).finishUnmarshal(cctx, ldr);
+ else if (res instanceof GridCacheEntryInfo)
+ ((GridCacheEntryInfo)res).unmarshal(cctx, ldr);
+ }
+
+ if (errBytes != null && err == null) // Restore the error from its serialized form unless it is already set.
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) { // Cases fall through intentionally: fields are written in order starting at the current state.
+ case 3:
+ if (!writer.writeByteArray("errBytes", errBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeIgniteUuid("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeMessage("res", res))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) { // Cases fall through intentionally: fields are read in the same order writeTo() writes them.
+ case 3:
+ errBytes = reader.readByteArray("errBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ futId = reader.readIgniteUuid("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ res = reader.readMessage("res");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearSingleGetResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 117;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 8; // 5 fields written by this class (states 3-7); presumably 3 more come from the superclass - TODO confirm.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearSingleGetResponse.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 1a26028..fd3d056 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -423,7 +423,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
assert nodeId != null;
assert res != null;
- GridNearLockFuture fut = (GridNearLockFuture)ctx.mvcc().<Boolean>future(res.version(),
+ GridNearLockFuture fut = (GridNearLockFuture)ctx.mvcc().<Boolean>mvccFuture(res.version(),
res.futureId());
if (fut != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 9c022b2..102cc4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -79,7 +79,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private GridCacheSharedContext<K, V> cctx;
/** Future ID. */
- private IgniteUuid futId;
+ private final IgniteUuid futId;
/** Transaction. */
@GridToStringInclude
@@ -125,26 +125,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return tx.xidVersion();
- }
-
- /**
- * @return Involved nodes.
- */
- @Override public Collection<? extends ClusterNode> nodes() {
- return
- F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
- @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
- if (isMini(f))
- return ((MiniFuture)f).node();
-
- return cctx.discovery().localNode();
- }
- });
- }
-
- /** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
for (IgniteInternalFuture<?> fut : futures())
if (isMini(fut)) {
@@ -298,7 +278,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
// Don't forget to clean up.
- cctx.mvcc().removeFuture(this);
+ cctx.mvcc().removeFuture(futId);
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index b92be31..1c01e4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -332,7 +332,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
boolean readThrough,
boolean async,
final Collection<KeyCacheObject> keys,
- boolean skipVals,
+ final boolean skipVals,
final boolean needVer,
final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
) {
@@ -361,35 +361,70 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
});
}
else if (cacheCtx.isColocated()) {
- return cacheCtx.colocated().loadAsync(
- keys,
- readThrough,
- /*force primary*/needVer,
- topologyVersion(),
- CU.subjectId(this, cctx),
- resolveTaskName(),
- /*deserializePortable*/false,
- accessPolicy(cacheCtx, keys),
- skipVals,
- /*can remap*/true,
- needVer,
- /*keepCacheObject*/true
- ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
- @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
- try {
- Map<Object, Object> map = f.get();
-
- processLoaded(map, keys, needVer, c);
-
- return null;
+ if (keys.size() == 1) {
+ final KeyCacheObject key = F.first(keys);
+
+ return cacheCtx.colocated().loadAsync(
+ key,
+ readThrough,
+ /*force primary*/needVer,
+ topologyVersion(),
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ /*deserializePortable*/false,
+ accessPolicy(cacheCtx, keys),
+ skipVals,
+ /*can remap*/true,
+ needVer,
+ /*keepCacheObject*/true
+ ).chain(new C1<IgniteInternalFuture<Object>, Void>() {
+ @Override public Void apply(IgniteInternalFuture<Object> f) {
+ try {
+ Object val = f.get();
+
+ processLoaded(key, val, needVer, skipVals, c);
+
+ return null;
+ }
+ catch (Exception e) {
+ setRollbackOnly();
+
+ throw new GridClosureException(e);
+ }
}
- catch (Exception e) {
- setRollbackOnly();
-
- throw new GridClosureException(e);
+ });
+ }
+ else {
+ return cacheCtx.colocated().loadAsync(
+ keys,
+ readThrough,
+ /*force primary*/needVer,
+ topologyVersion(),
+ CU.subjectId(this, cctx),
+ resolveTaskName(),
+ /*deserializePortable*/false,
+ accessPolicy(cacheCtx, keys),
+ skipVals,
+ /*can remap*/true,
+ needVer,
+ /*keepCacheObject*/true
+ ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
+ @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
+ try {
+ Map<Object, Object> map = f.get();
+
+ processLoaded(map, keys, needVer, c);
+
+ return null;
+ }
+ catch (Exception e) {
+ setRollbackOnly();
+
+ throw new GridClosureException(e);
+ }
}
- }
- });
+ });
+ }
}
else {
assert cacheCtx.isLocal();
@@ -409,29 +444,45 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
final Collection<KeyCacheObject> keys,
boolean needVer,
GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) {
- for (KeyCacheObject key : keys) {
- Object val = map.get(key);
+ for (KeyCacheObject key : keys)
+ processLoaded(key, map.get(key), needVer, false, c);
+ }
- if (val != null) {
- Object v;
- GridCacheVersion ver;
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @param needVer If {@code true} version is required for loaded values.
+ * @param skipVals Skip values flag.
+ * @param c Closure.
+ */
+ private void processLoaded(
+ KeyCacheObject key,
+ @Nullable Object val,
+ boolean needVer,
+ boolean skipVals,
+ GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) {
+ if (val != null) {
+ Object v;
+ GridCacheVersion ver;
- if (needVer) {
- T2<Object, GridCacheVersion> t = (T2)val;
+ if (needVer) {
+ T2<Object, GridCacheVersion> t = (T2)val;
- v = t.get1();
- ver = t.get2();
- }
- else {
- v = val;
- ver = null;
- }
-
- c.apply(key, v, ver);
+ v = t.get1();
+ ver = t.get2();
}
- else
+ else {
+ v = val;
+ ver = null;
+ }
+
+ if (skipVals && v == Boolean.FALSE)
c.apply(key, null, IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER);
+ else
+ c.apply(key, v, ver);
}
+ else
+ c.apply(key, null, IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER);
}
/** {@inheritDoc} */
@@ -771,7 +822,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (fut == null && !commitFut.compareAndSet(null, fut = new GridNearTxFinishFuture<>(cctx, this, true)))
return commitFut.get();
- cctx.mvcc().addFuture(fut);
+ cctx.mvcc().addFuture(fut, fut.futureId());
final IgniteInternalFuture<?> prepareFut = prepFut.get();
@@ -816,7 +867,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (!rollbackFut.compareAndSet(null, fut = new GridNearTxFinishFuture<>(cctx, this, false)))
return rollbackFut.get();
- cctx.mvcc().addFuture(fut);
+ cctx.mvcc().addFuture(fut, fut.futureId());
IgniteInternalFuture<?> prepFut = this.prepFut.get();
@@ -957,7 +1008,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true);
- cctx.mvcc().addFuture(fut);
+ cctx.mvcc().addFuture(fut, fut.futureId());
if (prep == null || prep.isDone()) {
assert prep != null || optimistic();
@@ -1016,7 +1067,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/false);
- cctx.mvcc().addFuture(fut);
+ cctx.mvcc().addFuture(fut, fut.futureId());
IgniteInternalFuture<?> prep = prepFut.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index cfaadc9..52cad91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -25,7 +25,7 @@ import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
@@ -48,8 +48,8 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO
/**
* Common code for tx prepare in optimistic and pessimistic modes.
*/
-public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx>
- implements GridCacheFuture<IgniteInternalTx> {
+public abstract class GridNearTxPrepareFutureAdapter extends
+ GridCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx> implements GridCacheMvccFuture<IgniteInternalTx> {
/** Logger reference. */
protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 87da9a1..544d5b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -231,8 +231,8 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
/**
* @param fut Clears future from cache.
*/
- void onFutureDone(GridCacheFuture<?> fut) {
- if (ctx.mvcc().removeFuture(fut)) {
+ void onFutureDone(GridLocalLockFuture fut) {
+ if (ctx.mvcc().removeMvccFuture(fut)) {
if (log().isDebugEnabled())
log().debug("Explicitly removed future from map of futures: " + fut);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index cb14b4c..d392d53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -154,11 +154,6 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
}
/** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return Collections.emptyList();
- }
-
- /** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
return false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 0e5657b..a9846ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -477,7 +477,7 @@ public class IgniteTxHandler {
*/
private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) {
GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc()
- .<IgniteInternalTx>future(res.version(), res.futureId());
+ .<IgniteInternalTx>mvccFuture(res.version(), res.futureId());
if (fut == null) {
U.warn(log, "Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']');
@@ -495,8 +495,7 @@ public class IgniteTxHandler {
private void processNearTxFinishResponse(UUID nodeId, GridNearTxFinishResponse res) {
ctx.tm().onFinishedRemote(nodeId, res.threadId());
- GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(
- res.xid(), res.futureId());
+ GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
@@ -513,7 +512,7 @@ public class IgniteTxHandler {
* @param res Response.
*/
private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
- GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().future(res.version(), res.futureId());
+ GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().mvccFuture(res.version(), res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
@@ -534,8 +533,7 @@ public class IgniteTxHandler {
assert res != null;
if (res.checkCommitted()) {
- GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(
- res.xid(), res.futureId());
+ GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
@@ -547,8 +545,7 @@ public class IgniteTxHandler {
fut.onResult(nodeId, res);
}
else {
- GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(
- res.xid(), res.futureId());
+ GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
@@ -566,7 +563,8 @@ public class IgniteTxHandler {
* @param req Request.
* @return Future.
*/
- @Nullable public IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest req) {
+ @Nullable public IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(UUID nodeId,
+ GridNearTxFinishRequest req) {
return finish(nodeId, null, req);
}
@@ -1359,8 +1357,7 @@ public class IgniteTxHandler {
if (log.isDebugEnabled())
log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
- GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc().
- <Boolean>future(res.version(), res.futureId());
+ GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc().future(res.futureId());
if (fut == null) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 9e44b10..ecb0595 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1372,7 +1372,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (txEntry != null) {
CacheObject val = txEntry.value();
- // Read value from locked entry in group-lock transaction as well.
if (txEntry.hasValue()) {
if (!F.isEmpty(txEntry.entryProcessors()))
val = txEntry.applyEntryProcessors(val);
@@ -2224,6 +2223,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/**
* @param cacheCtx Cache context.
* @param keys Keys to load.
+ * @param filter Filter.
* @param ret Return value.
* @param needReadVer Read version flag.
* @param singleRmv {@code True} for single remove operation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 67bca51..247ccaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1764,7 +1764,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
tx.originatingNodeId(),
tx.transactionNodes());
- cctx.mvcc().addFuture(fut);
+ cctx.mvcc().addFuture(fut, fut.futureId());
if (log.isDebugEnabled())
log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 6131f54..6cf10f4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -765,6 +766,12 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
}
};
+ IgniteInClosure<IgniteCache<Object, Object>> getAllOp = new CI1<IgniteCache<Object, Object>>() {
+ @Override public void apply(IgniteCache<Object, Object> cache) {
+ cache.getAll(F.asSet(1, 2));
+ }
+ };
+
int cnt = 0;
for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) {
@@ -799,7 +806,9 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']');
- checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getOp);
+ checkOperationInProgressFails(client, ccfg, GridNearSingleGetResponse.class, getOp);
+
+ checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getAllOp);
client.destroyCache(ccfg.getName());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 1d79e20..e8e86e9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -382,6 +382,81 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
* @throws Exception If failed.
*/
+ public void testContainsKeyTx() throws Exception { // Checks get/containsKey results inside transactions for keys "0".."9".
+ if (!txEnabled())
+ return; // Nothing to check when txEnabled() is false.
+
+ IgniteCache<String, Integer> cache = jcache();
+
+ IgniteTransactions txs = ignite(0).transactions();
+
+ for (int i = 0; i < 10; i++) {
+ String key = String.valueOf(i);
+
+ try (Transaction tx = txs.txStart()) { // Key has not been put yet: get must return null and containsKey false.
+ assertNull(key, cache.get(key));
+
+ assertFalse(cache.containsKey(key));
+
+ tx.commit();
+ }
+
+ try (Transaction tx = txs.txStart()) { // After a put in the same transaction containsKey must report the key.
+ assertNull(key, cache.get(key));
+
+ cache.put(key, i);
+
+ assertTrue(cache.containsKey(key));
+
+ tx.commit();
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testContainsKeysTx() throws Exception { // Checks containsKeys for the key set "0".."9" inside transactions.
+ if (!txEnabled())
+ return; // Nothing to check when txEnabled() is false.
+
+ IgniteCache<String, Integer> cache = jcache();
+
+ IgniteTransactions txs = ignite(0).transactions();
+
+ Set<String> keys = new HashSet<>();
+
+ for (int i = 0; i < 10; i++) {
+ String key = String.valueOf(i);
+
+ keys.add(key);
+ }
+
+ try (Transaction tx = txs.txStart()) { // No key has been put yet: every get returns null and containsKeys is false.
+ for (String key : keys)
+ assertNull(key, cache.get(key));
+
+ assertFalse(cache.containsKeys(keys));
+
+ tx.commit();
+ }
+
+ try (Transaction tx = txs.txStart()) { // After putting all keys in the same transaction containsKeys must be true.
+ for (String key : keys)
+ assertNull(key, cache.get(key));
+
+ for (String key : keys)
+ cache.put(key, 0);
+
+ assertTrue(cache.containsKeys(keys));
+
+ tx.commit();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testRemoveInExplicitLocks() throws Exception {
if (lockingEnabled()) {
IgniteCache<String, Integer> cache = jcache();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
index 1ef77f2..45c8c2c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
@@ -613,21 +613,6 @@ public class GridCacheConcurrentTxMultiNodeTest extends GridCommonAbstractTest {
X.println("Near entry [grid="+ g.name() + ", key=" + k + ", entry=" + nearEntry);
X.println("DHT entry [grid=" + g.name() + ", key=" + k + ", entry=" + dhtEntry);
-
- GridCacheMvccCandidate nearCand =
- nearEntry == null ? null : F.first(nearEntry.localCandidates());
-
- if (nearCand != null)
- X.println("Near futures: " +
- nearEntry.context().mvcc().futures(nearCand.version()));
-
- GridCacheMvccCandidate dhtCand =
- dhtEntry == null ? null : F.first(dhtEntry.localCandidates());
-
- if (dhtCand != null)
- X.println("Dht futures: " +
- dhtEntry.context().mvcc().futures(dhtCand.version()));
-
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
index cbfc97b..2cb4a00 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -228,7 +229,7 @@ public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest {
@Override public void onMessage(UUID nodeId, Object msg) {
info("Received message from node [nodeId=" + nodeId + ", msg=" + msg + ']');
- if (msg instanceof GridNearGetRequest) {
+ if (msg instanceof GridNearSingleGetRequest) {
info("Setting flag: " + System.identityHashCode(received));
received.set(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
index 8ddee0c..e5cff5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -218,7 +221,7 @@ public abstract class IgniteCacheAbstractStopBusySelfTest extends GridCommonAbst
* @throws Exception If failed.
*/
public void testGet() throws Exception {
- bannedMsg.set(GridNearGetRequest.class);
+ bannedMsg.set(GridNearSingleGetRequest.class);
executeTest(new Callable<Integer>() {
/** {@inheritDoc} */
@@ -235,6 +238,28 @@ public abstract class IgniteCacheAbstractStopBusySelfTest extends GridCommonAbst
}
/**
+ * @throws Exception If failed.
+ */
+    public void testGetAll() throws Exception {
+        bannedMsg.set(GridNearGetRequest.class);
+
+        // Operation under test: a multi-key read issued through the client cache.
+        Callable<Integer> getAllOp = new Callable<Integer>() {
+            /** {@inheritDoc} */
+            @Override public Integer call() throws Exception {
+                info("Start operation.");
+
+                Set<Integer> keySet = F.asSet(1, 2, 3);
+
+                clientCache().getAll(keySet);
+
+                info("Stop operation.");
+
+                return null;
+            }
+        };
+
+        executeTest(getAllOp);
+    }
+
+ /**
*
* @param call Closure executing cache operation.
* @throws Exception If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
index f4423f7..9c1abc7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.CacheException;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
@@ -30,6 +31,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
/**
@@ -37,7 +39,10 @@ import org.apache.ignite.internal.util.typedef.X;
*/
public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTest {
/** Allows to change behavior of readExternal method. */
- protected static AtomicInteger readCnt = new AtomicInteger();
+ protected static final AtomicInteger readCnt = new AtomicInteger();
+
+ /** Allows to change behavior of {@link TestValue#readExternal} method. */
+ protected static final AtomicInteger valReadCnt = new AtomicInteger();
/** Iterable key. */
protected static int key = 0;
@@ -86,71 +91,40 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
return cfg;
}
- /** Test key 1. */
- protected static class TestKey implements Externalizable {
- /** Field. */
- @QuerySqlField(index = true)
- private String field;
-
- /**
- * @param field Test key 1.
- */
- public TestKey(String field) {
- this.field = field;
- }
-
- /** Test key 1. */
- public TestKey() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- TestKey key = (TestKey)o;
-
- return !(field != null ? !field.equals(key.field) : key.field != null);
- }
+ /**
+ * Sends put atomically and handles fail.
+ *
+ * @param k Key.
+ */
+ protected void failAtomicPut(int k) {
+ try {
+ jcache(0).put(new TestKey(String.valueOf(k)), "");
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return field != null ? field.hashCode() : 0;
+ assert false : "p2p marshalling failed, but error response was not sent";
}
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(field);
+ catch (CacheException e) {
+ assert X.hasCause(e, IOException.class);
}
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- field = (String)in.readObject();
-
- if (readCnt.decrementAndGet() <= 0)
- throw new IOException("Class can not be unmarshalled.");
- }
+ assert readCnt.get() == 0; //ensure we have read count as expected.
}
/**
- * Sends put atomically and handles fail.
+ * Sends getAll for a single key and handles fail.
*
* @param k Key.
*/
- protected void failAtomicPut(int k) {
+ protected void failGetAll(int k) {
try {
- jcache(0).put(new TestKey(String.valueOf(k)), "");
+ Set<Object> keys = F.<Object>asSet(new TestKey(String.valueOf(k)));
+
+ jcache(0).getAll(keys);
assert false : "p2p marshalling failed, but error response was not sent";
}
catch (CacheException e) {
assert X.hasCause(e, IOException.class);
}
-
- assert readCnt.get() == 0; //ensure we have read count as expected.
}
/**
@@ -158,7 +132,7 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
*
* @param k Key.
*/
- protected void failAtomicGet(int k) {
+ protected void failGet(int k) {
try {
jcache(0).get(new TestKey(String.valueOf(k)));
@@ -175,38 +149,132 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
* @throws Exception If failed.
*/
public void testResponseMessageOnUnmarshallingFailed() throws Exception {
- //GridNearAtomicUpdateRequest unmarshalling failed test
+ // GridNearAtomicUpdateRequest unmarshalling failed test.
readCnt.set(1);
failAtomicPut(++key);
- //Check that cache is empty.
+ // Check that cache is empty.
readCnt.set(Integer.MAX_VALUE);
assert jcache(0).get(new TestKey(String.valueOf(key))) == null;
- //GridDhtAtomicUpdateRequest unmarshalling failed test
+ // GridDhtAtomicUpdateRequest unmarshalling failed test.
readCnt.set(2);
failAtomicPut(++key);
- //Check that cache is not empty.
+ // Check that cache is not empty.
readCnt.set(Integer.MAX_VALUE);
assert jcache(0).get(new TestKey(String.valueOf(key))) != null;
- //GridNearGetRequest unmarshalling failed test
+ // GridNearGetRequest unmarshalling failed test.
readCnt.set(1);
- failAtomicGet(++key);
+ failGetAll(++key);
- //GridNearGetResponse unmarshalling failed test
+ // GridNearGetResponse unmarshalling failed test.
readCnt.set(Integer.MAX_VALUE);
jcache(0).put(new TestKey(String.valueOf(++key)), "");
readCnt.set(2);
- failAtomicGet(key);
+ failGetAll(key);
+
+ readCnt.set(Integer.MAX_VALUE);
+ valReadCnt.set(Integer.MAX_VALUE);
+
+ jcache(0).put(new TestKey(String.valueOf(++key)), new TestValue());
+
+ assertNotNull(new TestKey(String.valueOf(key)));
+
+ // GridNearSingleGetRequest unmarshalling failed.
+ readCnt.set(1);
+
+ failGet(key);
+
+ // GridNearSingleGetResponse unmarshalling failed.
+ valReadCnt.set(1);
+ readCnt.set(2);
+
+ failGet(key);
+ }
+
+    /**
+     * Externalizable test key. Reading it decrements {@code readCnt} and fails with
+     * {@link IOException} once the counter drops to zero or below.
+     */
+    protected static class TestKey implements Externalizable {
+        /** Field. */
+        @QuerySqlField(index = true)
+        private String field;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public TestKey() {
+            // No-op.
+        }
+
+        /**
+         * @param field Field value.
+         */
+        public TestKey(String field) {
+            this.field = field;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey other = (TestKey)o;
+
+            if (field == null)
+                return other.field == null;
+
+            return field.equals(other.field);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            if (field == null)
+                return 0;
+
+            return field.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(field);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            field = (String)in.readObject();
+
+            // The field is read first, then the shared counter decides whether this read fails.
+            int remaining = readCnt.decrementAndGet();
+
+            if (remaining <= 0)
+                throw new IOException("Class can not be unmarshalled.");
+        }
+    }
+
+    /**
+     * Externalizable test value with no state. Reading it decrements {@code valReadCnt} and fails
+     * with {@link IOException} once the counter drops to zero or below.
+     */
+    protected static class TestValue implements Externalizable {
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public TestValue() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            int remaining = valReadCnt.decrementAndGet();
+
+            if (remaining <= 0)
+                throw new IOException("Class can not be unmarshalled.");
+        }
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
index 53ac648..c005945 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -29,6 +30,7 @@ import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -135,12 +137,16 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
@Override public void run() {
T2<Ignite, Integer> ignite;
+ Set<Integer> keys = F.asSet(1, 2, 3, 4, 5);
+
while ((ignite = randomNode()) != null) {
IgniteCache<Object, Object> cache = ignite.get1().cache(null);
for (int i = 0; i < 100; i++)
cache.containsKey(ThreadLocalRandom.current().nextInt(100_000));
+ cache.containsKeys(keys);
+
assertTrue(nodes.compareAndSet(ignite.get2(), null, ignite.get1()));
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
index 6f0565b..dbd8758 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
@@ -661,6 +661,8 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
// It is ok if primary node leaves grid.
}
+ cache.get(key);
+
int c = putCntr.incrementAndGet();
if (c % logFreq == 0)
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
new file mode 100644
index 0000000..42b5ee3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int SRVS = 4;
+
+ /** */
+ private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // Client or server mode is driven by the test's 'client' flag at node start time.
+        igniteCfg.setClientMode(client);
+
+        // Every node gets its own recording communication SPI.
+        igniteCfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        return igniteCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        // Started while 'client' is still false.
+        startGridsMultiThreaded(SRVS);
+
+        // getConfiguration() passes this flag to setClientMode(), so it has to be set before the next start.
+        client = true;
+
+        // Presumably starts one more node at index SRVS (the test asserts ignite(SRVS) is a client) - confirm.
+        startGridsMultiThreaded(SRVS, 1);
+
+        client = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        // Nodes are started once in beforeTestsStarted() and stopped here.
+        stopAllGrids();
+    }
+
+    /**
+     * Creates each tested cache in turn, runs the single-get message checks against it from the client
+     * node and from a server node, and destroys the cache afterwards.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleGetMessage() throws Exception {
+        assertFalse(ignite(0).configuration().isClientMode());
+        assertTrue(ignite(SRVS).configuration().isClientMode());
+
+        int idx = 0;
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            ccfg.setName("cache-" + idx);
+
+            log.info("Test cache: " + idx);
+
+            idx++;
+
+            ignite(0).createCache(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> srvCache = ignite(0).cache(ccfg.getName());
+                IgniteCache<Integer, Integer> clientCache = ignite(SRVS).cache(ccfg.getName());
+
+                // Client node.
+                Integer clientKey = nearKey(clientCache);
+
+                checkSingleGetMessage(clientCache, clientKey, false);
+
+                // Server node, backup key: only possible when backups are configured.
+                if (ccfg.getBackups() > 0) {
+                    Integer backupKey = backupKeys(srvCache, 1, 100_000).get(0);
+
+                    checkSingleGetMessage(srvCache, backupKey, true);
+                }
+
+                // Server node, near key: skipped for REPLICATED caches.
+                if (ccfg.getCacheMode() != REPLICATED) {
+                    Integer nearKey = nearKeys(srvCache, 1, 200_000).get(0);
+
+                    checkSingleGetMessage(srvCache, nearKey, false);
+                }
+            }
+            finally {
+                ignite(0).destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * Checks which single-get request/response messages are recorded for {@code get} and
+     * {@code containsKey} on the given key: first while the key is absent, then after it is put,
+     * and for transactional caches once more inside OPTIMISTIC / REPEATABLE_READ transactions.
+     *
+     * @param cache Cache.
+     * @param key Key.
+     * @param backup {@code True} if given key is backup key.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void checkSingleGetMessage(IgniteCache<Integer, Integer> cache,
+        Integer key,
+        boolean backup) throws Exception {
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        Ignite node = cache.unwrap(Ignite.class);
+
+        TestCommunicationSpi spi = (TestCommunicationSpi)node.configuration().getCommunicationSpi();
+
+        // Requests are recorded on the node the cache instance belongs to.
+        spi.record(GridNearSingleGetRequest.class);
+
+        Ignite primary = primaryNode(key, cache.getName());
+
+        // The key must not be primary on the requesting node.
+        assertNotSame(node, primary);
+
+        TestCommunicationSpi primarySpi = (TestCommunicationSpi)primary.configuration().getCommunicationSpi();
+
+        // Responses are recorded on the key's primary node.
+        primarySpi.record(GridNearSingleGetResponse.class);
+
+        // Key is absent: one request/response pair is expected regardless of the 'backup' flag.
+        assertNull(cache.get(key));
+
+        checkMessages(spi, primarySpi);
+
+        assertFalse(cache.containsKey(key));
+
+        checkMessages(spi, primarySpi);
+
+        cache.put(key, 1);
+
+        // Key is present: for a backup key no messages are expected
+        // (presumably the value is read locally - confirm).
+        assertNotNull(cache.get(key));
+
+        if (backup)
+            checkNoMessages(spi, primarySpi);
+        else
+            checkMessages(spi, primarySpi);
+
+        assertTrue(cache.containsKey(key));
+
+        if (backup)
+            checkNoMessages(spi, primarySpi);
+        else
+            checkMessages(spi, primarySpi);
+
+        // Same four checks again, this time inside explicit transactions.
+        if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+            cache.remove(key);
+
+            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
+                assertNull(cache.get(key));
+
+                tx.commit();
+            }
+
+            checkMessages(spi, primarySpi);
+
+            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
+                assertFalse(cache.containsKey(key));
+
+                tx.commit();
+            }
+
+            checkMessages(spi, primarySpi);
+
+            cache.put(key, 1);
+
+            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
+                assertNotNull(cache.get(key));
+
+                tx.commit();
+            }
+
+            if (backup)
+                checkNoMessages(spi, primarySpi);
+            else
+                checkMessages(spi, primarySpi);
+
+            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
+                assertTrue(cache.containsKey(key));
+
+                tx.commit();
+            }
+
+            if (backup)
+                checkNoMessages(spi, primarySpi);
+            else
+                checkMessages(spi, primarySpi);
+        }
+    }
+
+    /**
+     * Drains both SPIs and asserts that exactly one single-get request was recorded on the near
+     * node and exactly one single-get response on the primary node.
+     *
+     * @param spi Near node SPI.
+     * @param primarySpi Primary node SPI.
+     */
+    private void checkMessages(TestCommunicationSpi spi, TestCommunicationSpi primarySpi) {
+        List<Object> reqs = spi.recordedMessages();
+
+        assertEquals(1, reqs.size());
+
+        Object req = reqs.get(0);
+
+        assertTrue(req instanceof GridNearSingleGetRequest);
+
+        List<Object> resps = primarySpi.recordedMessages();
+
+        assertEquals(1, resps.size());
+
+        Object resp = resps.get(0);
+
+        assertTrue(resp instanceof GridNearSingleGetResponse);
+    }
+
+    /**
+     * Drains both SPIs and asserts that neither of them recorded any message.
+     *
+     * @param spi Near node SPI.
+     * @param primarySpi Primary node SPI.
+     */
+    private void checkNoMessages(TestCommunicationSpi spi, TestCommunicationSpi primarySpi) {
+        int nearCnt = spi.recordedMessages().size();
+
+        assertEquals(0, nearCnt);
+
+        int primaryCnt = primarySpi.recordedMessages().size();
+
+        assertEquals(0, primaryCnt);
+    }
+
+    /**
+     * Builds the list of tested permutations: for TRANSACTIONAL and then ATOMIC, a PARTITIONED cache
+     * without backups, a PARTITIONED cache with one backup and a REPLICATED cache.
+     *
+     * @return Cache configurations to test.
+     */
+    private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() {
+        List<CacheConfiguration<Integer, Integer>> res = new ArrayList<>();
+
+        CacheAtomicityMode[] atomicityModes = {TRANSACTIONAL, ATOMIC};
+
+        for (CacheAtomicityMode atomicity : atomicityModes) {
+            res.add(cacheConfiguration(PARTITIONED, atomicity, FULL_SYNC, 0));
+            res.add(cacheConfiguration(PARTITIONED, atomicity, FULL_SYNC, 1));
+            res.add(cacheConfiguration(REPLICATED, atomicity, FULL_SYNC, 0));
+        }
+
+        return res;
+    }
+
+    /**
+     * Creates a cache configuration for one tested permutation.
+     *
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Cache atomicity mode.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(
+        CacheMode cacheMode,
+        CacheAtomicityMode atomicityMode,
+        CacheWriteSynchronizationMode syncMode,
+        int backups) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setCacheMode(cacheMode);
+        // Use the requested mode as-is; forcing TRANSACTIONAL here would leave ATOMIC caches untested.
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setWriteSynchronizationMode(syncMode);
+
+        // The backups argument is applied to PARTITIONED caches only.
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     * Communication SPI that, before delegating to {@link TcpCommunicationSpi}, records the payload of
+     * every sent {@link GridIoMessage} whose class equals the configured one.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Message class to record, {@code null} to record nothing. Guarded by {@code this}. */
+        private Class<?> recordCls;
+
+        /** Messages recorded since the last {@link #recordedMessages()} call. Guarded by {@code this}. */
+        private List<Object> recordedMsgs = new ArrayList<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+            throws IgniteSpiException {
+            if (msg instanceof GridIoMessage)
+                recordIfMatches(((GridIoMessage)msg).message());
+
+            // The actual send happens outside of the monitor.
+            super.sendMessage(node, msg, ackC);
+        }
+
+        /**
+         * @param payload Message wrapped into the sent {@link GridIoMessage}.
+         */
+        private synchronized void recordIfMatches(Object payload) {
+            if (recordCls != null && payload.getClass().equals(recordCls))
+                recordedMsgs.add(payload);
+        }
+
+        /**
+         * @param recordCls Message class to record.
+         */
+        synchronized void record(@Nullable Class<?> recordCls) {
+            this.recordCls = recordCls;
+        }
+
+        /**
+         * Returns the messages recorded so far and starts a new, empty recording list.
+         *
+         * @return Recorded messages.
+         */
+        synchronized List<Object> recordedMessages() {
+            List<Object> drained = recordedMsgs;
+
+            recordedMsgs = new ArrayList<>();
+
+            return drained;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMetricsSelfTest.java
index 319aa56..2fa43be 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMetricsSelfTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.distributed.replicated;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.GridCacheTransactionalAbstractMetricsSelfTest;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -30,14 +29,6 @@ public class GridCacheReplicatedMetricsSelfTest extends GridCacheTransactionalAb
/** */
private static final int GRID_CNT = 2;
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration c = super.getConfiguration(gridName);
-
- c.getTransactionConfiguration().setTxSerializableEnabled(true);
-
- return c;
- }
-
/** {@inheritDoc} */
@Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
CacheConfiguration cfg = super.cacheConfiguration(gridName);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index b89bffd..fcc8d37 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHan
import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSingleGetMessageTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest;
@@ -283,6 +284,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CacheGetFutureHangsSelfTest.class);
+ suite.addTestSuite(IgniteCacheSingleGetMessageTest.class);
+
return suite;
}
}
\ No newline at end of file