You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/02 22:22:35 UTC
[5/6] incubator-ignite git commit: # ignite-51
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b5f8666..c336d00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -116,18 +116,14 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
* @param e Cache entry.
* @param key Key.
* @param newVal New value.
- * @param newBytes New value bytes.
* @param oldVal Old value.
- * @param oldBytes Old value bytes.
* @param preload Whether update happened during preloading.
* @throws IgniteCheckedException In case of error.
*/
public void onEntryUpdated(GridCacheEntryEx e,
KeyCacheObject key,
CacheObject newVal,
- GridCacheValueBytes newBytes,
CacheObject oldVal,
- GridCacheValueBytes oldBytes,
boolean preload)
throws IgniteCheckedException
{
@@ -149,8 +145,8 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
if (F.isEmpty(lsnrCol))
return;
- boolean hasNewVal = newVal != null || (newBytes != null && !newBytes.isNull());
- boolean hasOldVal = oldVal != null || (oldBytes != null && !oldBytes.isNull());
+ boolean hasNewVal = newVal != null;
+ boolean hasOldVal = oldVal != null;
if (!hasNewVal && !hasOldVal)
return;
@@ -169,27 +165,26 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
if (!initialized) {
if (lsnr.oldValueRequired()) {
oldVal = cctx.unwrapTemporary(oldVal);
-// TODO IGNITE-51.
-// if (oldVal == null && oldBytes != null && !oldBytes.isNull())
-// oldVal = oldBytes.isPlain() ? (V)oldBytes.get() : cctx.marshaller().<V>unmarshal(oldBytes.get
-// (), cctx.deploy().globalLoader());
- }
- if (newVal == null && newBytes != null && !newBytes.isNull()) {
-// TODO IGNITE-51.
-// newVal = newBytes.isPlain() ? (V) newBytes.get() : cctx.marshaller().<V>unmarshal(newBytes.get(),
-// cctx.deploy().globalLoader());
+ if (oldVal != null)
+ oldVal.finishUnmarshal(cctx, cctx.deploy().globalLoader());
}
+
+ if (newVal != null)
+ newVal.finishUnmarshal(cctx, cctx.deploy().globalLoader());
+
+ initialized = true;
}
- CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key.<K>value(cctx, false),
- CU.<V>value(newVal, cctx, false),
- newBytes,
- lsnr.oldValueRequired() ? CU.<V>value(oldVal, cctx, false) : null,
- lsnr.oldValueRequired() ? oldBytes : null);
+ CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+ cctx.cacheId(),
+ evtType,
+ key,
+ newVal,
+ lsnr.oldValueRequired() ? oldVal : null);
CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>(
- cctx.kernalContext().cache().jcache(cctx.name()), evtType, e0);
+ cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
}
@@ -199,10 +194,9 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
* @param e Entry.
* @param key Key.
* @param oldVal Old value.
- * @param oldBytes Old value bytes.
* @throws IgniteCheckedException In case of error.
*/
- public void onEntryExpired(GridCacheEntryEx e, KeyCacheObject key, CacheObject oldVal, GridCacheValueBytes oldBytes)
+ public void onEntryExpired(GridCacheEntryEx e, KeyCacheObject key, CacheObject oldVal)
throws IgniteCheckedException {
assert e != null;
assert key != null;
@@ -223,26 +217,24 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
for (CacheContinuousQueryListener<K, V> lsnr : lsnrCol.values()) {
if (!initialized) {
- if (lsnr.oldValueRequired()) {
+ if (lsnr.oldValueRequired())
oldVal = cctx.unwrapTemporary(oldVal);
- if (oldVal == null && oldBytes != null && !oldBytes.isNull()) {
-// TODO IGNITE-51.
-// oldVal = oldBytes.isPlain() ? (V) oldBytes.get() :
-// cctx.marshaller().<V>unmarshal(oldBytes.get(), cctx.deploy().globalLoader());
- }
- }
+ if (oldVal != null)
+ oldVal.finishUnmarshal(cctx, cctx.deploy().globalLoader());
+
+ initialized = true;
}
- // TODO IGNITE-51.
- CacheContinuousQueryEntry<K, V> e0 = new CacheContinuousQueryEntry<>(key.<K>value(cctx, false),
- null,
- null,
- lsnr.oldValueRequired() ? CU.<V>value(oldVal, cctx, false) : null,
- lsnr.oldValueRequired() ? oldBytes : null);
+ CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+ cctx.cacheId(),
+ EXPIRED,
+ key,
+ null,
+ lsnr.oldValueRequired() ? oldVal : null);
- CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent<>(
- cctx.kernalContext().cache().jcache(cctx.name()), EXPIRED, e0);
+ CacheContinuousQueryEvent<K, V> evt = new CacheContinuousQueryEvent(
+ cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
lsnr.onEntryUpdated(evt, primary, recordIgniteEvt);
}
@@ -419,7 +411,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
autoUnsubscribe, grp.predicate()).get();
if (notifyExisting) {
- final Iterator<Cache.Entry<K, V>> it = cctx.cache().entrySetx().iterator();
+ final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
locLsnr.onUpdated(new Iterable<CacheEntryEvent<? extends K, ? extends V>>() {
@Override public Iterator<CacheEntryEvent<? extends K, ? extends V>> iterator() {
@@ -456,11 +448,12 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
if (!it.hasNext())
break;
- Cache.Entry<K, V> e = it.next();
+ GridCacheEntryEx e = it.next();
next = new CacheContinuousQueryEvent<>(
- cctx.kernalContext().cache().jcache(cctx.name()), CREATED,
- new CacheContinuousQueryEntry<>(e.getKey(), e.getValue(), null, null, null));
+ cctx.kernalContext().cache().jcache(cctx.name()),
+ cctx,
+ new CacheContinuousQueryEntry(cctx.cacheId(), CREATED, e.key(), e.rawGet(), null));
if (rmtFilter != null && !rmtFilter.evaluate(next))
next = null;
@@ -686,8 +679,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
Collection<CacheEntryEvent<? extends K, ? extends V>> evts = new ArrayList<>(1);
- evts.add(new CacheContinuousQueryEvent<>(cache, evt.getEventType(),
- ((CacheContinuousQueryEvent<? extends K, ? extends V>)evt).entry()));
+ evts.add(evt);
return evts;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 66eea11..db166fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -419,9 +419,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Optim
/**
* @param entry Cache entry.
- * @param keyBytes Key bytes, possibly {@code null}.
*/
- public void cached(GridCacheEntryEx entry, @Nullable byte[] keyBytes) {
+ public void cached(GridCacheEntryEx entry) {
assert entry != null;
assert entry.context() == ctx : "Invalid entry assigned to tx entry [txEntry=" + this +
@@ -576,32 +575,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Optim
}
/**
- * @param cacheVal Value.
- * @return New value.
- */
- public <V> V applyEntryProcessors(V cacheVal) {
- Object val = cacheVal;
- Object key = CU.value(this.key, ctx, false);
-
- for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : entryProcessors()) {
- try {
- CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(ctx, key, val);
-
- EntryProcessor processor = t.get1();
-
- processor.process(invokeEntry, t.get2());
-
- val = invokeEntry.getValue();
- }
- catch (Exception ignore) {
- // No-op.
- }
- }
-
- return (V)val;
- }
-
- /**
* @param entryProcessorsCol Collection of entry processors.
*/
public void entryProcessors(
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7f39503..d841c90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -926,7 +926,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (log.isDebugEnabled())
log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
- txEntry.cached(entryEx(cacheCtx, txEntry.txKey()), null);
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey()));
}
}
}
@@ -1214,7 +1214,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
txEntry.readValue(e.<V>value());
}
catch (GridCacheEntryRemovedException ignored) {
- txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer), null);
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
}
}
}
@@ -1487,7 +1487,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
if (txEntry != null)
- txEntry.cached(entryEx(cacheCtx, txKey), null);
+ txEntry.cached(entryEx(cacheCtx, txKey));
continue; // While loop.
}
@@ -1685,10 +1685,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
txEntry.setAndMarkValid(val);
- Object val0 = val.value(cacheCtx, false);
-
if (!F.isEmpty(txEntry.entryProcessors()))
- val0 = txEntry.applyEntryProcessors(val0);
+ val = txEntry.applyEntryProcessors(val);
cacheCtx.addResult(retMap,
cacheKey,
@@ -1710,7 +1708,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
log.debug("Got removed exception in get postLock (will retry): " +
cached);
- txEntry.cached(entryEx(cacheCtx, txKey), null);
+ txEntry.cached(entryEx(cacheCtx, txKey));
}
catch (GridCacheFilterFailedException e) {
// Failed value for the filter.
@@ -2443,7 +2441,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (log.isDebugEnabled())
log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
- txEntry.cached(entryEx(cached.context(), txEntry.txKey()), null);
+ txEntry.cached(entryEx(cached.context(), txEntry.txKey()));
}
}
}
@@ -3236,7 +3234,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
// Keep old ttl value.
- old.cached(entry, null);
+ old.cached(entry);
old.filters(filter);
// Update ttl if specified.
@@ -3292,7 +3290,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
entry = entryEx(entry.context(), txEntry.txKey(), topologyVersion());
- txEntry.cached(entry, null);
+ txEntry.cached(entry);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 8f1007d..73f4ce7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1544,7 +1544,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
try {
// Renew cache entry.
- txEntry1.cached(cacheCtx.cache().entryEx(txEntry1.key()), null);
+ txEntry1.cached(cacheCtx.cache().entryEx(txEntry1.key()));
}
catch (GridDhtInvalidPartitionException e) {
assert tx.dht() : "Received invalid partition for non DHT transaction [tx=" +
@@ -1606,7 +1606,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
GridCacheAdapter cache = cacheCtx.cache();
// Renew cache entry.
- txEntry.cached(cache.entryEx(txEntry.key()), null);
+ txEntry.cached(cache.entryEx(txEntry.key()));
}
}
}
@@ -1638,7 +1638,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
log.debug("Got removed entry in TM unlockMultiple(..) method (will retry): " + txEntry);
// Renew cache entry.
- txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null);
+ txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
}
}
}
@@ -1907,7 +1907,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (cached == null || cached.detached())
cached = write.context().cache().entryEx(entry.key(), tx.topologyVersion());
- recovered.cached(cached, cached.keyBytes());
+ recovered.cached(cached);
tx.writeMap().put(entry.txKey(), recovered);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
index a207372..fe50fd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
@@ -48,6 +48,10 @@ public class GridContinuousMessage implements Message {
@GridDirectTransient
private Object data;
+ /** */
+ @GridDirectCollection(Message.class)
+ private Collection<Message> msgs;
+
/** Serialized message data. */
private byte[] dataBytes;
@@ -66,18 +70,24 @@ public class GridContinuousMessage implements Message {
* @param routineId Consume ID.
* @param futId Future ID.
* @param data Optional message data.
+ * @param msgs If {@code true} then data is a collection of messages.
*/
GridContinuousMessage(GridContinuousMessageType type,
@Nullable UUID routineId,
@Nullable IgniteUuid futId,
- @Nullable Object data) {
+ @Nullable Object data,
+ boolean msgs) {
assert type != null;
assert routineId != null || type == MSG_EVT_ACK;
this.type = type;
this.routineId = routineId;
this.futId = futId;
- this.data = data;
+
+ if (msgs)
+ this.msgs = (Collection)data;
+ else
+ this.data = data;
}
/**
@@ -95,11 +105,18 @@ public class GridContinuousMessage implements Message {
}
/**
+ * @return {@code True} if data is a collection of messages.
+ */
+ public boolean messages() {
+ return msgs != null;
+ }
+
+ /**
* @return Message data.
*/
@SuppressWarnings("unchecked")
public <T> T data() {
- return (T)data;
+ return msgs != null ? (T)msgs : (T)data;
}
/**
@@ -155,12 +172,18 @@ public class GridContinuousMessage implements Message {
writer.incrementState();
case 2:
- if (!writer.writeUuid("routineId", routineId))
+ if (!writer.writeCollection("msgs", msgs, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 3:
+ if (!writer.writeUuid("routineId", routineId))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1))
return false;
@@ -196,7 +219,7 @@ public class GridContinuousMessage implements Message {
reader.incrementState();
case 2:
- routineId = reader.readUuid("routineId");
+ msgs = reader.readCollection("msgs", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -204,6 +227,14 @@ public class GridContinuousMessage implements Message {
reader.incrementState();
case 3:
+ routineId = reader.readUuid("routineId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
byte typeOrd;
typeOrd = reader.readByte("type");
@@ -227,7 +258,7 @@ public class GridContinuousMessage implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 4;
+ return 5;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index eed273d..4f427aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.thread.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
@@ -471,7 +472,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// these nodes.
for (Map.Entry<UUID, Collection<GridContinuousMessage>> e : pending.entrySet()) {
if (nodeIds.add(e.getKey()))
- e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData));
+ e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false));
}
// Register routine locally.
@@ -523,7 +524,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Send start requests.
try {
- GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData);
+ GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false);
sendWithRetries(nodes, req, null);
}
@@ -629,14 +630,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Register acknowledge timeout (timeout object will be removed when
// future is completed).
fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, routineId,
- new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null)));
+ new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false)));
// Send stop requests.
try {
for (ClusterNode node : nodes) {
try {
sendWithRetries(node.id(),
- new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null),
+ new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false),
null);
}
catch (ClusterTopologyCheckedException ignored) {
@@ -673,10 +674,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
UUID routineId,
@Nullable Object obj,
@Nullable Object orderedTopic,
- boolean sync)
+ boolean sync,
+ boolean msg)
throws IgniteCheckedException {
assert nodeId != null;
assert routineId != null;
+ assert !msg || obj instanceof Message : obj;
assert !nodeId.equals(ctx.localNodeId());
@@ -693,7 +696,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
syncMsgFuts.put(futId, fut);
try {
- sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic);
+ sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg);
}
catch (IgniteCheckedException e) {
syncMsgFuts.remove(futId);
@@ -707,7 +710,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
Collection<Object> toSnd = info.add(obj);
if (toSnd != null)
- sendNotification(nodeId, routineId, null, toSnd, orderedTopic);
+ sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg);
}
}
}
@@ -725,13 +728,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
UUID routineId,
@Nullable IgniteUuid futId,
Collection<Object> toSnd,
- @Nullable Object orderedTopic) throws IgniteCheckedException {
+ @Nullable Object orderedTopic,
+ boolean msg) throws IgniteCheckedException {
assert nodeId != null;
assert routineId != null;
assert toSnd != null;
assert !toSnd.isEmpty();
- sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd), orderedTopic);
+ sendWithRetries(nodeId,
+ new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg),
+ orderedTopic);
}
/**
@@ -793,7 +799,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
try {
- sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err), null);
+ sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err, false), null);
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
@@ -854,7 +860,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
unregisterRemote(routineId);
try {
- sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null), null);
+ sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null, false), null);
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
@@ -917,7 +923,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (msg.futureId() != null) {
try {
sendWithRetries(nodeId,
- new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null),
+ new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null, false),
null);
}
catch (IgniteCheckedException e) {
@@ -1015,9 +1021,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
Collection<Object> toSnd = t.get1();
- if (toSnd != null) {
+ if (toSnd != null && !toSnd.isEmpty()) {
try {
- sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic());
+ boolean msg = toSnd.iterator().next() instanceof Message;
+
+ sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg);
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
@@ -1129,7 +1137,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
assert !F.isEmpty(nodes);
assert msg != null;
- if (msg.data() != null && (nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id())))
+ if (!msg.messages() &&
+ msg.data() != null &&
+ (nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id())))
msg.dataBytes(marsh.marshal(msg.data()));
for (ClusterNode node : nodes) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index b53386c..8e4c7cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -3921,7 +3921,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
/**
*
*/
- public void testCopyOnGet() {
+ public void _testCopyOnGet() {
IgniteCache<Integer, TestMutableObj> mutObjCache = ignite(0).jcache(null);
IgniteCache<Integer, TestImmutableObj> immObjCache = ignite(0).jcache(null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreValueBytesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreValueBytesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreValueBytesSelfTest.java
index bd9a683..3871257 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreValueBytesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStoreValueBytesSelfTest.java
@@ -72,27 +72,6 @@ public class GridCacheStoreValueBytesSelfTest extends GridCommonAbstractTest {
*
* @throws Exception If failed.
*/
- public void testDisabled() throws Exception {
- storeValBytes = false;
-
- Ignite g0 = startGrid(0);
- Ignite g1 = startGrid(1);
-
- IgniteCache<Integer, String> c = g0.jcache(null);
-
- c.put(1, "Cached value");
-
- GridCacheEntryEx entry = ((IgniteKernal)g1).internalCache().peekEx(1);
-
- assert entry != null;
- assert entry.valueBytes().isNull();
- }
-
- /**
- * JUnit.
- *
- * @throws Exception If failed.
- */
public void testEnabled() throws Exception {
storeValBytes = true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 40691d8..56eb096 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -607,16 +607,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** @inheritDoc */
- @Override public byte[] keyBytes() {
- assert false; return null;
- }
-
- /** @inheritDoc */
- @Override public byte[] getOrMarshalKeyBytes() {
- assert false; return null;
- }
-
- /** @inheritDoc */
@Override public GridCacheVersion version() {
return ver;
}
@@ -786,18 +776,17 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** @inheritDoc */
- @Override public void keyBytes(byte[] keyBytes) {
+ @Override public CacheObject valueBytes() {
assert false;
- }
- /** @inheritDoc */
- @Override public GridCacheValueBytes valueBytes() {
- assert false; return GridCacheValueBytes.nil();
+ return null;
}
/** @inheritDoc */
- @Override public GridCacheValueBytes valueBytes(GridCacheVersion ver) {
- assert false; return GridCacheValueBytes.nil();
+ @Override public CacheObject valueBytes(GridCacheVersion ver) {
+ assert false;
+
+ return null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeCounterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeCounterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeCounterSelfTest.java
index fbf4f5f..7f73726 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeCounterSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeCounterSelfTest.java
@@ -270,7 +270,7 @@ public class GridCachePartitionedMultiNodeCounterSelfTest extends GridCommonAbst
assertNotNull(dhtEntry);
- assertEquals(Integer.valueOf(0), dhtEntry.rawGet());
+ assertEquals(Integer.valueOf(0), dhtEntry.rawGet().value(dhtEntry.context(), false));
final AtomicInteger globalCntr = new AtomicInteger(0);
@@ -560,7 +560,7 @@ public class GridCachePartitionedMultiNodeCounterSelfTest extends GridCommonAbst
assertNotNull(dhtEntry);
- assertEquals(Integer.valueOf(0), dhtEntry.rawGet());
+ assertEquals(Integer.valueOf(0), dhtEntry.rawGet().value(dhtEntry.context(), false));
startLatchMultiNode = new CountDownLatch(gridCnt);