You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/26 14:16:36 UTC

[18/35] ignite git commit: IGNITE-2683

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index b12f7a6..9b2bf46 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -885,7 +885,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         TcpDiscoveryAbstractMessage msg;
 
                         try {
-                            msg = spi.marsh.unmarshal(in, U.gridClassLoader());
+                            msg = spi.marsh.unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
                         }
                         catch (IgniteCheckedException e) {
                             if (log.isDebugEnabled())
@@ -1210,7 +1210,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                         List<TcpDiscoveryAbstractMessage> msgs = null;
 
                         while (!isInterrupted()) {
-                            TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
+                            TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in,
+                                U.resolveClassLoader(spi.ignite().configuration()));
 
                             if (msg instanceof TcpDiscoveryClientReconnectMessage) {
                                 TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg;
@@ -1642,7 +1643,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                         Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
 
                         if (data != null)
-                            spi.onExchange(newNodeId, newNodeId, data, null);
+                            spi.onExchange(newNodeId, newNodeId, data,
+                                U.resolveClassLoader(spi.ignite().configuration()));
                     }
                 }
                 else {
@@ -1666,7 +1668,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     if (dataMap != null) {
                         for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
                             spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(),
-                                U.resolveClassLoader(spi.ignite().configuration().getClassLoader()));
+                                U.resolveClassLoader(spi.ignite().configuration()));
                     }
 
                     locNode.setAttributes(msg.clientNodeAttributes());
@@ -1963,7 +1965,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     if (node != null && node.visible()) {
                         try {
                             DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh,
-                                spi.ignite().configuration().getClassLoader());
+                                U.resolveClassLoader(spi.ignite().configuration()));
 
                             notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj);
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index fa0ae1c..3ce983e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2325,7 +2325,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (marshalledMsg == null)
                             marshalledMsg = spi.marsh.marshal(msg);
 
-                        msgClone = spi.marsh.unmarshal(marshalledMsg, null);
+                        msgClone = spi.marsh.unmarshal(marshalledMsg,
+                            U.resolveClassLoader(spi.ignite().configuration()));
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to marshal message: " + msg, e);
@@ -3545,7 +3546,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             SecurityContext coordSubj = spi.marsh.unmarshal(
                                 node.<byte[]>attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT),
-                                U.gridClassLoader());
+                                U.resolveClassLoader(spi.ignite().configuration()));
 
                             if (!permissionsEqual(coordSubj.subject().permissions(), subj.subject().permissions())) {
                                 // Node has not pass authentication.
@@ -3601,7 +3602,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     if (data != null)
                         spi.onExchange(node.id(), node.id(), data,
-                            U.resolveClassLoader(spi.ignite().configuration().getClassLoader()));
+                            U.resolveClassLoader(spi.ignite().configuration()));
 
                     msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
 
@@ -3681,7 +3682,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (dataMap != null) {
                     for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
                         spi.onExchange(node.id(), entry.getKey(), entry.getValue(),
-                            U.resolveClassLoader(spi.ignite().configuration().getClassLoader()));
+                            U.resolveClassLoader(spi.ignite().configuration()));
                 }
 
                 processMessageFailedNodes(msg);
@@ -4608,7 +4609,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     DiscoverySpiCustomMessage msgObj = null;
 
                     try {
-                        msgObj = msg.message(spi.marsh, spi.ignite().configuration().getClassLoader());
+                        msgObj = msg.message(spi.marsh, U.resolveClassLoader(spi.ignite().configuration()));
                     }
                     catch (Throwable e) {
                         U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -4735,7 +4736,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (node != null) {
                     try {
                         DiscoverySpiCustomMessage msgObj = msg.message(spi.marsh,
-                            spi.ignite().configuration().getClassLoader());
+                            U.resolveClassLoader(spi.ignite().configuration()));
 
                         lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
                             msg.topologyVersion(),
@@ -5181,7 +5182,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 while (!isInterrupted()) {
                     try {
-                        TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader());
+                        TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in,
+                            U.resolveClassLoader(spi.ignite().configuration()));
 
                         msg.senderNodeId(nodeId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 0d41cd2..277055a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1466,7 +1466,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         try {
             sock.setSoTimeout((int)timeout);
 
-            T res = marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
+            T res = marsh.unmarshal(in == null ? sock.getInputStream() : in,
+                U.resolveClassLoader(ignite.configuration()));
 
             return res;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 9064080..2c759a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -77,7 +77,7 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
      */
     @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable {
         if (msg == null) {
-            msg = marsh.unmarshal(msgBytes, U.resolveClassLoader(ldr));
+            msg = marsh.unmarshal(msgBytes, ldr);
 
             assert msg != null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index 1fae875..7508e3d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -26,12 +26,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
 import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
 import javax.cache.event.CacheEntryListener;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
@@ -42,11 +44,13 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventAdapter;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.CachePluginConfiguration;
@@ -61,9 +65,12 @@ import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
+import static org.apache.ignite.events.EventType.EVTS_ALL;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
 
+
 /**
  * Tests for replicated cache preloader.
  */
@@ -86,9 +93,17 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
     /** */
     private volatile boolean useExtClassLoader = false;
 
+    private volatile boolean disableP2p = false;
+
     /** */
     private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
+    /** */
+    private static volatile CountDownLatch latch;
+
+    /** */
+    private static boolean cutromEvt = false;
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
@@ -118,6 +133,9 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
 
         cfg.setEventStorageSpi(spi);
 
+        if (disableP2p)
+            cfg.setPeerClassLoadingEnabled(false);
+
         if (getTestGridName(1).equals(gridName) || useExtClassLoader ||
             cfg.getMarshaller() instanceof BinaryMarshaller)
             cfg.setClassLoader(getExternalClassLoader());
@@ -125,6 +143,16 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
         if (isClient)
             cfg.setClientMode(true);
 
+        if (cutromEvt) {
+            int[] evts = new int[EVTS_ALL.length + 1];
+
+            evts[0] = Integer.MAX_VALUE - 1;
+
+            System.arraycopy(EVTS_ALL, 0, evts, 1, EVTS_ALL.length);
+
+            cfg.setIncludeEventTypes(evts);
+        }
+
         return cfg;
     }
 
@@ -474,6 +502,113 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If test failed.
      */
+    public void testExternalClassesAtMessage() throws Exception {
+        try {
+            useExtClassLoader = true;
+            disableP2p = true;
+
+            final Class cls = (Class)getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentExternalizableTestValue");
+
+            Ignite g1 = startGrid(1);
+            startGrid(2);
+
+            IgniteMessaging rmtMsg = g1.message();
+
+            latch = new CountDownLatch(2);
+
+            rmtMsg.remoteListen("MyOrderedTopic", new MessageListener());
+
+            Object o = cls.newInstance();
+
+            o.toString();
+
+            rmtMsg.send("MyOrderedTopic", o);
+            rmtMsg.sendOrdered("MyOrderedTopic", o, 0);
+
+            latch.await();
+
+            // Custom topic.
+
+            final Class cls2 = (Class)getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestEnumValue");
+
+            Object topic = cls2.getEnumConstants()[0];
+
+            latch = new CountDownLatch(2);
+
+            rmtMsg.remoteListen(topic, new MessageListener());
+
+            rmtMsg.send(topic, topic);
+            rmtMsg.sendOrdered(topic, topic, 0);
+
+            latch.await();
+
+        }
+        finally {
+            useExtClassLoader = false;
+            disableP2p = false;
+        }
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testExternalClassesAtEventP2pDisabled() throws Exception {
+        testExternalClassesAtEvent0(true);
+
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testExternalClassesAtEvent() throws Exception {
+        testExternalClassesAtEvent0(false);
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    private void testExternalClassesAtEvent0(boolean p2p) throws Exception {
+        try {
+            useExtClassLoader = true;
+            cutromEvt = true;
+
+            if (p2p)
+                disableP2p = true;
+
+            final Class cls = (Class)getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentExternalizableTestValue");
+            final Class cls2 = (Class)getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.GridEventConsumeFilter");
+
+            Ignite g1 = startGrid(1);
+            startGrid(2);
+
+            latch = new CountDownLatch(3);
+
+            g1.events().localListen((IgnitePredicate)cls2.newInstance(), EVT_CACHE_OBJECT_PUT);
+            g1.events().localListen(new EventListener(), EVT_CACHE_OBJECT_PUT);
+
+            g1.events().remoteListen(null, (IgnitePredicate)cls2.newInstance(), EVT_CACHE_OBJECT_PUT);
+            g1.events().remoteListen(null, new EventListener(), EVT_CACHE_OBJECT_PUT);
+
+            g1.cache(null).put("1", cls.newInstance());
+
+            latch.await();
+        }
+        finally {
+            useExtClassLoader = false;
+            cutromEvt = false;
+
+            if (p2p)
+                disableP2p = false;
+        }
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
     public void testSync() throws Exception {
         preloadMode = SYNC;
         batchSize = 512;
@@ -773,4 +908,32 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
             // No-op.
         }
     }
+
+    /**
+     * Message listener that prints each received message and counts down the shared test latch.
+     */
+    private static class MessageListener implements P2<UUID, Object> {
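+        /**
+         * @param nodeId ID of the node the message was received from.
+         * @param msg Received message.
+         * @return {@code true} to continue listening.
+         */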
+        @Override public boolean apply(UUID nodeId, Object msg) {
+            System.out.println("Received message [msg=" + msg + ", from=" + nodeId + ']');
+
+            latch.countDown();
+
+            return true; // Return true to continue listening.
+        }
+    }
+
+    private static class EventListener implements IgnitePredicate<Event> {
+        @Override public boolean apply(Event evt) {
+            System.out.println("Cache event: " + evt);
+
+            latch.countDown();
+
+            return true;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java
new file mode 100644
index 0000000..c5c0b4d
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentExternalizableTestValue.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+
+/**
+ * Serializable test value for deployment that holds a nested {@link Externalizable} field.
+ */
+public class CacheDeploymentExternalizableTestValue implements Serializable {
+    /** */
+    private CacheDeploymentExternalizableTestValue2 field;
+
+    /**
+     * @return Current nested field value (may be {@code null} if never set).
+     */
+    public CacheDeploymentExternalizableTestValue2 getField() {
+        return field;
+    }
+
+    /**
+     * @param field New nested field value.
+     */
+    public void setField(
+        CacheDeploymentExternalizableTestValue2 field) {
+        this.field = field;
+    }
+
+    /** {@inheritDoc} Side effect: assigns a new nested value instance to {@code field} on every call. */
+    @Override public String toString() {
+        field = new CacheDeploymentExternalizableTestValue2();
+
+        return super.toString();
+    }
+
+    /**
+     * Nested externalizable value without state: both externalization callbacks are no-ops.
+     */
+    public static class CacheDeploymentExternalizableTestValue2 implements Externalizable {
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 288c2b3..cae1a9f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1501,7 +1501,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 }
 
                 @Override public Object deserialize(byte[] bytes) throws Exception {
-                    return marshaller.unmarshal(bytes, null);
+                    ClassLoader clsLdr = ctx != null ? U.resolveClassLoader(ctx.config()) : null;
+
+                    return marshaller.unmarshal(bytes, clsLdr);
                 }
             };
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33cc0ce/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 21541ec..3d226e5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -411,7 +411,7 @@ public class GridMapQueryExecutor {
                     Marshaller m = ctx.config().getMarshaller();
 
                     for (GridCacheSqlQuery qry : qrys)
-                        qry.unmarshallParams(m);
+                        qry.unmarshallParams(m, ctx);
                 }
             }
             catch (IgniteCheckedException e) {