You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/01/19 12:05:42 UTC
[2/5] ignite git commit: IGNITE-4157 Use discovery custom messages
instead of marshaller cache - Fixes #1271.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index bd52c04..cdd1f2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -17,16 +17,14 @@
package org.apache.ignite.spi.discovery.tcp.messages;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.jetbrains.annotations.Nullable;
@@ -44,6 +42,9 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
/** Added node. */
private final TcpDiscoveryNode node;
+ /** */
+ private DiscoveryDataPacket dataPacket;
+
/** Pending messages from previous node. */
private Collection<TcpDiscoveryAbstractMessage> msgs;
@@ -64,12 +65,6 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
/** Topology snapshots history. */
private Map<Long, Collection<ClusterNode>> topHist;
- /** Discovery data from new node. */
- private Map<Integer, byte[]> newNodeDiscoData;
-
- /** Discovery data from old nodes. */
- private Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData;
-
/** Start time of the first grid node. */
private final long gridStartTime;
@@ -78,12 +73,12 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
*
* @param creatorNodeId Creator node ID.
* @param node Node to add to topology.
- * @param newNodeDiscoData New Node discovery data.
+ * @param dataPacket Container for collecting discovery data across the cluster.
* @param gridStartTime Start time of the first grid node.
*/
public TcpDiscoveryNodeAddedMessage(UUID creatorNodeId,
TcpDiscoveryNode node,
- Map<Integer, byte[]> newNodeDiscoData,
+ DiscoveryDataPacket dataPacket,
long gridStartTime)
{
super(creatorNodeId);
@@ -92,10 +87,8 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
assert gridStartTime > 0;
this.node = node;
- this.newNodeDiscoData = newNodeDiscoData;
+ this.dataPacket = dataPacket;
this.gridStartTime = gridStartTime;
-
- oldNodesDiscoData = new LinkedHashMap<>();
}
/**
@@ -111,8 +104,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
this.top = msg.top;
this.clientTop = msg.clientTop;
this.topHist = msg.topHist;
- this.newNodeDiscoData = msg.newNodeDiscoData;
- this.oldNodesDiscoData = msg.oldNodesDiscoData;
+ this.dataPacket = msg.dataPacket;
this.gridStartTime = msg.gridStartTime;
}
@@ -222,63 +214,17 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
}
/**
- * @return Discovery data from new node.
- */
- public Map<Integer, byte[]> newNodeDiscoveryData() {
- return newNodeDiscoData;
- }
-
- /**
- * @return Discovery data from old nodes.
+ * @return {@link DiscoveryDataPacket} carried by this message.
*/
- public Map<UUID, Map<Integer, byte[]>> oldNodesDiscoveryData() {
- return oldNodesDiscoData;
- }
-
- /**
- * @param oldNodesDiscoData Discovery data from old nodes.
- */
- public void oldNodesDiscoveryData(Map<UUID, Map<Integer, byte[]>> oldNodesDiscoData) {
- this.oldNodesDiscoData = oldNodesDiscoData;
- }
-
- /**
- * @param nodeId Node ID.
- * @param discoData Discovery data to add.
- */
- public void addDiscoveryData(UUID nodeId, Map<Integer, byte[]> discoData) {
- // Old nodes disco data may be null if message
- // makes more than 1 pass due to stopping of the nodes in topology.
- if (oldNodesDiscoData != null) {
- for (Map.Entry<UUID, Map<Integer, byte[]>> existingDataEntry : oldNodesDiscoData.entrySet()) {
- Map<Integer, byte[]> existingData = existingDataEntry.getValue();
-
- Iterator<Map.Entry<Integer, byte[]>> it = discoData.entrySet().iterator();
-
- while (it.hasNext()) {
- Map.Entry<Integer, byte[]> discoDataEntry = it.next();
-
- byte[] curData = existingData.get(discoDataEntry.getKey());
-
- if (Arrays.equals(curData, discoDataEntry.getValue()))
- it.remove();
- }
-
- if (discoData.isEmpty())
- break;
- }
-
- if (!discoData.isEmpty())
- oldNodesDiscoData.put(nodeId, discoData);
- }
+ public DiscoveryDataPacket gridDiscoveryData() {
+ return dataPacket;
}
/**
* Clears discovery data to minimize message size.
*/
public void clearDiscoveryData() {
- newNodeDiscoData = null;
- oldNodesDiscoData = null;
+ dataPacket = null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java
index 38685f2..2d7eca4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java
@@ -118,7 +118,7 @@ public class GridAffinitySelfTest extends GridCommonAbstractTest {
private Collection<CacheConfiguration> caches(Ignite g) {
return F.view(Arrays.asList(g.configuration().getCacheConfiguration()), new IgnitePredicate<CacheConfiguration>() {
@Override public boolean apply(CacheConfiguration c) {
- return !CU.MARSH_CACHE_NAME.equals(c.getName()) && !CU.UTILITY_CACHE_NAME.equals(c.getName()) &&
+ return !CU.UTILITY_CACHE_NAME.equals(c.getName()) &&
!CU.ATOMICS_CACHE_NAME.equals(c.getName()) && !CU.SYS_CACHE_HADOOP_MR.equals(c.getName());
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/MarshallerContextLockingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/MarshallerContextLockingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/MarshallerContextLockingSelfTest.java
index bf0adf8..cc2bc39 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/MarshallerContextLockingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/MarshallerContextLockingSelfTest.java
@@ -17,37 +17,49 @@
package org.apache.ignite.internal;
-import java.io.File;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.EventType;
-import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem;
+import org.apache.ignite.internal.processors.marshaller.MarshallerMappingTransport;
import org.apache.ignite.testframework.GridTestClassLoader;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import static org.apache.ignite.internal.MarshallerPlatformIds.JAVA_ID;
+
/**
* Test marshaller context.
*/
public class MarshallerContextLockingSelfTest extends GridCommonAbstractTest {
/** Inner logger. */
- private InnerLogger innerLog = null;
+ private InnerLogger innerLog;
+
+ private GridTestKernalContext ctx;
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
innerLog = new InnerLogger();
- log = innerLog;
+ IgniteConfiguration iCfg = new IgniteConfiguration();
+ iCfg.setClientMode(false);
+
+ ctx = new GridTestKernalContext(innerLog, iCfg) {
+ @Override public IgniteLogger log(Class<?> cls) {
+ return innerLog;
+ }
+ };
+
+ ctx.setSystemExecutorService(Executors.newFixedThreadPool(12));
}
/**
- * Mumtithread test, used custom class loader
+ * Multithreaded test that uses a custom class loader.
*/
public void testMultithreadedUpdate() throws Exception {
multithreaded(new Callable<Object>() {
@@ -55,7 +67,10 @@ public class MarshallerContextLockingSelfTest extends GridCommonAbstractTest {
GridTestClassLoader classLoader = new GridTestClassLoader(
InternalExecutor.class.getName(),
MarshallerContextImpl.class.getName(),
- MarshallerContextImpl.ContinuousQueryListener.class.getName()
+ MarshallerContextImpl.CombinedMap.class.getName(),
+ MappingStoreTask.class.getName(),
+ MarshallerMappingFileStore.class.getName(),
+ MarshallerMappingTransport.class.getName()
);
Thread.currentThread().setContextClassLoader(classLoader);
@@ -64,7 +79,8 @@ public class MarshallerContextLockingSelfTest extends GridCommonAbstractTest {
Object internelExecutor = clazz.newInstance();
- clazz.getMethod("executeTest", GridTestLog4jLogger.class).invoke(internelExecutor, log);
+ clazz.getMethod("executeTest", GridTestLog4jLogger.class, GridKernalContext.class)
+ .invoke(internelExecutor, log, ctx);
return null;
}
@@ -72,9 +88,9 @@ public class MarshallerContextLockingSelfTest extends GridCommonAbstractTest {
assertTrue(InternalExecutor.counter.get() == 0);
- assertTrue(innerLog.contains("File already locked"));
-
assertTrue(!innerLog.contains("Exception"));
+
+ assertTrue(innerLog.contains("File already locked"));
}
/**
@@ -87,21 +103,16 @@ public class MarshallerContextLockingSelfTest extends GridCommonAbstractTest {
/**
* Executes onUpdated
*/
- public void executeTest(GridTestLog4jLogger log) throws Exception {
+ public void executeTest(GridTestLog4jLogger log, GridKernalContext ctx) throws Exception {
counter.incrementAndGet();
- File workDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "marshaller", false);
-
- final MarshallerContextImpl.ContinuousQueryListener queryListener = new MarshallerContextImpl.ContinuousQueryListener(log, workDir);
-
- final ArrayList evts = new ArrayList<CacheEntryEvent<Integer, String>>();
-
- IgniteCacheProxy cache = new IgniteCacheProxy();
+ MarshallerContextImpl marshallerContext = new MarshallerContextImpl(null);
+ marshallerContext.onMarshallerProcessorStarted(ctx, null);
- evts.add(new CacheContinuousQueryManager.CacheEntryEventImpl(cache, EventType.CREATED, 1, String.class.getName()));
+ MarshallerMappingItem item = new MarshallerMappingItem(JAVA_ID, 1, String.class.getName());
- for (int i = 0; i < 100; i++)
- queryListener.onUpdated(evts);
+ for (int i = 0; i < 400; i++)
+ marshallerContext.onMappingAccepted(item);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
index cd8a487..003a330 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import junit.framework.Assert;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryBasicIdMapper;
import org.apache.ignite.binary.BinaryBasicNameMapper;
@@ -75,6 +76,7 @@ import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -84,7 +86,10 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.marshaller.MarshallerContextTestImpl;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -3564,12 +3569,25 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest {
bCfg.setTypeConfigurations(cfgs);
iCfg.setBinaryConfiguration(bCfg);
+ iCfg.setClientMode(false);
+ iCfg.setDiscoverySpi(new TcpDiscoverySpi() {
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+ //No-op.
+ }
+ });
BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), iCfg, new NullLogger());
BinaryMarshaller marsh = new BinaryMarshaller();
- marsh.setContext(new MarshallerContextTestImpl(null, excludedClasses));
+ MarshallerContextTestImpl marshCtx = new MarshallerContextTestImpl(null, excludedClasses);
+
+ GridTestKernalContext kernCtx = new GridTestKernalContext(log, iCfg);
+ kernCtx.add(new GridDiscoveryManager(kernCtx));
+
+ marshCtx.onMarshallerProcessorStarted(kernCtx, null);
+
+ marsh.setContext(marshCtx);
IgniteUtils.invoke(BinaryMarshaller.class, marsh, "setBinaryContext", ctx, iCfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryMarshallerCtxDisabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryMarshallerCtxDisabledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryMarshallerCtxDisabledSelfTest.java
index 2b0051e..5ee2423 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryMarshallerCtxDisabledSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryMarshallerCtxDisabledSelfTest.java
@@ -17,23 +17,22 @@
package org.apache.ignite.internal.binary;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryReader;
import org.apache.ignite.binary.BinaryWriter;
import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.MarshallerContextAdapter;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.marshaller.MarshallerContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Arrays;
-
/**
*
*/
@@ -88,21 +87,36 @@ public class GridBinaryMarshallerCtxDisabledSelfTest extends GridCommonAbstractT
* Marshaller context with no storage. Platform has to work in such environment as well by marshalling class name of
* a binary object.
*/
- private static class MarshallerContextWithNoStorage extends MarshallerContextAdapter {
- /** */
- public MarshallerContextWithNoStorage() {
- super(null);
+ private static class MarshallerContextWithNoStorage implements MarshallerContext {
+ /** {@inheritDoc} */
+ @Override public boolean registerClassName(
+ byte platformId,
+ int typeId,
+ String clsName
+ ) throws IgniteCheckedException {
+ return false;
}
/** {@inheritDoc} */
- @Override protected boolean registerClassName(int id, String clsName) throws IgniteCheckedException {
- return false;
+ @Override public Class getClass(
+ int typeId,
+ ClassLoader ldr
+ ) throws ClassNotFoundException, IgniteCheckedException {
+ return null;
}
/** {@inheritDoc} */
- @Override protected String className(int id) throws IgniteCheckedException {
+ @Override public String getClassName(
+ byte platformId,
+ int typeId
+ ) throws ClassNotFoundException, IgniteCheckedException {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean isSystemType(String typeName) {
+ return false;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java
index 9e3ea7b..fd2dd80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java
@@ -137,11 +137,15 @@ public class GridCacheEntryMemorySizeSelfTest extends GridCommonAbstractTest {
Marshaller marsh = new OptimizedMarshaller();
marsh.setContext(new MarshallerContext() {
- @Override public boolean registerClass(int id, Class cls) {
+ @Override public boolean registerClassName(byte platformId, int typeId, String clsName) {
return true;
}
- @Override public Class getClass(int id, ClassLoader ldr) {
+ @Override public Class getClass(int typeId, ClassLoader ldr) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public String getClassName(byte platformId, int typeId) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
index ea7b124..8255dc8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteInternalCacheTypesTest.java
@@ -29,7 +29,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
@@ -113,14 +112,6 @@ public class IgniteInternalCacheTypesTest extends GridCommonAbstractTest {
GridTestUtils.assertThrows(log(), new Callable<Object>() {
@Override public Object call() throws Exception {
- ignite.cache(CU.MARSH_CACHE_NAME);
-
- return null;
- }
- }, IllegalStateException.class, null);
-
- GridTestUtils.assertThrows(log(), new Callable<Object>() {
- @Override public Object call() throws Exception {
ignite.cache(CU.ATOMICS_CACHE_NAME);
return null;
@@ -129,8 +120,6 @@ public class IgniteInternalCacheTypesTest extends GridCommonAbstractTest {
checkCache(ignite, CU.UTILITY_CACHE_NAME, UTILITY_CACHE_POOL, false, true);
- checkCache(ignite, CU.MARSH_CACHE_NAME, MARSH_CACHE_POOL, false, false);
-
checkCache(ignite, CU.ATOMICS_CACHE_NAME, SYSTEM_POOL, false, true);
for (String cache : userCaches)
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
new file mode 100644
index 0000000..50107e4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests the situation when two nodes in a cluster simultaneously propose different classes with the same typeId
+ * (which is actually the class name's <b>hashCode</b>).
+ *
+ * In that case one of the propose requests should be rejected
+ * and a {@link org.apache.ignite.internal.processors.marshaller.MappingProposedMessage} should be sent
+ * with a non-null <b>conflictingClsName</b> field.
+ */
+public class IgniteMarshallerCacheClassNameConflictTest extends GridCommonAbstractTest {
+ /** */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private volatile boolean bbClsRejected;
+
+ /** */
+ private volatile boolean aaClsRejected;
+
+ /** */
+ private volatile boolean rejectObserved;
+
+ /**
+ * Latch used to synchronize two nodes on sending mapping requests for classes with conflicting names.
+ */
+ private static final CountDownLatch startLatch = new CountDownLatch(3);
+
+ /** */
+ private static volatile boolean busySpinFlag;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(REPLICATED);
+ ccfg.setRebalanceMode(SYNC);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCachePutGetClassesWithNameConflict() throws Exception {
+ Ignite srv1 = startGrid(0);
+ Ignite srv2 = startGrid(1);
+ ExecutorService exec1 = srv1.executorService();
+ ExecutorService exec2 = srv2.executorService();
+
+ final AtomicInteger trickCompilerVar = new AtomicInteger(1);
+
+ final Organization aOrg1 = new Organization(1, "Microsoft", "One Microsoft Way Redmond, WA 98052-6399, USA");
+ final OrganizatioN bOrg2 = new OrganizatioN(2, "Apple", "1 Infinite Loop, Cupertino, CA 95014, USA");
+
+ exec1.submit(new Runnable() {
+ @Override public void run() {
+ startLatch.countDown();
+
+ try {
+ startLatch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ // Busy-spin after waking up from startLatch.await()
+ // to reduce the probability that one thread starts significantly earlier than the other.
+ while (!busySpinFlag) {
+ if (trickCompilerVar.get() < 0)
+ break;
+ }
+
+ Ignition.localIgnite().cache(null).put(1, aOrg1);
+ }
+ });
+
+ exec2.submit(new Runnable() {
+ @Override public void run() {
+ startLatch.countDown();
+
+ try {
+ startLatch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ // Busy-spin after waking up from startLatch.await()
+ // to reduce the probability that one thread starts significantly earlier than the other.
+ while (!busySpinFlag) {
+ if (trickCompilerVar.get() < 0)
+ break;
+ }
+
+ Ignition.localIgnite().cache(null).put(2, bOrg2);
+ }
+ });
+ startLatch.countDown();
+
+ busySpinFlag = true;
+
+ exec1.shutdown();
+ exec2.shutdown();
+
+ exec1.awaitTermination(100, TimeUnit.MILLISECONDS);
+ exec2.awaitTermination(100, TimeUnit.MILLISECONDS);
+
+ Ignite ignite = startGrid(2);
+
+ int cacheSize = ignite.cache(null).size(CachePeekMode.PRIMARY);
+
+ assertTrue("Expected cache size 1 but was " + cacheSize, cacheSize == 1);
+
+ if (rejectObserved)
+ assertTrue(aaClsRejected || bbClsRejected);
+ }
+
+ /** */
+ private class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+
+ /** */
+ private class DiscoverySpiListenerWrapper implements DiscoverySpiListener {
+ /** */
+ private DiscoverySpiListener delegate;
+
+ /**
+ * @param delegate Delegate.
+ */
+ private DiscoverySpiListenerWrapper(DiscoverySpiListener delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDiscovery(
+ int type,
+ long topVer,
+ ClusterNode node,
+ Collection<ClusterNode> topSnapshot,
+ @Nullable Map<Long, Collection<ClusterNode>> topHist,
+ @Nullable DiscoverySpiCustomMessage spiCustomMsg
+ ) {
+ DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
+ : (DiscoveryCustomMessage) U.field(spiCustomMsg, "delegate");
+
+ if (customMsg != null)
+ // We don't want to make MappingProposedMessage public, so compare the simple class name instead of using instanceof.
+ if ("MappingProposedMessage".equals(customMsg.getClass().getSimpleName())) {
+ String conflClsName = U.field(customMsg, "conflictingClsName");
+ if (conflClsName != null && !conflClsName.isEmpty()) {
+ rejectObserved = true;
+ if (conflClsName.contains("Organization"))
+ bbClsRejected = true;
+ else if (conflClsName.contains("OrganizatioN"))
+ aaClsRejected = true;
+ }
+ }
+
+ delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+ super.setListener(new DiscoverySpiListenerWrapper(lsnr));
+ }
+ }
+
+ /**
+ * The class name is chosen to conflict with the name of the other class this test puts into the cache.
+ */
+ private static class Organization {
+ /** */
+ private final int id;
+
+ /** */
+ private final String name;
+
+ /** */
+ private final String addr;
+
+ /**
+ * @param id Id.
+ * @param name Name.
+ * @param addr Address.
+ */
+ Organization(int id, String name, String addr) {
+ this.id = id;
+ this.name = name;
+ this.addr = addr;
+ }
+ }
+
+ /**
+ * The class name is chosen to conflict with the name of the other class this test puts into the cache.
+ */
+ private static class OrganizatioN {
+ /** */
+ private final int id;
+
+ /** */
+ private final String name;
+
+ /** */
+ private final String addr;
+
+ /**
+ * @param id Id.
+ * @param name Name.
+ * @param addr Address.
+ */
+ OrganizatioN(int id, String name, String addr) {
+ this.id = id;
+ this.name = name;
+ this.addr = addr;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
new file mode 100644
index 0000000..f1c1b3a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests for client requesting missing mappings from server nodes with and without server nodes failures.
+ */
+public class IgniteMarshallerCacheClientRequestsMappingOnMissTest extends GridCommonAbstractTest {
+ /**
+ * Work directory used for client nodes (the value of the {@code java.io.tmpdir} system property).
+ * Need to point client node to a different working directory
+ * to avoid reading marshaller mapping from FS and to force sending MissingMappingRequest.
+ */
+ private static final String TMP_DIR = System.getProperty("java.io.tmpdir");
+
+ /** Number of messages observed by the mapping request listeners this test installs on server nodes. */
+ private static final AtomicInteger mappingReqsCounter = new AtomicInteger(0);
+
+ /** IP finder handed to the discovery SPI of every node configured by this test. */
+ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** When {@code true}, configurations are created in client mode with {@link #TMP_DIR} as work directory. */
+ private boolean clientMode;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+ // Replicated cache with synchronous rebalancing and fully synchronous writes.
+ CacheConfiguration cacheCfg = new CacheConfiguration();
+
+ cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+ cacheCfg.setRebalanceMode(SYNC);
+ cacheCfg.setCacheMode(REPLICATED);
+
+ // Discovery SPI that strips marshaller data from the exchange on client nodes.
+ TcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
+
+ discoSpi.setIpFinder(ipFinder);
+
+ igniteCfg.setCacheConfiguration(cacheCfg);
+ igniteCfg.setDiscoverySpi(discoSpi);
+ igniteCfg.setClientMode(clientMode);
+
+ // Client nodes get their own work directory.
+ if (clientMode)
+ igniteCfg.setWorkDirectory(TMP_DIR);
+
+ return igniteCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ try {
+ stopAllGrids();
+
+ // Remove the marshaller files written under TMP_DIR.
+ cleanupMarshallerFileStore();
+ }
+ finally {
+ // The counter is static: reset it even if stopping or cleanup failed,
+ // otherwise the stale count leaks into the next test's assertion.
+ mappingReqsCounter.set(0);
+ }
+ }
+
+ /**
+ * Deletes the {@code marshaller} directory under {@link #TMP_DIR} together with the files it contains.
+ * File removal is skipped when the directory cannot be listed (for example, when it was never created).
+ *
+ * @throws IOException If a file or the directory itself could not be deleted.
+ */
+ private void cleanupMarshallerFileStore() throws IOException {
+ Path marshCache = Paths.get(TMP_DIR, "marshaller");
+
+ // File.listFiles() returns null when the path is not a directory or cannot be read.
+ File[] files = marshCache.toFile().listFiles();
+
+ if (files != null) {
+ for (File file : files)
+ Files.delete(file.toPath());
+ }
+
+ Files.deleteIfExists(marshCache);
+ }
+
+ /**
+ * Puts a value on a server node, reads it from a client node, stops the client and then checks
+ * that the {@code marshaller} directory under {@link #TMP_DIR} holds exactly one file
+ * whose content is the value's class name.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRequestedMappingIsStoredInFS() throws Exception {
+ Ignite srv = startGrid(0);
+
+ srv.cache(null).put(1, new Organization(1, "Microsoft", "One Microsoft Way Redmond, WA 98052-6399, USA"));
+
+ clientMode = true;
+
+ Ignite client = startGrid(1);
+
+ client.cache(null).get(1);
+
+ stopGrid(1);
+
+ // Wait until the executor held by the client's marshaller context has terminated.
+ GridKernalContext clientCtx = U.field(client, "ctx");
+
+ ExecutorService fileStoreExec = getMarshCtxFileStoreExecutorSrvc(clientCtx);
+
+ boolean terminated = fileStoreExec.awaitTermination(5000, TimeUnit.MILLISECONDS);
+
+ if (!terminated)
+ fail("Failed to wait for executor service used by MarshallerContext to shutdown");
+
+ File marshDir = Paths.get(TMP_DIR, "marshaller").toFile();
+
+ File[] files = marshDir.listFiles();
+
+ assertNotNull(TMP_DIR + "/marshaller directory should contain at least one file", files);
+ assertEquals(TMP_DIR + "/marshaller directory should contain exactly one file", 1, files.length);
+
+ byte[] storedName = Files.readAllBytes(files[0].toPath());
+
+ assertEquals(Organization.class.getName(), new String(storedName));
+ }
+
+ /**
+ * Reads, via reflection, the {@code execSrvc} field of the object stored in the context's {@code marshCtx} field.
+ *
+ * @param ctx Context.
+ * @return Executor service held by the marshaller context.
+ */
+ private ExecutorService getMarshCtxFileStoreExecutorSrvc(GridKernalContext ctx) {
+ Object marshCtx = U.field(ctx, "marshCtx");
+
+ return U.field(marshCtx, "execSrvc");
+ }
+
+
+ /**
+ * Starts three server nodes with counting listeners and checks that a client {@code get}
+ * leaves the mapping request counter at exactly one.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNoNodesDieOnRequest() throws Exception {
+ Ignite lastSrv = null;
+
+ // Servers 0..2: every one only counts incoming mapping requests.
+ for (int idx = 0; idx < 3; idx++) {
+ lastSrv = startGrid(idx);
+
+ GridKernalContext srvCtx = U.field(lastSrv, "ctx");
+
+ replaceWithCountingMappingRequestListener(srvCtx.io());
+ }
+
+ lastSrv.cache(null).put(1, new Organization(1, "Microsoft", "One Microsoft Way Redmond, WA 98052-6399, USA"));
+
+ clientMode = true;
+
+ Ignite client = startGrid(4);
+
+ client.cache(null).get(1);
+
+ assertEquals("Expected requests count is 1, actual is " + mappingReqsCounter.get(),
+ 1,
+ mappingReqsCounter.get());
+ }
+
+ /**
+ * Starts three server nodes, the first of which stops itself on a mapping request,
+ * and checks that a client {@code get} leaves the mapping request counter at exactly two.
+ *
+ * @throws Exception If failed.
+ */
+ public void testOneNodeDiesOnRequest() throws Exception {
+ final int stoppingSrvs = 1;
+
+ Ignite lastSrv = null;
+
+ // Server 0 stops on request; servers 1 and 2 only count requests.
+ for (int idx = 0; idx < 3; idx++) {
+ lastSrv = startGrid(idx);
+
+ GridKernalContext srvCtx = U.field(lastSrv, "ctx");
+
+ if (idx < stoppingSrvs)
+ replaceWithStoppingMappingRequestListener(srvCtx.io(), idx);
+ else
+ replaceWithCountingMappingRequestListener(srvCtx.io());
+ }
+
+ lastSrv.cache(null).put(1, new Organization(1, "Microsoft", "One Microsoft Way Redmond, WA 98052-6399, USA"));
+
+ clientMode = true;
+
+ Ignite client = startGrid(4);
+
+ client.cache(null).get(1);
+
+ assertEquals("Expected requests count is 2, actual is " + mappingReqsCounter.get(),
+ 2,
+ mappingReqsCounter.get());
+ }
+
+ /**
+ * Starts three server nodes, the first two of which stop themselves on a mapping request,
+ * and checks that a client {@code get} leaves the mapping request counter at exactly three.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTwoNodesDieOnRequest() throws Exception {
+ final int stoppingSrvs = 2;
+
+ Ignite lastSrv = null;
+
+ // Servers 0 and 1 stop on request; server 2 only counts requests.
+ for (int idx = 0; idx < 3; idx++) {
+ lastSrv = startGrid(idx);
+
+ GridKernalContext srvCtx = U.field(lastSrv, "ctx");
+
+ if (idx < stoppingSrvs)
+ replaceWithStoppingMappingRequestListener(srvCtx.io(), idx);
+ else
+ replaceWithCountingMappingRequestListener(srvCtx.io());
+ }
+
+ lastSrv.cache(null).put(1, new Organization(1, "Microsoft", "One Microsoft Way Redmond, WA 98052-6399, USA"));
+
+ clientMode = true;
+
+ Ignite client = startGrid(4);
+
+ client.cache(null).get(1);
+
+ assertEquals("Expected requests count is 3, actual is " + mappingReqsCounter.get(),
+ 3,
+ mappingReqsCounter.get());
+ }
+
+ /**
+ * Starts three server nodes that all stop themselves on a mapping request and checks that
+ * a client {@code get} leaves the mapping request counter at exactly three.
+ * A failure of the {@code get} itself is tolerated and only logged.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAllNodesDieOnRequest() throws Exception {
+ Ignite srv1 = startGrid(0);
+ replaceWithStoppingMappingRequestListener(((GridKernalContext)U.field(srv1, "ctx")).io(), 0);
+
+ Ignite srv2 = startGrid(1);
+ replaceWithStoppingMappingRequestListener(((GridKernalContext)U.field(srv2, "ctx")).io(), 1);
+
+ Ignite srv3 = startGrid(2);
+ replaceWithStoppingMappingRequestListener(((GridKernalContext)U.field(srv3, "ctx")).io(), 2);
+
+ srv3.cache(null).put(1, new Organization(1, "Microsoft", "One Microsoft Way Redmond, WA 98052-6399, USA"));
+
+ clientMode = true;
+
+ Ignite cl1 = startGrid(4);
+
+ try {
+ cl1.cache(null).get(1);
+ }
+ catch (Exception e) {
+ // Deliberately tolerated: only the request count is asserted below.
+ // Report through the test logger instead of dumping the stack trace to stderr.
+ info("Cache get failed after all server nodes were asked to stop (tolerated by this test): " + e);
+ }
+
+ assertEquals("Expected requests count is 3, actual is " + mappingReqsCounter.get(),
+ 3,
+ mappingReqsCounter.get());
+ }
+
+ /**
+ * Replaces, in the manager's {@code sysLsnrs} array, the listener registered for
+ * {@code TOPIC_MAPPING_MARSH} with one that increments the request counter
+ * and then delegates to the replaced listener.
+ *
+ * @param ioMgr IO manager whose listener is replaced.
+ */
+ private void replaceWithCountingMappingRequestListener(GridIoManager ioMgr) {
+ final int topicIdx = GridTopic.TOPIC_MAPPING_MARSH.ordinal();
+
+ GridMessageListener[] sysLsnrs = U.field(ioMgr, "sysLsnrs");
+
+ final GridMessageListener origLsnr = sysLsnrs[topicIdx];
+
+ sysLsnrs[topicIdx] = new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ mappingReqsCounter.incrementAndGet();
+
+ origLsnr.onMessage(nodeId, msg);
+ }
+ };
+ }
+
+ /**
+ * Replaces the {@code TOPIC_MAPPING_MARSH} listener with one that, for every message,
+ * starts a new thread which increments the request counter and stops the given grid.
+ *
+ * @param ioMgr IO manager whose listener is replaced.
+ * @param nodeIdToStop Index of the grid to stop when a message arrives.
+ */
+ private void replaceWithStoppingMappingRequestListener(GridIoManager ioMgr, final int nodeIdToStop) {
+ // Stateless task, so a single instance can back every thread started by the listener.
+ final Runnable stopTask = new Runnable() {
+ @Override public void run() {
+ mappingReqsCounter.incrementAndGet();
+ stopGrid(nodeIdToStop, true);
+ }
+ };
+
+ GridMessageListener stoppingLsnr = new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ Thread stopThread = new Thread(stopTask);
+
+ stopThread.start();
+ }
+ };
+
+ ioMgr.removeMessageListener(GridTopic.TOPIC_MAPPING_MARSH);
+
+ ioMgr.addMessageListener(GridTopic.TOPIC_MAPPING_MARSH, stoppingLsnr);
+ }
+
+ /**
+ * Value class that the tests put into the cache on a server node and read back on a client node.
+ */
+ private static class Organization {
+ /** Organization id. */
+ private final int id;
+
+ /** Organization name. */
+ private final String name;
+
+ /** Organization address. */
+ private final String addr;
+
+ /**
+ * @param id Id.
+ * @param name Name.
+ * @param addr Address.
+ */
+ Organization(int id, String name, String addr) {
+ this.id = id;
+ this.name = name;
+ this.addr = addr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "Organization{" +
+ "id=" + id +
+ ", name='" + name + '\'' +
+ ", addr='" + addr + '\'' +
+ '}';
+ }
+ }
+
+ /**
+ * Discovery SPI that keeps client nodes from obtaining marshaller mapping data during the discovery phase.
+ *
+ * This forces the client to request the mapping from the grid.
+ */
+ private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected void onExchange(DiscoveryDataPacket dataPacket, ClassLoader clsLdr) {
+ boolean client = locNode.isClient();
+
+ if (client) {
+ int marshProcKey = GridComponent.DiscoveryDataExchangeType.MARSHALLER_PROC.ordinal();
+
+ // Drop the marshaller processor entry from the packet's common data before it is processed.
+ Map<Integer, byte[]> commonData = U.field(dataPacket, "commonData");
+
+ commonData.remove(marshProcKey);
+ }
+
+ super.onExchange(dataPacket, clsLdr);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java
index 66356cb..8545f82 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteSystemCacheOnClientTest.java
@@ -65,30 +65,9 @@ public class IgniteSystemCacheOnClientTest extends GridCommonAbstractTest {
assertTrue(ignite.configuration().isClientMode());
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return ignite.internalCache(CU.MARSH_CACHE_NAME) != null;
- }
- }, 5000);
-
- GridCacheAdapter marshCache = ignite.internalCache(CU.MARSH_CACHE_NAME);
-
- assertNotNull(marshCache);
-
- assertFalse(marshCache.context().isNear());
-
- marshCache = ((IgniteKernal)ignite(0)).internalCache(CU.MARSH_CACHE_NAME);
-
- assertFalse(marshCache.context().isNear());
-
- Collection<ClusterNode> affNodes = marshCache.affinity().mapKeyToPrimaryAndBackups(1);
-
- assertEquals(1, affNodes.size());
- assertTrue(affNodes.contains(ignite(0).cluster().localNode()));
-
GridCacheAdapter utilityCache = ((IgniteKernal)ignite(0)).internalCache(CU.UTILITY_CACHE_NAME);
- affNodes = utilityCache.affinity().mapKeyToPrimaryAndBackups(1);
+ Collection<ClusterNode> affNodes = utilityCache.affinity().mapKeyToPrimaryAndBackups(1);
assertEquals(1, affNodes.size());
assertTrue(affNodes.contains(ignite(0).cluster().localNode()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java
index c7a6a55..3571d08 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java
@@ -18,15 +18,20 @@
package org.apache.ignite.internal.processors.cache.binary;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler;
import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.cache.GridCacheEntryMemorySizeSelfTest;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerContextTestImpl;
import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
/**
*
@@ -36,9 +41,21 @@ public class GridBinaryCacheEntryMemorySizeSelfTest extends GridCacheEntryMemory
@Override protected Marshaller createMarshaller() throws IgniteCheckedException {
BinaryMarshaller marsh = new BinaryMarshaller();
- marsh.setContext(new MarshallerContextTestImpl(null));
-
IgniteConfiguration iCfg = new IgniteConfiguration();
+ iCfg.setDiscoverySpi(new TcpDiscoverySpi() {
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+ // No-op.
+ }
+ });
+ iCfg.setClientMode(false);
+
+ GridTestKernalContext kernCtx = new GridTestKernalContext(log, iCfg);
+ kernCtx.add(new GridDiscoveryManager(kernCtx));
+
+ MarshallerContextTestImpl marshCtx = new MarshallerContextTestImpl(null);
+ marshCtx.onMarshallerProcessorStarted(kernCtx, null);
+
+ marsh.setContext(marshCtx);
BinaryContext pCtx = new BinaryContext(BinaryNoopMetadataHandler.instance(), iCfg, new NullLogger());
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
index 489a402..09804c7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
@@ -59,7 +59,7 @@ public class IgniteCacheSystemTransactionsSelfTest extends GridCacheAbstractSelf
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
- for (String cacheName : new String[] {null, CU.UTILITY_CACHE_NAME, CU.MARSH_CACHE_NAME}) {
+ for (String cacheName : new String[] {null, CU.UTILITY_CACHE_NAME}) {
IgniteKernal kernal = (IgniteKernal)ignite(0);
GridCacheAdapter<Object, Object> cache = kernal.context().cache().internalCache(cacheName);
@@ -108,22 +108,6 @@ public class IgniteCacheSystemTransactionsSelfTest extends GridCacheAbstractSelf
/**
* @throws Exception If failed.
*/
- public void testMarshallerCacheShouldNotStartTx() throws Exception {
- IgniteKernal ignite = (IgniteKernal)grid(0);
-
- final GridCacheAdapter<String,String> marshallerCache = (GridCacheAdapter<String, String>)(GridCacheAdapter)
- ignite.context().cache().marshallerCache();
-
- GridTestUtils.assertThrows(log, new Callable<Object>() {
- @Override public Object call() throws Exception {
- return marshallerCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
- }
- }, IgniteException.class, null);
- }
-
- /**
- * @throws Exception If failed.
- */
private void checkTransactionsCommitted() throws Exception {
for (int i = 0; i < gridCount(); i++) {
IgniteKernal kernal = (IgniteKernal)grid(i);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
index 34b67bc..7fed37d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
@@ -159,12 +159,12 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
final int size = 10;
- IgniteCache<Object, Object> chache0 = grid(0).cache(null);
+ IgniteCache<Object, Object> cache0 = grid(0).cache(null);
for (int i = 0; i < size; i++) {
info("Putting value [i=" + i + ']');
- chache0.put(i, i);
+ cache0.put(i, i);
info("Finished putting value [i=" + i + ']');
}
@@ -175,13 +175,13 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti
for (int i = 0; i < size; i++) {
info("Putting value 2 [i=" + i + ']');
- assertEquals(i, chache0.getAndPutIfAbsent(i, i * i));
+ assertEquals(i, cache0.getAndPutIfAbsent(i, i * i));
info("Finished putting value 2 [i=" + i + ']');
}
for (int i = 0; i < size; i++)
- assertEquals(i, chache0.get(i));
+ assertEquals(i, cache0.get(i));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index d5dc557..0e08b01 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -211,7 +211,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
for (int i = 0; i < gridCount(); i++) {
GridContinuousProcessor proc = grid(i).context().continuous();
- assertEquals(String.valueOf(i), 2, ((Map)U.field(proc, "locInfos")).size());
+ assertEquals(String.valueOf(i), 1, ((Map)U.field(proc, "locInfos")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextSelfTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextSelfTest.java
index 8a0ff9a..7ce6ece 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextSelfTest.java
@@ -18,46 +18,57 @@
package org.apache.ignite.marshaller;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.ArrayList;
-import javax.cache.event.EventType;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.MarshallerContextImpl;
-import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
+import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static java.nio.file.Files.readAllBytes;
+import static org.apache.ignite.internal.MarshallerPlatformIds.JAVA_ID;
/**
* Test marshaller context.
*/
public class MarshallerContextSelfTest extends GridCommonAbstractTest {
+ /** */
+ private GridTestKernalContext ctx;
+
+ /** */
+ private ExecutorService execSvc;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ GridTestKernalContext kernalCtx = newContext();
+
+ ctx = kernalCtx;
+
+ // Single-threaded executor installed as the context's system executor service.
+ ExecutorService sysExec = Executors.newSingleThreadExecutor();
+
+ execSvc = sysExec;
+
+ kernalCtx.setSystemExecutorService(sysExec);
+ }
+
/**
* @throws Exception If failed.
*/
public void testClassName() throws Exception {
- File workDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "marshaller", false);
+ MarshallerContextImpl ctx = new MarshallerContextImpl(null);
- final MarshallerContextImpl.ContinuousQueryListener queryListener =
- new MarshallerContextImpl.ContinuousQueryListener(log, workDir);
+ ctx.onMarshallerProcessorStarted(this.ctx, null);
- final ArrayList evts = new ArrayList<>();
+ MarshallerMappingItem item = new MarshallerMappingItem(JAVA_ID, 1, String.class.getName());
- IgniteCacheProxy cache = new IgniteCacheProxy();
-
- evts.add(new CacheContinuousQueryManager.CacheEntryEventImpl(cache,
- EventType.CREATED,
- 1,
- String.class.getName()));
-
- queryListener.onUpdated(evts);
+ ctx.onMappingAccepted(item);
try (Ignite g1 = startGrid(1)) {
MarshallerContextImpl marshCtx = ((IgniteKernal)g1).context().marshallerContext();
- String clsName = marshCtx.className(1);
+ String clsName = marshCtx.getClassName(JAVA_ID, 1);
assertEquals("java.lang.String", clsName);
}
@@ -68,23 +79,100 @@ public class MarshallerContextSelfTest extends GridCommonAbstractTest {
*/
public void testOnUpdated() throws Exception {
File workDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "marshaller", false);
+ MarshallerContextImpl ctx = new MarshallerContextImpl(null);
+
+ ctx.onMarshallerProcessorStarted(this.ctx, null);
+
+ MarshallerMappingItem item1 = new MarshallerMappingItem(JAVA_ID, 1, String.class.getName());
+
+ ctx.onMappingAccepted(item1);
+
+ checkFileName("java.lang.String", Paths.get(workDir + "/1.classname0"));
+
+ MarshallerMappingItem item2 = new MarshallerMappingItem((byte) 2, 2, "Random.Class.Name");
+
+ ctx.onMappingProposed(item2);
+ ctx.onMappingAccepted(item2);
+
+ execSvc.shutdown();
+ if (execSvc.awaitTermination(1000, TimeUnit.MILLISECONDS))
+ checkFileName("Random.Class.Name", Paths.get(workDir + "/2.classname2"));
+ else
+ fail("Failed to wait for executor service to shutdown");
+ }
+
+ /**
+ * Tests that there is a null value inserted in allCaches list
+ * if platform ids passed to marshaller cache were not sequential (like 0, 2).
+ */
+ public void testCacheStructure0() throws Exception {
+ MarshallerContextImpl ctx = new MarshallerContextImpl(null);
+
+ ctx.onMarshallerProcessorStarted(this.ctx, null);
+
+ MarshallerMappingItem item1 = new MarshallerMappingItem(JAVA_ID, 1, String.class.getName());
- final MarshallerContextImpl.ContinuousQueryListener queryListener =
- new MarshallerContextImpl.ContinuousQueryListener(log, workDir);
+ ctx.onMappingAccepted(item1);
- final ArrayList evts = new ArrayList<>();
+ MarshallerMappingItem item2 = new MarshallerMappingItem((byte) 2, 2, "Random.Class.Name");
- IgniteCacheProxy cache = new IgniteCacheProxy();
+ ctx.onMappingProposed(item2);
- evts.add(new CacheContinuousQueryManager.CacheEntryEventImpl(cache,
- EventType.CREATED,
- 1,
- String.class.getName()));
+ List list = U.field(ctx, "allCaches");
- queryListener.onUpdated(evts);
+ assertNotNull("Mapping cache is null for platformId: 0" , list.get(0));
+ assertNull("Mapping cache is not null for platformId: 1", list.get(1));
+ assertNotNull("Mapping cache is null for platformId: 2", list.get(2));
- String fileName = "1.classname";
+ boolean excObserved = false;
+ try {
+ list.get(3);
+ }
+ catch (ArrayIndexOutOfBoundsException ignored) {
+ excObserved = true;
+ }
+ assertTrue("ArrayIndexOutOfBoundsException had to be thrown", excObserved);
+ }
+
+ /**
+ * Tests that there are no null values in allCaches list
+ * if platform ids passed to marshaller context were sequential.
+ */
+ public void testCacheStructure1() throws Exception {
+ MarshallerContextImpl ctx = new MarshallerContextImpl(null);
+
+ ctx.onMarshallerProcessorStarted(this.ctx, null);
+
+ MarshallerMappingItem item1 = new MarshallerMappingItem(JAVA_ID, 1, String.class.getName());
+
+ ctx.onMappingAccepted(item1);
+
+ MarshallerMappingItem item2 = new MarshallerMappingItem((byte) 1, 2, "Random.Class.Name");
+
+ ctx.onMappingProposed(item2);
- assertEquals("java.lang.String", new String(readAllBytes(Paths.get(workDir + "/" + fileName))));
+ List list = U.field(ctx, "allCaches");
+
+ assertNotNull("Mapping cache is null for platformId: 0" , list.get(0));
+ assertNotNull("Mapping cache is null for platformId: 1", list.get(1));
+
+ boolean excObserved = false;
+
+ try {
+ list.get(2);
+ }
+ catch (ArrayIndexOutOfBoundsException ignored) {
+ excObserved = true;
+ }
+
+ assertTrue("ArrayIndexOutOfBoundsException had to be thrown", excObserved);
+ }
+
+ /**
+ * @param expected Expected.
+ * @param pathToReal Path to real.
+ */
+ private void checkFileName(String expected, Path pathToReal) throws IOException {
+ assertEquals(expected, new String(readAllBytes(pathToReal)));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextTestImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextTestImpl.java b/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextTestImpl.java
index 9ff127d..6085332 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextTestImpl.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextTestImpl.java
@@ -21,7 +21,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.MarshallerContextAdapter;
+import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.plugin.PluginProvider;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -29,7 +29,7 @@ import org.jsr166.ConcurrentHashMap8;
/**
* Test marshaller context.
*/
-public class MarshallerContextTestImpl extends MarshallerContextAdapter {
+public class MarshallerContextTestImpl extends MarshallerContextImpl {
/** */
private static final ConcurrentMap<Integer, String> map = new ConcurrentHashMap8<>();
@@ -64,29 +64,33 @@ public class MarshallerContextTestImpl extends MarshallerContextAdapter {
this(null);
}
+ /**
+ * @return Internal map.
+ */
+ public ConcurrentMap<Integer, String> internalMap() {
+ return map;
+ }
+
/** {@inheritDoc} */
- @Override protected boolean registerClassName(int id, String clsName) throws IgniteCheckedException {
+ @Override public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException {
if (excluded != null && excluded.contains(clsName))
return false;
- String oldClsName = map.putIfAbsent(id, clsName);
+ String oldClsName = map.putIfAbsent(typeId, clsName);
if (oldClsName != null && !oldClsName.equals(clsName))
- throw new IgniteCheckedException("Duplicate ID [id=" + id + ", oldClsName=" + oldClsName + ", clsName=" +
- clsName + ']');
+ throw new IgniteCheckedException("Duplicate ID [id=" + typeId + ", oldClsName=" + oldClsName + ", clsName=" +
+ clsName + ']');
return true;
}
/** {@inheritDoc} */
- @Override protected String className(int id) {
- return map.get(id);
- }
-
- /**
- * @return Internal map.
- */
- public ConcurrentMap<Integer, String> internalMap() {
- return map;
+ @Override public String getClassName(
+ byte platformId,
+ int typeId
+ ) throws ClassNotFoundException, IgniteCheckedException {
+ String clsName = map.get(typeId);
+ return (clsName == null) ? super.getClassName(platformId, typeId) : clsName;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerEnumSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerEnumSelfTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerEnumSelfTest.java
index e06bed9..c072170 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerEnumSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerEnumSelfTest.java
@@ -18,19 +18,33 @@
package org.apache.ignite.marshaller.optimized;
import junit.framework.TestCase;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.marshaller.MarshallerContextTestImpl;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
+import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
/**
*
*/
public class OptimizedMarshallerEnumSelfTest extends TestCase {
+
+ private String igniteHome = System.getProperty("user.dir");
+
+ private final IgniteLogger rootLog = new GridTestLog4jLogger(false);
/**
* @throws Exception If failed.
*/
public void testEnumSerialisation() throws Exception {
OptimizedMarshaller marsh = new OptimizedMarshaller();
- marsh.setContext(new MarshallerContextTestImpl());
+ MarshallerContextTestImpl context = new MarshallerContextTestImpl();
+
+ context.onMarshallerProcessorStarted(newContext(), null);
+
+ marsh.setContext(context);
byte[] bytes = marsh.marshal(TestEnum.Bond);
@@ -40,6 +54,15 @@ public class OptimizedMarshallerEnumSelfTest extends TestCase {
assertEquals(TestEnum.Bond.desc, unmarshalled.desc);
}
+ /**
+ * Creates the kernal context handed to the marshaller context under test.
+ * The configuration uses {@code igniteHome} as the Ignite home and has client mode switched off.
+ *
+ * @return Test kernal context built from that configuration and a logger obtained from {@code rootLog}.
+ * @throws IgniteCheckedException Declared by the signature; presumably raised if the context cannot be created.
+ */
+ private GridKernalContext newContext() throws IgniteCheckedException {
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setIgniteHome(igniteHome);
+ cfg.setClientMode(false);
+
+ return new GridTestKernalContext(rootLog.getLogger(OptimizedMarshallerEnumSelfTest.class), cfg);
+ }
+
private enum TestEnum {
Equity("Equity") {
@Override public String getTestString() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
index 1341e25..137eda2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
@@ -23,7 +23,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -404,11 +403,11 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
});
spi.setDataExchange(new DiscoverySpiDataExchange() {
- @Override public Map<Integer, Serializable> collect(UUID nodeId) {
- return new HashMap<>();
+ @Override public DiscoveryDataBag collect(DiscoveryDataBag dataBag) {
+ return dataBag;
}
- @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) {
+ @Override public void onExchange(DiscoveryDataBag dataBag) {
// No-op.
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 043208c..1ec8ee8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -68,7 +68,10 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
@@ -94,6 +97,8 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_TASK_FAILED;
import static org.apache.ignite.events.EventType.EVT_TASK_FINISHED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.MARSHALLER_PROC;
+import static org.apache.ignite.internal.MarshallerPlatformIds.JAVA_ID;
import static org.apache.ignite.spi.IgnitePortProtocol.UDP;
/**
@@ -1972,6 +1977,39 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
+ * Verifies that the marshaller data Ignite nodes exchange during the discovery phase contains
+ * only user types and no system types.
+ * <p>
+ * A new user class is put into a cache before each additional node is started, and the number of Java
+ * mappings recorded by {@link TestTcpDiscoveryMarshallerDataSpi} is expected to be 1 and then 2.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSystemMarshallerTypesFilteredOut() throws Exception {
+ // NOTE(review): TestTcpDiscoveryMarshallerDataSpi.marshalledItems is static and is not reset here;
+ // confirm that no earlier run in the same JVM can leave a stale value behind.
+ try {
+ nodeSpi.set(new TestTcpDiscoveryMarshallerDataSpi());
+
+ Ignite srv1 = startGrid(0);
+
+ IgniteCache<Object, Object> organizations = srv1.createCache("organizations");
+
+ organizations.put(1, new Organization());
+
+ startGrid(1);
+
+ // Organization is the only user class stored so far, so a single mapping is expected.
+ assertEquals("Expected items in marshaller discovery data: 1, actual: "
+ + TestTcpDiscoveryMarshallerDataSpi.marshalledItems,
+ 1, TestTcpDiscoveryMarshallerDataSpi.marshalledItems);
+
+ IgniteCache<Object, Object> employees = srv1.createCache("employees");
+
+ employees.put(1, new Employee());
+
+ startGrid(2);
+
+ // Employee has been stored as well, so the expected number of mappings grows to two.
+ assertEquals("Expected items in marshaller discovery data: 2, actual: "
+ + TestTcpDiscoveryMarshallerDataSpi.marshalledItems,
+ 2, TestTcpDiscoveryMarshallerDataSpi.marshalledItems);
+ } finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testDuplicatedDiscoveryDataRemoved() throws Exception {
@@ -2084,6 +2122,51 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
/**
+ * SPI used in {@link #testSystemMarshallerTypesFilteredOut()} test to check that only
+ * user types get to discovery messages on joining new nodes.
+ * <p>
+ * It wraps the data exchange passed to {@code setDataExchange} and, whenever the collected common data
+ * has an entry for {@code MARSHALLER_PROC}, records how many Java mappings that entry holds.
+ */
+ private static class TestTcpDiscoveryMarshallerDataSpi extends TcpDiscoverySpi {
+ /** Number of Java mappings found in the most recently collected marshaller discovery data. */
+ static volatile int marshalledItems;
+
+ /** {@inheritDoc} */
+ @Override public TcpDiscoverySpi setDataExchange(final DiscoverySpiDataExchange exchange) {
+ return super.setDataExchange(new DiscoverySpiDataExchange() {
+ @Override public DiscoveryDataBag collect(DiscoveryDataBag dataBag) {
+ DiscoveryDataBag bag = exchange.collect(dataBag);
+
+ // NOTE(review): the key is looked up in the bag returned by the delegate, while the mappings
+ // are read from the argument bag; this is equivalent only if the delegate returns the same
+ // instance - confirm.
+ if (bag.commonData().containsKey(MARSHALLER_PROC.ordinal()))
+ marshalledItems = getJavaMappings(getAllMappings(dataBag)).size();
+
+ return bag;
+ }
+
+ @Override public void onExchange(DiscoveryDataBag dataBag) {
+ // Pure delegation: received data is passed on unchanged.
+ exchange.onExchange(dataBag);
+ }
+
+ // Returns the common-data entry stored under the MARSHALLER_PROC ordinal, cast to a raw List.
+ // NOTE(review): raw type and unchecked cast; the element type is not visible here.
+ private List getAllMappings(DiscoveryDataBag bag) {
+ return (List) bag.commonData().get(MARSHALLER_PROC.ordinal());
+ }
+
+ // Picks the list element at index JAVA_ID and casts it to a raw Map.
+ private Map getJavaMappings(List allMappings) {
+ return (Map) allMappings.get(JAVA_ID);
+ }
+ });
+ }
+ }
+
+ /**
+ * User class stored in a cache by {@link #testSystemMarshallerTypesFilteredOut()}; it is the first
+ * user type the test expects to find in the marshaller discovery data.
+ */
+ private static class Organization { }
+
+ /**
+ * User class stored in a cache by {@link #testSystemMarshallerTypesFilteredOut()}; it is the second
+ * user type the test expects to find in the marshaller discovery data.
+ */
+ private static class Employee { }
+
+ /**
*
*/
private static class TestDiscoveryDataDuplicateSpi extends TcpDiscoverySpi {
@@ -2101,14 +2184,22 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (msg instanceof TcpDiscoveryNodeAddedMessage) {
- Map<UUID, Map<Integer, byte[]>> discoData = ((TcpDiscoveryNodeAddedMessage)msg).oldNodesDiscoveryData();
+ DiscoveryDataPacket dataPacket = ((TcpDiscoveryNodeAddedMessage)msg).gridDiscoveryData();
- checkDiscoData(discoData, msg);
+ if (dataPacket != null) {
+ Map<UUID, Map<Integer, byte[]>> discoData = U.field(dataPacket, "nodeSpecificData");
+
+ checkDiscoData(discoData, msg);
+ }
}
else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
- Map<UUID, Map<Integer, byte[]>> discoData = ((TcpDiscoveryNodeAddFinishedMessage)msg).clientDiscoData();
+ DiscoveryDataPacket dataPacket = ((TcpDiscoveryNodeAddFinishedMessage)msg).clientDiscoData();
+
+ if (dataPacket != null) {
+ Map<UUID, Map<Integer, byte[]>> discoData = U.field(dataPacket, "nodeSpecificData");
- checkDiscoData(discoData, msg);
+ checkDiscoData(discoData, msg);
+ }
}
super.writeToSocket(sock, out, msg, timeout);
@@ -2122,9 +2213,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
if (discoData != null && discoData.size() > 1) {
int cnt = 0;
- for (Map.Entry<UUID, Map<Integer, byte[]>> e : discoData.entrySet()) {
- Map<Integer, byte[]> map = e.getValue();
-
+ for (Map<Integer, byte[]> map : discoData.values()) {
if (map.containsKey(GridComponent.DiscoveryDataExchangeType.CACHE_PROC.ordinal()))
cnt++;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java
index 2a77e76..1bcc1cc 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiStartStopSelfTest.java
@@ -17,11 +17,8 @@
package org.apache.ignite.spi.discovery.tcp;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
import org.apache.ignite.spi.GridSpiStartStopAbstractTest;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -47,11 +44,11 @@ public class TcpDiscoverySpiStartStopSelfTest extends GridSpiStartStopAbstractTe
@GridSpiTestConfig
public DiscoverySpiDataExchange getDataExchange() {
return new DiscoverySpiDataExchange() {
- @Override public Map<Integer, Serializable> collect(UUID nodeId) {
- return Collections.emptyMap();
+ @Override public DiscoveryDataBag collect(DiscoveryDataBag dataBag) {
+ return dataBag;
}
- @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) {
+ @Override public void onExchange(DiscoveryDataBag dataBag) {
// No-op.
}
};
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 30c7244..f00298f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -79,6 +79,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -460,7 +461,17 @@ public abstract class GridAbstractTest extends TestCase {
* @return Test kernal context.
*/
protected GridTestKernalContext newContext() throws IgniteCheckedException {
- return new GridTestKernalContext(log());
+ // The context is now built from an explicit configuration with client mode switched off.
+ IgniteConfiguration cfg = new IgniteConfiguration();
+
+ cfg.setClientMode(false);
+ // Discovery SPI stub: custom events are dropped instead of being sent.
+ cfg.setDiscoverySpi(new TcpDiscoverySpi() {
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+ // No-op.
+ }
+ });
+
+ GridTestKernalContext ctx = new GridTestKernalContext(log(), cfg);
+ return ctx;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index 40f0e43..db45e27 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -64,7 +64,6 @@ public class GridTestKernalContext extends GridKernalContextImpl {
null,
null,
null,
- null,
U.allPluginProviders()
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
index 20b3cf2..c74117c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
@@ -39,6 +39,7 @@ import org.apache.ignite.plugin.security.SecurityPermissionSet;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
@@ -220,11 +221,13 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr
discoSpi.setMetricsProvider(createMetricsProvider());
discoSpi.setDataExchange(new DiscoverySpiDataExchange() {
- @Override public Map<Integer, Serializable> collect(UUID nodeId) {
- return new HashMap<>();
+
+ @Override public DiscoveryDataBag collect(DiscoveryDataBag dataBag) {
+ return dataBag;
}
- @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) {
+ @Override public void onExchange(DiscoveryDataBag dataBag) {
+ // No-op.
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 9e20d2a..d7a4a69 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -46,8 +46,8 @@ import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTes
import org.apache.ignite.internal.processors.closure.GridClosureSerializationTest;
import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest;
import org.apache.ignite.internal.processors.continuous.GridMessageListenSelfTest;
-import org.apache.ignite.internal.processors.odbc.OdbcProcessorValidationSelfTest;
import org.apache.ignite.internal.processors.odbc.OdbcEscapeSequenceSelfTest;
+import org.apache.ignite.internal.processors.odbc.OdbcProcessorValidationSelfTest;
import org.apache.ignite.internal.processors.service.ClosureServiceClientsNodesTest;
import org.apache.ignite.internal.product.GridProductVersionSelfTest;
import org.apache.ignite.internal.util.nio.IgniteExceptionInNioWorkerSelfTest;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
index 2e3d8b5..438132c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
@@ -30,6 +30,8 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingNe
import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingRebalanceErrorTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingTxErrorTest;
import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCacheTest;
+import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClassNameConflictTest;
+import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClientRequestsMappingOnMissTest;
import org.apache.ignite.internal.util.GridHandleTableSelfTest;
import org.apache.ignite.internal.util.IgniteUtilsSelfTest;
import org.apache.ignite.internal.util.io.GridUnsafeDataOutputArraySizingSelfTest;
@@ -94,6 +96,9 @@ public class IgniteBinaryBasicTestSuite extends TestSuite {
suite.addTest(IgniteBasicTestSuite.suite(ignoredTests));
+ suite.addTestSuite(IgniteMarshallerCacheClassNameConflictTest.class);
+ suite.addTestSuite(IgniteMarshallerCacheClientRequestsMappingOnMissTest.class);
+
return suite;
}
}