You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/28 15:09:55 UTC

[5/9] ignite git commit: Support optional IO policy resolver in DataStreamer.

Support optional IO policy resolver in DataStreamer.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4df74187
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4df74187
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4df74187

Branch: refs/heads/ignite-1786
Commit: 4df74187072d839bdd09eb3b6163e2612fadc1c3
Parents: 2cfe0cb
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Mar 28 12:24:16 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Mar 28 12:24:16 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 43 +++++++++++++++++---
 .../processors/cache/GridCacheAdapter.java      |  2 +-
 .../datastreamer/DataStreamProcessor.java       |  8 +++-
 .../datastreamer/DataStreamerImpl.java          | 37 ++++++++++++++++-
 4 files changed, 81 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4df74187/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 4577dc8..4bc2eea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -115,6 +115,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** Direct protocol version. */
     public static final byte DIRECT_PROTO_VER = 2;
 
+    /** Current IO policy. */
+    private static final ThreadLocal<Byte> CUR_PLC = new ThreadLocal<>();
+
     /** Listeners by topic. */
     private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>();
 
@@ -764,7 +767,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
                     assert obj != null;
 
-                    lsnr.onMessage(nodeId, obj);
+                    invokeListener(msg.policy(), lsnr, nodeId, obj);
                 }
                 finally {
                     threadProcessingMessage(false);
@@ -841,7 +844,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         assert obj != null;
 
-        lsnr.onMessage(nodeId, obj);
+        invokeListener(msg.policy(), lsnr, nodeId, obj);
     }
 
     /**
@@ -1197,6 +1200,38 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * Invoke message listener.
+     *
+     * @param plc Policy.
+     * @param lsnr Listener.
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Object msg) {
+        Byte oldPlc = CUR_PLC.get();
+
+        boolean change = F.eq(oldPlc, plc);
+
+        if (change)
+            CUR_PLC.set(plc);
+
+        try {
+            lsnr.onMessage(nodeId, msg);
+        }
+        finally {
+            if (change)
+                CUR_PLC.set(oldPlc);
+        }
+    }
+
+    /**
+     * @return Current IO policy
+     */
+    @Nullable public static Byte currentPolicy() {
+        return CUR_PLC.get();
+    }
+
+    /**
      * @param node Destination node.
      * @param topic Topic to send the message to.
      * @param topicOrd GridTopic enumeration ordinal.
@@ -2418,9 +2453,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             for (GridTuple3<GridIoMessage, Long, IgniteRunnable> t = msgs.poll(); t != null; t = msgs.poll()) {
                 try {
-                    lsnr.onMessage(
-                        nodeId,
-                        t.get1().message());
+                    invokeListener(plc, lsnr, nodeId, t.get1().message());
                 }
                 finally {
                     if (t.get3() != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/4df74187/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 86036ac..a1f0f28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -5920,7 +5920,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 ctx.toCacheObject(val),
                 ttl,
                 0,
-                ver);
+                ver.conflictVersion());
 
             e.prepareDirectMarshal(ctx.cacheObjectContext());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4df74187/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index d899c67..c7c1f5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -339,7 +340,12 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
 
         try {
-            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+            Byte plc = GridIoManager.currentPolicy();
+
+            if (plc == null)
+                plc = PUBLIC_POOL;
+
+            ctx.io().send(nodeId, resTopic, res, plc);
         }
         catch (IgniteCheckedException e) {
             if (ctx.discovery().alive(nodeId))

http://git-wip-us.apache.org/repos/asf/ignite/blob/4df74187/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 7564376..4599060 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -91,6 +92,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
@@ -109,6 +111,9 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
  */
 @SuppressWarnings("unchecked")
 public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
+    /** Default policy reoslver. */
+    private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver();
+
     /** Isolated receiver. */
     private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater();
 
@@ -118,6 +123,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /** */
     private byte[] updaterBytes;
 
+    /** IO policy resovler for data load request. */
+    private IgniteClosure<ClusterNode, Byte> ioPlcRslvr = DFLT_IO_PLC_RSLVR;
+
     /** Max remap count before issuing an error. */
     private static final int DFLT_MAX_REMAP_CNT = 32;
 
@@ -602,6 +610,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
+     * @param ioPlcRslvr IO policy resolver.
+     */
+    public void ioPolicyResolver(IgniteClosure<ClusterNode, Byte> ioPlcRslvr) {
+        this.ioPlcRslvr = ioPlcRslvr;
+    }
+
+    /**
      * @param entries Entries.
      * @param resFut Result future.
      * @param activeKeys Active keys.
@@ -1257,7 +1272,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             IgniteInternalFuture<Object> fut;
 
-            if (isLocNode) {
+            Byte plc = ioPlcRslvr.apply(node);
+
+            if (plc == null)
+                plc = PUBLIC_POOL;
+
+            if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) {
                 fut = ctx.closure().callLocalSafe(
                     new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false);
 
@@ -1355,7 +1375,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     topVer);
 
                 try {
-                    ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
+                    ctx.io().send(node, TOPIC_DATASTREAM, req, plc);
 
                     if (log.isDebugEnabled())
                         log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
@@ -1620,4 +1640,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             }
         }
     }
+
+    /**
+     * Default IO policy resolver.
+     */
+    private static class DefaultIoPolicyResolver implements IgniteClosure<ClusterNode, Byte> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public Byte apply(ClusterNode gridNode) {
+            return PUBLIC_POOL;
+        }
+    }
 }