You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/28 15:09:55 UTC
[5/9] ignite git commit: Support optional IO policy resolver in
DataStreamer.
Support optional IO policy resolver in DataStreamer.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4df74187
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4df74187
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4df74187
Branch: refs/heads/ignite-1786
Commit: 4df74187072d839bdd09eb3b6163e2612fadc1c3
Parents: 2cfe0cb
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Mar 28 12:24:16 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Mar 28 12:24:16 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 43 +++++++++++++++++---
.../processors/cache/GridCacheAdapter.java | 2 +-
.../datastreamer/DataStreamProcessor.java | 8 +++-
.../datastreamer/DataStreamerImpl.java | 37 ++++++++++++++++-
4 files changed, 81 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4df74187/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 4577dc8..4bc2eea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -115,6 +115,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** Direct protocol version. */
public static final byte DIRECT_PROTO_VER = 2;
+ /** Current IO policy. */
+ private static final ThreadLocal<Byte> CUR_PLC = new ThreadLocal<>();
+
/** Listeners by topic. */
private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>();
@@ -764,7 +767,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
assert obj != null;
- lsnr.onMessage(nodeId, obj);
+ invokeListener(msg.policy(), lsnr, nodeId, obj);
}
finally {
threadProcessingMessage(false);
@@ -841,7 +844,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
assert obj != null;
- lsnr.onMessage(nodeId, obj);
+ invokeListener(msg.policy(), lsnr, nodeId, obj);
}
/**
@@ -1197,6 +1200,38 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
+ * Invokes message listener, exposing the message IO policy through {@link #currentPolicy()} while it runs.
+ *
+ * @param plc Policy.
+ * @param lsnr Listener.
+ * @param nodeId Node ID.
+ * @param msg Message.
+ */
+ private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Object msg) {
+ Byte oldPlc = CUR_PLC.get();
+
+ // Thread-local has to be switched only when the new policy differs from the one already set.
+ boolean change = !F.eq(oldPlc, plc);
+
+ if (change)
+ CUR_PLC.set(plc);
+
+ try {
+ lsnr.onMessage(nodeId, msg);
+ }
+ finally {
+ // Restore previous value so a nested invocation does not leak its policy to the outer one.
+ if (change)
+ CUR_PLC.set(oldPlc);
+ }
+ }
+
+ /**
+ * Gets IO policy stored in the {@code CUR_PLC} thread-local of the calling thread.
+ *
+ * @return Current IO policy, or {@code null} if none is set for this thread.
+ */
+ @Nullable public static Byte currentPolicy() {
+ return CUR_PLC.get();
+ }
+
+ /**
* @param node Destination node.
* @param topic Topic to send the message to.
* @param topicOrd GridTopic enumeration ordinal.
@@ -2418,9 +2453,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
for (GridTuple3<GridIoMessage, Long, IgniteRunnable> t = msgs.poll(); t != null; t = msgs.poll()) {
try {
- lsnr.onMessage(
- nodeId,
- t.get1().message());
+ invokeListener(plc, lsnr, nodeId, t.get1().message());
}
finally {
if (t.get3() != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/4df74187/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 86036ac..a1f0f28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -5920,7 +5920,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.toCacheObject(val),
ttl,
0,
- ver);
+ ver.conflictVersion());
e.prepareDirectMarshal(ctx.cacheObjectContext());
http://git-wip-us.apache.org/repos/asf/ignite/blob/4df74187/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index d899c67..c7c1f5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -339,7 +340,12 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
try {
- ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+ Byte plc = GridIoManager.currentPolicy();
+
+ if (plc == null)
+ plc = PUBLIC_POOL;
+
+ ctx.io().send(nodeId, resTopic, res, plc);
}
catch (IgniteCheckedException e) {
if (ctx.discovery().alive(nodeId))
http://git-wip-us.apache.org/repos/asf/ignite/blob/4df74187/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 7564376..4599060 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -91,6 +92,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
@@ -109,6 +111,9 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
*/
@SuppressWarnings("unchecked")
public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
+ /** Default policy resolver. */
+ private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver();
+
/** Isolated receiver. */
private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater();
@@ -118,6 +123,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** */
private byte[] updaterBytes;
+ /** IO policy resolver for data load request. */
+ private IgniteClosure<ClusterNode, Byte> ioPlcRslvr = DFLT_IO_PLC_RSLVR;
+
/** Max remap count before issuing an error. */
private static final int DFLT_MAX_REMAP_CNT = 32;
@@ -602,6 +610,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
/**
+ * Sets the closure used to pick IO policy for data load requests sent to a node.
+ * NOTE(review): the argument is stored as is, without a null check - confirm callers never pass {@code null}.
+ *
+ * @param ioPlcRslvr IO policy resolver.
+ */
+ public void ioPolicyResolver(IgniteClosure<ClusterNode, Byte> ioPlcRslvr) {
+ this.ioPlcRslvr = ioPlcRslvr;
+ }
+
+ /**
* @param entries Entries.
* @param resFut Result future.
* @param activeKeys Active keys.
@@ -1257,7 +1272,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
IgniteInternalFuture<Object> fut;
- if (isLocNode) {
+ Byte plc = ioPlcRslvr.apply(node);
+
+ if (plc == null)
+ plc = PUBLIC_POOL;
+
+ if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) {
fut = ctx.closure().callLocalSafe(
new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false);
@@ -1355,7 +1375,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
topVer);
try {
- ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
+ ctx.io().send(node, TOPIC_DATASTREAM, req, plc);
if (log.isDebugEnabled())
log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
@@ -1620,4 +1640,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
}
}
+
+ /**
+ * Default IO policy resolver: returns {@code PUBLIC_POOL} for every node.
+ */
+ private static class DefaultIoPolicyResolver implements IgniteClosure<ClusterNode, Byte> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public Byte apply(ClusterNode gridNode) {
+ return PUBLIC_POOL;
+ }
+ }
}