You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/02 18:22:02 UTC
[14/29] ignite git commit: IGNITE-950 - Renaming.
http://git-wip-us.apache.org/repos/asf/ignite/blob/20f5b9cd/modules/core/src/test/java/org/apache/ignite/internal/portable/mutabletest/GridIgniteObjectMarshalerAwareTestClass.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/portable/mutabletest/GridIgniteObjectMarshalerAwareTestClass.java b/modules/core/src/test/java/org/apache/ignite/internal/portable/mutabletest/GridIgniteObjectMarshalerAwareTestClass.java
deleted file mode 100644
index 9fe4242..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/portable/mutabletest/GridIgniteObjectMarshalerAwareTestClass.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.mutabletest;
-
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.igniteobject.IgniteObjectException;
-import org.apache.ignite.igniteobject.IgniteObjectMarshalAware;
-import org.apache.ignite.igniteobject.IgniteObjectRawReader;
-import org.apache.ignite.igniteobject.IgniteObjectRawWriter;
-import org.apache.ignite.igniteobject.IgniteObjectReader;
-import org.apache.ignite.igniteobject.IgniteObjectWriter;
-import org.apache.ignite.testframework.GridTestUtils;
-
-/**
- *
- */
-public class GridIgniteObjectMarshalerAwareTestClass implements IgniteObjectMarshalAware {
- /** */
- public String s;
-
- /** */
- public String sRaw;
-
- /** {@inheritDoc} */
- @Override public void writePortable(IgniteObjectWriter writer) throws IgniteObjectException {
- writer.writeString("s", s);
-
- IgniteObjectRawWriter raw = writer.rawWriter();
-
- raw.writeString(sRaw);
- }
-
- /** {@inheritDoc} */
- @Override public void readPortable(IgniteObjectReader reader) throws IgniteObjectException {
- s = reader.readString("s");
-
- IgniteObjectRawReader raw = reader.rawReader();
-
- sRaw = raw.readString();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("FloatingPointEquality")
- @Override public boolean equals(Object other) {
- return this == other || GridTestUtils.deepEquals(this, other);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridIgniteObjectMarshalerAwareTestClass.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/20f5b9cd/modules/core/src/test/java/org/apache/ignite/internal/portable/mutabletest/GridPortableTestClasses.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/portable/mutabletest/GridPortableTestClasses.java b/modules/core/src/test/java/org/apache/ignite/internal/portable/mutabletest/GridPortableTestClasses.java
index af9be0b..69687ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/portable/mutabletest/GridPortableTestClasses.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/portable/mutabletest/GridPortableTestClasses.java
@@ -32,7 +32,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.ignite.internal.util.lang.GridMapEntry;
-import org.apache.ignite.igniteobject.IgniteObject;
+import org.apache.ignite.binary.BinaryObject;
/**
*
@@ -106,7 +106,7 @@ public class GridPortableTestClasses {
*/
public static class TestObjectPlainPortable {
/** */
- public IgniteObject plainPortable;
+ public BinaryObject plainPortable;
/**
*
@@ -118,7 +118,7 @@ public class GridPortableTestClasses {
/**
* @param plainPortable Object.
*/
- public TestObjectPlainPortable(IgniteObject plainPortable) {
+ public TestObjectPlainPortable(BinaryObject plainPortable) {
this.plainPortable = plainPortable;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20f5b9cd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractDataStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractDataStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractDataStreamerSelfTest.java
new file mode 100644
index 0000000..c1144b6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractDataStreamerSelfTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.marshaller.portable.PortableMarshaller;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.LongAdder8;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+
+/**
+ * Test for portable objects stored in cache.
+ */
+public abstract class GridCacheBinaryObjectsAbstractDataStreamerSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final int THREAD_CNT = 64;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration cacheCfg = new CacheConfiguration();
+
+ cacheCfg.setCacheMode(cacheMode());
+ cacheCfg.setAtomicityMode(atomicityMode());
+ cacheCfg.setNearConfiguration(nearConfiguration());
+ cacheCfg.setWriteSynchronizationMode(writeSynchronizationMode());
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ PortableMarshaller marsh = new PortableMarshaller();
+
+ marsh.setTypeConfigurations(Arrays.asList(
+ new BinaryTypeConfiguration(TestObject.class.getName())));
+
+ cfg.setMarshaller(marsh);
+
+ return cfg;
+ }
+
+ /**
+ * @return Sync mode.
+ */
+ protected CacheWriteSynchronizationMode writeSynchronizationMode() {
+ return PRIMARY_SYNC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(gridCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @return Cache mode.
+ */
+ protected abstract CacheMode cacheMode();
+
+ /**
+ * @return Atomicity mode.
+ */
+ protected abstract CacheAtomicityMode atomicityMode();
+
+ /**
+ * @return Near configuration.
+ */
+ protected abstract NearCacheConfiguration nearConfiguration();
+
+ /**
+ * @return Grid count.
+ */
+ protected int gridCount() {
+ return 1;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("BusyWait")
+ public void testGetPut() throws Exception {
+ final AtomicBoolean flag = new AtomicBoolean();
+
+ final LongAdder8 cnt = new LongAdder8();
+
+ try (IgniteDataStreamer<Object, Object> ldr = grid(0).dataStreamer(null)) {
+ IgniteInternalFuture<?> f = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!flag.get()) {
+ ldr.addData(rnd.nextInt(10000), new TestObject(rnd.nextInt(10000)));
+
+ cnt.add(1);
+ }
+
+ return null;
+ }
+ },
+ THREAD_CNT
+ );
+
+ for (int i = 0; i < 30 && !f.isDone(); i++)
+ Thread.sleep(1000);
+
+ flag.set(true);
+
+ f.get();
+ }
+
+ info("Operations in 30 sec: " + cnt.sum());
+ }
+
+ /**
+ */
+ private static class TestObject implements Binarylizable, Serializable {
+ /** */
+ private int val;
+
+ /**
+ */
+ private TestObject() {
+ // No-op.
+ }
+
+ /**
+ * @param val Value.
+ */
+ private TestObject(int val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return obj instanceof TestObject && ((TestObject)obj).val == val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writer.writeInt("val", val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ val = reader.readInt("val");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/20f5b9cd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractMultiThreadedSelfTest.java
new file mode 100644
index 0000000..b626093
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractMultiThreadedSelfTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.marshaller.portable.PortableMarshaller;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.LongAdder8;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+
+/**
+ * Test for portable objects stored in cache.
+ */
+public abstract class GridCacheBinaryObjectsAbstractMultiThreadedSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final int THREAD_CNT = 64;
+
+ /** */
+ private static final AtomicInteger idxGen = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration cacheCfg = new CacheConfiguration();
+
+ cacheCfg.setCacheMode(cacheMode());
+ cacheCfg.setAtomicityMode(atomicityMode());
+ cacheCfg.setNearConfiguration(nearConfiguration());
+ cacheCfg.setWriteSynchronizationMode(writeSynchronizationMode());
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ PortableMarshaller marsh = new PortableMarshaller();
+
+ marsh.setTypeConfigurations(Arrays.asList(
+ new BinaryTypeConfiguration(TestObject.class.getName())));
+
+ cfg.setMarshaller(marsh);
+
+ return cfg;
+ }
+
+ /**
+ * @return Sync mode.
+ */
+ protected CacheWriteSynchronizationMode writeSynchronizationMode() {
+ return PRIMARY_SYNC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(gridCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @return Cache mode.
+ */
+ protected abstract CacheMode cacheMode();
+
+ /**
+ * @return Atomicity mode.
+ */
+ protected abstract CacheAtomicityMode atomicityMode();
+
+ /**
+ * @return Distribution mode.
+ */
+ protected abstract NearCacheConfiguration nearConfiguration();
+
+ /**
+ * @return Grid count.
+ */
+ protected int gridCount() {
+ return 1;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("BusyWait") public void testGetPut() throws Exception {
+ final AtomicBoolean flag = new AtomicBoolean();
+
+ final LongAdder8 cnt = new LongAdder8();
+
+ IgniteInternalFuture<?> f = multithreadedAsync(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int threadId = idxGen.getAndIncrement() % 2;
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!flag.get()) {
+ IgniteCache<Object, Object> c = jcache(rnd.nextInt(gridCount()));
+
+ switch (threadId) {
+ case 0:
+ // Put/get/remove portable -> portable.
+
+ c.put(new TestObject(rnd.nextInt(10000)), new TestObject(rnd.nextInt(10000)));
+
+ IgniteCache<Object, Object> p2 = ((IgniteCacheProxy<Object, Object>)c).keepPortable();
+
+ BinaryObject v = (BinaryObject)p2.get(new TestObject(rnd.nextInt(10000)));
+
+ if (v != null)
+ v.deserialize();
+
+ c.remove(new TestObject(rnd.nextInt(10000)));
+
+ break;
+
+ case 1:
+ // Put/get int -> portable.
+ c.put(rnd.nextInt(10000), new TestObject(rnd.nextInt(10000)));
+
+ IgniteCache<Integer, BinaryObject> p4 = ((IgniteCacheProxy<Object, Object>)c).keepPortable();
+
+ BinaryObject v1 = p4.get(rnd.nextInt(10000));
+
+ if (v1 != null)
+ v1.deserialize();
+
+ p4.remove(rnd.nextInt(10000));
+
+ break;
+
+ default:
+ assert false;
+ }
+
+ cnt.add(3);
+ }
+
+ return null;
+ }
+ },
+ THREAD_CNT
+ );
+
+ for (int i = 0; i < 30 && !f.isDone(); i++)
+ Thread.sleep(1000);
+
+ flag.set(true);
+
+ f.get();
+
+ info("Operations in 30 sec: " + cnt.sum());
+ }
+
+ /**
+ */
+ private static class TestObject implements Binarylizable, Serializable {
+ /** */
+ private int val;
+
+ /**
+ */
+ private TestObject() {
+ // No-op.
+ }
+
+ /**
+ * @param val Value.
+ */
+ private TestObject(int val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return obj instanceof TestObject && ((TestObject)obj).val == val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writer.writeInt("val", val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ val = reader.readInt("val");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/20f5b9cd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractSelfTest.java
new file mode 100644
index 0000000..c25840b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheBinaryObjectsAbstractSelfTest.java
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.Cache;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteObjects;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.portable.BinaryObjectImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.util.typedef.P2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.marshaller.portable.PortableMarshaller;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Test for portable objects stored in cache.
+ */
+public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonAbstractTest {
+ /** */
+ public static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int ENTRY_CNT = 100;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ CacheConfiguration cacheCfg = new CacheConfiguration();
+
+ cacheCfg.setCacheMode(cacheMode());
+ cacheCfg.setAtomicityMode(atomicityMode());
+ cacheCfg.setNearConfiguration(nearConfiguration());
+ cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+ cacheCfg.setCacheStoreFactory(singletonFactory(new TestStore()));
+ cacheCfg.setReadThrough(true);
+ cacheCfg.setWriteThrough(true);
+ cacheCfg.setLoadPreviousValue(true);
+ cacheCfg.setBackups(1);
+
+ if (offheapTiered()) {
+ cacheCfg.setMemoryMode(OFFHEAP_TIERED);
+ cacheCfg.setOffHeapMaxMemory(0);
+ }
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ cfg.setMarshaller(new PortableMarshaller());
+
+ return cfg;
+ }
+
+ /**
+ * @return {@code True} if should use OFFHEAP_TIERED mode.
+ */
+ protected boolean offheapTiered() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGridsMultiThreaded(gridCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ for (int i = 0; i < gridCount(); i++) {
+ GridCacheAdapter<Object, Object> c = ((IgniteKernal)grid(i)).internalCache();
+
+ for (GridCacheEntryEx e : c.map().entries0()) {
+ Object key = e.key().value(c.context().cacheObjectContext(), false);
+ Object val = CU.value(e.rawGet(), c.context(), false);
+
+ if (key instanceof BinaryObject)
+ assert ((BinaryObjectImpl)key).detached() : val;
+
+ if (val instanceof BinaryObject)
+ assert ((BinaryObjectImpl)val).detached() : val;
+ }
+ }
+
+ IgniteCache<Object, Object> c = jcache(0);
+
+ for (int i = 0; i < ENTRY_CNT; i++)
+ c.remove(i);
+
+ if (offheapTiered()) {
+ for (int k = 0; k < 100; k++)
+ c.remove(k);
+ }
+
+ assertEquals(0, c.size());
+ }
+
+ /**
+ * @return Cache mode.
+ */
+ protected abstract CacheMode cacheMode();
+
+ /**
+ * @return Atomicity mode.
+ */
+ protected abstract CacheAtomicityMode atomicityMode();
+
+ /**
+ * @return Distribution mode.
+ */
+ protected abstract NearCacheConfiguration nearConfiguration();
+
+ /**
+ * @return Grid count.
+ */
+ protected abstract int gridCount();
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public void testCircularReference() throws Exception {
+ IgniteCache c = keepPortableCache();
+
+ TestReferenceObject obj1 = new TestReferenceObject();
+
+ obj1.obj = new TestReferenceObject(obj1);
+
+ c.put(1, obj1);
+
+ BinaryObject po = (BinaryObject)c.get(1);
+
+ String str = po.toString();
+
+ log.info("toString: " + str);
+
+ assertNotNull(str);
+
+ assertTrue("Unexpected toString: " + str,
+ str.startsWith("TestReferenceObject") && str.contains("obj=TestReferenceObject ["));
+
+ TestReferenceObject obj1_r = po.deserialize();
+
+ assertNotNull(obj1_r);
+
+ TestReferenceObject obj2_r = obj1_r.obj;
+
+ assertNotNull(obj2_r);
+
+ assertSame(obj1_r, obj2_r.obj);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGet() throws Exception {
+ IgniteCache<Integer, TestObject> c = jcache(0);
+
+ for (int i = 0; i < ENTRY_CNT; i++)
+ c.put(i, new TestObject(i));
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ TestObject obj = c.get(i);
+
+ assertEquals(i, obj.val);
+ }
+
+ IgniteCache<Integer, BinaryObject> kpc = keepPortableCache();
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ BinaryObject po = kpc.get(i);
+
+ assertEquals(i, (int)po.field("val"));
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testIterator() throws Exception {
+ IgniteCache<Integer, TestObject> c = jcache(0);
+
+ Map<Integer, TestObject> entries = new HashMap<>();
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ TestObject val = new TestObject(i);
+
+ c.put(i, val);
+
+ entries.put(i, val);
+ }
+
+ IgniteCache<Integer, BinaryObject> prj = ((IgniteCacheProxy)c).keepPortable();
+
+ Iterator<Cache.Entry<Integer, BinaryObject>> it = prj.iterator();
+
+ assertTrue(it.hasNext());
+
+ while (it.hasNext()) {
+ Cache.Entry<Integer, BinaryObject> entry = it.next();
+
+ assertTrue(entries.containsKey(entry.getKey()));
+
+ TestObject o = entries.get(entry.getKey());
+
+ BinaryObject po = entry.getValue();
+
+ assertEquals(o.val, (int)po.field("val"));
+
+ entries.remove(entry.getKey());
+ }
+
+ assertEquals(0, entries.size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCollection() throws Exception {
+ IgniteCache<Integer, Collection<TestObject>> c = jcache(0);
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ Collection<TestObject> col = new ArrayList<>(3);
+
+ for (int j = 0; j < 3; j++)
+ col.add(new TestObject(i * 10 + j));
+
+ c.put(i, col);
+ }
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ Collection<TestObject> col = c.get(i);
+
+ assertEquals(3, col.size());
+
+ Iterator<TestObject> it = col.iterator();
+
+ for (int j = 0; j < 3; j++) {
+ assertTrue(it.hasNext());
+
+ assertEquals(i * 10 + j, it.next().val);
+ }
+ }
+
+ IgniteCache<Integer, Collection<BinaryObject>> kpc = keepPortableCache();
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ Collection<BinaryObject> col = kpc.get(i);
+
+ assertEquals(3, col.size());
+
+ Iterator<BinaryObject> it = col.iterator();
+
+ for (int j = 0; j < 3; j++) {
+ assertTrue(it.hasNext());
+
+ assertEquals(i * 10 + j, (int)it.next().field("val"));
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMap() throws Exception {
+ IgniteCache<Integer, Map<Integer, TestObject>> c = jcache(0);
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ Map<Integer, TestObject> map = U.newHashMap(3);
+
+ for (int j = 0; j < 3; j++) {
+ int idx = i * 10 + j;
+
+ map.put(idx, new TestObject(idx));
+ }
+
+ c.put(i, map);
+ }
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ Map<Integer, TestObject> map = c.get(i);
+
+ assertEquals(3, map.size());
+
+ for (int j = 0; j < 3; j++) {
+ int idx = i * 10 + j;
+
+ assertEquals(idx, map.get(idx).val);
+ }
+ }
+
+ IgniteCache<Integer, Map<Integer, BinaryObject>> kpc = keepPortableCache();
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ Map<Integer, BinaryObject> map = kpc.get(i);
+
+ assertEquals(3, map.size());
+
+ for (int j = 0; j < 3; j++) {
+ int idx = i * 10 + j;
+
+ assertEquals(idx, (int)map.get(idx).field("val"));
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetAsync() throws Exception {
+ IgniteCache<Integer, TestObject> c = jcache(0);
+
+ IgniteCache<Integer, TestObject> cacheAsync = c.withAsync();
+
+ for (int i = 0; i < ENTRY_CNT; i++)
+ c.put(i, new TestObject(i));
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ cacheAsync.get(i);
+
+ TestObject obj = cacheAsync.<TestObject>future().get();
+
+ assertNotNull(obj);
+
+ assertEquals(i, obj.val);
+ }
+
+ IgniteCache<Integer, BinaryObject> kpc = keepPortableCache();
+
+ IgniteCache<Integer, BinaryObject> cachePortableAsync = kpc.withAsync();
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ cachePortableAsync.get(i);
+
+ BinaryObject po = cachePortableAsync.<BinaryObject>future().get();
+
+ assertEquals(i, (int)po.field("val"));
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetTx() throws Exception {
+ if (atomicityMode() != TRANSACTIONAL)
+ return;
+
+ IgniteCache<Integer, TestObject> c = jcache(0);
+
+ for (int i = 0; i < ENTRY_CNT; i++)
+ c.put(i, new TestObject(i));
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ TestObject obj = c.get(i);
+
+ assertEquals(i, obj.val);
+
+ tx.commit();
+ }
+ }
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ TestObject obj = c.get(i);
+
+ assertEquals(i, obj.val);
+
+ tx.commit();
+ }
+ }
+
+ IgniteCache<Integer, BinaryObject> kpc = keepPortableCache();
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ BinaryObject po = kpc.get(i);
+
+ assertEquals(i, (int)po.field("val"));
+
+ tx.commit();
+ }
+ }
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ BinaryObject po = kpc.get(i);
+
+ assertEquals(i, (int)po.field("val"));
+
+ tx.commit();
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetAsyncTx() throws Exception {
+ if (atomicityMode() != TRANSACTIONAL)
+ return;
+
+ IgniteCache<Integer, TestObject> c = jcache(0);
+
+ IgniteCache<Integer, TestObject> cacheAsync = c.withAsync();
+
+ for (int i = 0; i < ENTRY_CNT; i++)
+ c.put(i, new TestObject(i));
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cacheAsync.get(i);
+
+ TestObject obj = cacheAsync.<TestObject>future().get();
+
+ assertEquals(i, obj.val);
+
+ tx.commit();
+ }
+ }
+
+ IgniteCache<Integer, BinaryObject> kpc = keepPortableCache();
+ IgniteCache<Integer, BinaryObject> cachePortableAsync = kpc.withAsync();
+
+ for (int i = 0; i < ENTRY_CNT; i++) {
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cachePortableAsync.get(i);
+
+ BinaryObject po = cachePortableAsync.<BinaryObject>future().get();
+
+ assertEquals(i, (int)po.field("val"));
+
+ tx.commit();
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetAll() throws Exception {
+ IgniteCache<Integer, TestObject> c = jcache(0);
+
+ for (int i = 0; i < ENTRY_CNT; i++)
+ c.put(i, new TestObject(i));
+
+ for (int i = 0; i < ENTRY_CNT; ) {
+ Set<Integer> keys = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ keys.add(i++);
+
+ Map<Integer, TestObject> objs = c.getAll(keys);
+
+ assertEquals(10, objs.size());
+
+ for (Map.Entry<Integer, TestObject> e : objs.entrySet())
+ assertEquals(e.getKey().intValue(), e.getValue().val);
+ }
+
+ IgniteCache<Integer, BinaryObject> kpc = keepPortableCache();
+
+ for (int i = 0; i < ENTRY_CNT; ) {
+ Set<Integer> keys = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ keys.add(i++);
+
+ Map<Integer, BinaryObject> objs = kpc.getAll(keys);
+
+ assertEquals(10, objs.size());
+
+ for (Map.Entry<Integer, BinaryObject> e : objs.entrySet())
+ assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val"));
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetAllAsync() throws Exception {
+ IgniteCache<Integer, TestObject> c = jcache(0);
+
+ IgniteCache<Integer, TestObject> cacheAsync = c.withAsync();
+
+ for (int i = 0; i < ENTRY_CNT; i++)
+ c.put(i, new TestObject(i));
+
+ for (int i = 0; i < ENTRY_CNT; ) {
+ Set<Integer> keys = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ keys.add(i++);
+
+ cacheAsync.getAll(keys);
+
+ Map<Integer, TestObject> objs = cacheAsync.<Map<Integer, TestObject>>future().get();
+
+ assertEquals(10, objs.size());
+
+ for (Map.Entry<Integer, TestObject> e : objs.entrySet())
+ assertEquals(e.getKey().intValue(), e.getValue().val);
+ }
+
+ IgniteCache<Integer, BinaryObject> kpc = keepPortableCache();
+ IgniteCache<Integer, BinaryObject> cachePortableAsync = kpc.withAsync();
+
+ for (int i = 0; i < ENTRY_CNT; ) {
+ Set<Integer> keys = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ keys.add(i++);
+
+
+ cachePortableAsync.getAll(keys);
+
+ Map<Integer, BinaryObject> objs = cachePortableAsync.<Map<Integer, BinaryObject>>future().get();
+
+ assertEquals(10, objs.size());
+
+ for (Map.Entry<Integer, BinaryObject> e : objs.entrySet())
+ assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val"));
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetAllTx() throws Exception {
+ if (atomicityMode() != TRANSACTIONAL)
+ return;
+
+ IgniteCache<Integer, TestObject> c = jcache(0);
+
+ for (int i = 0; i < ENTRY_CNT; i++)
+ c.put(i, new TestObject(i));
+
+ for (int i = 0; i < ENTRY_CNT; ) {
+ Set<Integer> keys = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ keys.add(i++);
+
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ Map<Integer, TestObject> objs = c.getAll(keys);
+
+ assertEquals(10, objs.size());
+
+ for (Map.Entry<Integer, TestObject> e : objs.entrySet())
+ assertEquals(e.getKey().intValue(), e.getValue().val);
+
+ tx.commit();
+ }
+
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ Map<Integer, TestObject> objs = c.getAll(keys);
+
+ assertEquals(10, objs.size());
+
+ for (Map.Entry<Integer, TestObject> e : objs.entrySet())
+ assertEquals(e.getKey().intValue(), e.getValue().val);
+
+ tx.commit();
+ }
+ }
+
+ IgniteCache<Integer, BinaryObject> kpc = keepPortableCache();
+
+ for (int i = 0; i < ENTRY_CNT; ) {
+ Set<Integer> keys = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ keys.add(i++);
+
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ Map<Integer, BinaryObject> objs = kpc.getAll(keys);
+
+ assertEquals(10, objs.size());
+
+ for (Map.Entry<Integer, BinaryObject> e : objs.entrySet())
+ assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val"));
+
+ tx.commit();
+ }
+
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+ Map<Integer, BinaryObject> objs = kpc.getAll(keys);
+
+ assertEquals(10, objs.size());
+
+ for (Map.Entry<Integer, BinaryObject> e : objs.entrySet())
+ assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val"));
+
+ tx.commit();
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetAllAsyncTx() throws Exception {
+ if (atomicityMode() != TRANSACTIONAL)
+ return;
+
+ IgniteCache<Integer, TestObject> c = jcache(0);
+ IgniteCache<Integer, TestObject> cacheAsync = c.withAsync();
+
+ for (int i = 0; i < ENTRY_CNT; i++)
+ c.put(i, new TestObject(i));
+
+ for (int i = 0; i < ENTRY_CNT; ) {
+ Set<Integer> keys = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ keys.add(i++);
+
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cacheAsync.getAll(keys);
+
+ Map<Integer, TestObject> objs = cacheAsync.<Map<Integer, TestObject>>future().get();
+
+ assertEquals(10, objs.size());
+
+ for (Map.Entry<Integer, TestObject> e : objs.entrySet())
+ assertEquals(e.getKey().intValue(), e.getValue().val);
+
+ tx.commit();
+ }
+ }
+
+ IgniteCache<Integer, BinaryObject> cache = keepPortableCache();
+
+ for (int i = 0; i < ENTRY_CNT; ) {
+ Set<Integer> keys = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ keys.add(i++);
+
+ IgniteCache<Integer, BinaryObject> asyncCache = cache.withAsync();
+
+ try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ asyncCache.getAll(keys);
+
+ Map<Integer, BinaryObject> objs = asyncCache.<Map<Integer, BinaryObject>>future().get();
+
+ assertEquals(10, objs.size());
+
+ for (Map.Entry<Integer, BinaryObject> e : objs.entrySet())
+ assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val"));
+
+ tx.commit();
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLoadCache() throws Exception {
+ for (int i = 0; i < gridCount(); i++)
+ jcache(i).localLoadCache(null);
+
+ IgniteCache<Integer, TestObject> cache = jcache(0);
+
+ assertEquals(3, cache.size(CachePeekMode.PRIMARY));
+
+ assertEquals(1, cache.get(1).val);
+ assertEquals(2, cache.get(2).val);
+ assertEquals(3, cache.get(3).val);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLoadCacheAsync() throws Exception {
+ for (int i = 0; i < gridCount(); i++) {
+ IgniteCache<Object, Object> jcache = jcache(i).withAsync();
+
+ jcache.loadCache(null);
+
+ jcache.future().get();
+ }
+
+ IgniteCache<Integer, TestObject> cache = jcache(0);
+
+ assertEquals(3, cache.size(CachePeekMode.PRIMARY));
+
+ assertEquals(1, cache.get(1).val);
+ assertEquals(2, cache.get(2).val);
+ assertEquals(3, cache.get(3).val);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLoadCacheFilteredAsync() throws Exception {
+ for (int i = 0; i < gridCount(); i++) {
+ IgniteCache<Integer, TestObject> c = this.<Integer, TestObject>jcache(i).withAsync();
+
+ c.loadCache(new P2<Integer, TestObject>() {
+ @Override public boolean apply(Integer key, TestObject val) {
+ return val.val < 3;
+ }
+ });
+
+ c.future().get();
+ }
+
+ IgniteCache<Integer, TestObject> cache = jcache(0);
+
+ assertEquals(2, cache.size(CachePeekMode.PRIMARY));
+
+ assertEquals(1, cache.get(1).val);
+ assertEquals(2, cache.get(2).val);
+
+ assertNull(cache.get(3));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTransform() throws Exception {
+ IgniteCache<Integer, BinaryObject> c = keepPortableCache();
+
+ checkTransform(primaryKey(c));
+
+ if (cacheMode() != CacheMode.LOCAL) {
+ checkTransform(backupKey(c));
+
+ if (nearConfiguration() != null)
+ checkTransform(nearKey(c));
+ }
+ }
+
+ /**
+ * @return Cache with keep portable flag.
+ */
+ private <K, V> IgniteCache<K, V> keepPortableCache() {
+ return ignite(0).cache(null).withKeepBinary();
+ }
+
+ /**
+ * @param key Key.
+ * @throws Exception If failed.
+ */
+ private void checkTransform(Integer key) throws Exception {
+ log.info("Transform: " + key);
+
+ IgniteCache<Integer, BinaryObject> c = keepPortableCache();
+
+ try {
+ c.invoke(key, new EntryProcessor<Integer, BinaryObject, Void>() {
+ @Override public Void process(MutableEntry<Integer, BinaryObject> e, Object... args) {
+ BinaryObject val = e.getValue();
+
+ assertNull("Unexpected value: " + val, val);
+
+ return null;
+ }
+ });
+
+ jcache(0).put(key, new TestObject(1));
+
+ c.invoke(key, new EntryProcessor<Integer, BinaryObject, Void>() {
+ @Override public Void process(MutableEntry<Integer, BinaryObject> e, Object... args) {
+ BinaryObject val = e.getValue();
+
+ assertNotNull("Unexpected value: " + val, val);
+
+ assertEquals(new Integer(1), val.field("val"));
+
+ Ignite ignite = e.unwrap(Ignite.class);
+
+ IgniteObjects portables = ignite.portables();
+
+ BinaryObjectBuilder builder = portables.builder(val);
+
+ builder.setField("val", 2);
+
+ e.setValue(builder.build());
+
+ return null;
+ }
+ });
+
+ BinaryObject obj = c.get(key);
+
+ assertEquals(new Integer(2), obj.field("val"));
+
+ c.invoke(key, new EntryProcessor<Integer, BinaryObject, Void>() {
+ @Override public Void process(MutableEntry<Integer, BinaryObject> e, Object... args) {
+ BinaryObject val = e.getValue();
+
+ assertNotNull("Unexpected value: " + val, val);
+
+ assertEquals(new Integer(2), val.field("val"));
+
+ e.setValue(val);
+
+ return null;
+ }
+ });
+
+ obj = c.get(key);
+
+ assertEquals(new Integer(2), obj.field("val"));
+
+ c.invoke(key, new EntryProcessor<Integer, BinaryObject, Void>() {
+ @Override public Void process(MutableEntry<Integer, BinaryObject> e, Object... args) {
+ BinaryObject val = e.getValue();
+
+ assertNotNull("Unexpected value: " + val, val);
+
+ assertEquals(new Integer(2), val.field("val"));
+
+ e.remove();
+
+ return null;
+ }
+ });
+
+ assertNull(c.get(key));
+ }
+ finally {
+ c.remove(key);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestObject implements Binarylizable {
+ /** */
+ private int val;
+
+ /**
+ */
+ private TestObject() {
+ // No-op.
+ }
+
+ /**
+ * @param val Value.
+ */
+ private TestObject(int val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writer.writeInt("val", val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ val = reader.readInt("val");
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestReferenceObject implements Binarylizable {
+ /** */
+ private TestReferenceObject obj;
+
+ /**
+ */
+ private TestReferenceObject() {
+ // No-op.
+ }
+
+ /**
+ * @param obj Object.
+ */
+ private TestReferenceObject(TestReferenceObject obj) {
+ this.obj = obj;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writer.writeObject("obj", obj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ obj = reader.readObject("obj");
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestStore extends CacheStoreAdapter<Integer, Object> {
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<Integer, Object> clo, Object... args) {
+ for (int i = 1; i <= 3; i++)
+ clo.apply(i, new TestObject(i));
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object load(Integer key) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends Integer, ?> e) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) {
+ // No-op.
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/20f5b9cd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeBinaryObjectMetadataMultinodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeBinaryObjectMetadataMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeBinaryObjectMetadataMultinodeTest.java
new file mode 100644
index 0000000..eddf066
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeBinaryObjectMetadataMultinodeTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteObjects;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.portable.PortableMarshaller;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryTypeMetadata;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class GridCacheClientNodeBinaryObjectMetadataMultinodeTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder).setForceServerMode(true);
+
+ cfg.setMarshaller(new PortableMarshaller());
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientMetadataInitialization() throws Exception {
+ startGrids(2);
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ final ConcurrentHashSet<String> allTypes = new ConcurrentHashSet<>();
+
+ IgniteInternalFuture<?> fut;
+
+ try {
+ // Update portable metadata concurrently with client nodes start.
+ fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ IgniteObjects portables = ignite(0).portables();
+
+ IgniteCache<Object, Object> cache = ignite(0).cache(null).withKeepBinary();
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ for (int i = 0; i < 1000; i++) {
+ log.info("Iteration: " + i);
+
+ String type = "portable-type-" + i;
+
+ allTypes.add(type);
+
+ for (int f = 0; f < 10; f++) {
+ BinaryObjectBuilder builder = portables.builder(type);
+
+ String fieldName = "f" + f;
+
+ builder.setField(fieldName, i);
+
+ cache.put(rnd.nextInt(0, 100_000), builder.build());
+
+ if (f % 100 == 0)
+ log.info("Put iteration: " + f);
+ }
+
+ if (stop.get())
+ break;
+ }
+
+ return null;
+ }
+ }, 5, "update-thread");
+ }
+ finally {
+ stop.set(true);
+ }
+
+ client = true;
+
+ startGridsMultiThreaded(2, 5);
+
+ fut.get();
+
+ assertFalse(allTypes.isEmpty());
+
+ log.info("Expected portable types: " + allTypes.size());
+
+ assertEquals(7, ignite(0).cluster().nodes().size());
+
+ for (int i = 0; i < 7; i++) {
+ log.info("Check metadata on node: " + i);
+
+ boolean client = i > 1;
+
+ assertEquals((Object)client, ignite(i).configuration().isClientMode());
+
+ IgniteObjects portables = ignite(i).portables();
+
+ Collection<BinaryTypeMetadata> metaCol = portables.metadata();
+
+ assertEquals(allTypes.size(), metaCol.size());
+
+ Set<String> names = new HashSet<>();
+
+ for (BinaryTypeMetadata meta : metaCol) {
+ assertTrue(names.add(meta.typeName()));
+
+ assertNull(meta.affinityKeyFieldName());
+
+ assertEquals(10, meta.fields().size());
+ }
+
+ assertEquals(allTypes.size(), names.size());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFailoverOnStart() throws Exception {
+ startGrids(4);
+
+ IgniteObjects portables = ignite(0).portables();
+
+ IgniteCache<Object, Object> cache = ignite(0).cache(null).withKeepBinary();
+
+ for (int i = 0; i < 1000; i++) {
+ BinaryObjectBuilder builder = portables.builder("type-" + i);
+
+ builder.setField("f0", i);
+
+ cache.put(i, builder.build());
+ }
+
+ client = true;
+
+ final CyclicBarrier barrier = new CyclicBarrier(6);
+
+ final AtomicInteger startIdx = new AtomicInteger(4);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ barrier.await();
+
+ Ignite ignite = startGrid(startIdx.getAndIncrement());
+
+ assertTrue(ignite.configuration().isClientMode());
+
+ log.info("Started node: " + ignite.name());
+
+ return null;
+ }
+ }, 5, "start-thread");
+
+ barrier.await();
+
+ U.sleep(ThreadLocalRandom.current().nextInt(10, 100));
+
+ for (int i = 0; i < 3; i++)
+ stopGrid(i);
+
+ fut.get();
+
+ assertEquals(6, ignite(3).cluster().nodes().size());
+
+ for (int i = 3; i < 7; i++) {
+ log.info("Check metadata on node: " + i);
+
+ boolean client = i > 3;
+
+ assertEquals((Object) client, ignite(i).configuration().isClientMode());
+
+ portables = ignite(i).portables();
+
+ final IgniteObjects p0 = portables;
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ Collection<BinaryTypeMetadata> metaCol = p0.metadata();
+
+ return metaCol.size() == 1000;
+ }
+ }, getTestTimeout());
+
+ Collection<BinaryTypeMetadata> metaCol = portables.metadata();
+
+ assertEquals(1000, metaCol.size());
+
+ Set<String> names = new HashSet<>();
+
+ for (BinaryTypeMetadata meta : metaCol) {
+ assertTrue(names.add(meta.typeName()));
+
+ assertNull(meta.affinityKeyFieldName());
+
+ assertEquals(1, meta.fields().size());
+ }
+
+ assertEquals(1000, names.size());
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientStartsFirst() throws Exception {
+ client = true;
+
+ Ignite ignite0 = startGrid(0);
+
+ assertTrue(ignite0.configuration().isClientMode());
+
+ client = false;
+
+ Ignite ignite1 = startGrid(1);
+
+ assertFalse(ignite1.configuration().isClientMode());
+
+ IgniteObjects portables = ignite(1).portables();
+
+ IgniteCache<Object, Object> cache = ignite(1).cache(null).withKeepBinary();
+
+ for (int i = 0; i < 100; i++) {
+ BinaryObjectBuilder builder = portables.builder("type-" + i);
+
+ builder.setField("f0", i);
+
+ cache.put(i, builder.build());
+ }
+
+ assertEquals(100, ignite(0).portables().metadata().size());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/20f5b9cd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeBinaryObjectMetadataTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeBinaryObjectMetadataTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeBinaryObjectMetadataTest.java
new file mode 100644
index 0000000..c2464db
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeBinaryObjectMetadataTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
+import org.apache.ignite.marshaller.portable.PortableMarshaller;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryTypeMetadata;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ *
+ */
+public class GridCacheClientNodeBinaryObjectMetadataTest extends GridCacheAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ PortableMarshaller marsh = new PortableMarshaller();
+
+ marsh.setClassNames(Arrays.asList(TestObject1.class.getName(), TestObject2.class.getName()));
+
+ BinaryTypeConfiguration typeCfg = new BinaryTypeConfiguration();
+
+ typeCfg.setClassName(TestObject1.class.getName());
+
+ CacheKeyConfiguration keyCfg = new CacheKeyConfiguration(TestObject1.class.getName(), "val2");
+
+ cfg.setCacheKeyCfg(keyCfg);
+
+ marsh.setTypeConfigurations(Arrays.asList(typeCfg));
+
+ if (gridName.equals(getTestGridName(gridCount() - 1)))
+ cfg.setClientMode(true);
+
+ cfg.setMarshaller(marsh);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPortableMetadataOnClient() throws Exception {
+ Ignite ignite0 = ignite(gridCount() - 1);
+
+ assertTrue(ignite0.configuration().isClientMode());
+
+ Ignite ignite1 = ignite(0);
+
+ assertFalse(ignite1.configuration().isClientMode());
+
+ Affinity<Object> aff0 = ignite0.affinity(null);
+ Affinity<Object> aff1 = ignite1.affinity(null);
+
+ for (int i = 0 ; i < 100; i++) {
+ TestObject1 obj1 = new TestObject1(i, i + 1);
+
+ assertEquals(aff1.mapKeyToPrimaryAndBackups(obj1),
+ aff0.mapKeyToPrimaryAndBackups(obj1));
+
+ TestObject2 obj2 = new TestObject2(i, i + 1);
+
+ assertEquals(aff1.mapKeyToPrimaryAndBackups(obj2),
+ aff0.mapKeyToPrimaryAndBackups(obj2));
+ }
+
+ {
+ BinaryObjectBuilder builder = ignite0.portables().builder("TestObject3");
+
+ builder.setField("f1", 1);
+
+ ignite0.cache(null).put(0, builder.build());
+
+ IgniteCache<Integer, BinaryObject> cache = ignite0.cache(null).withKeepBinary();
+
+ BinaryObject obj = cache.get(0);
+
+ BinaryTypeMetadata meta = obj.metaData();
+
+ assertNotNull(meta);
+ assertEquals(1, meta.fields().size());
+
+ meta = ignite0.portables().metadata(TestObject1.class);
+
+ assertNotNull(meta);
+ assertEquals("val2", meta.affinityKeyFieldName());
+
+ meta = ignite0.portables().metadata(TestObject2.class);
+
+ assertNotNull(meta);
+ assertNull(meta.affinityKeyFieldName());
+ }
+
+ {
+ BinaryObjectBuilder builder = ignite1.portables().builder("TestObject3");
+
+ builder.setField("f2", 2);
+
+ ignite1.cache(null).put(1, builder.build());
+
+ IgniteCache<Integer, BinaryObject> cache = ignite1.cache(null).withKeepBinary();
+
+ BinaryObject obj = cache.get(0);
+
+ BinaryTypeMetadata meta = obj.metaData();
+
+ assertNotNull(meta);
+ assertEquals(2, meta.fields().size());
+
+ meta = ignite1.portables().metadata(TestObject1.class);
+
+ assertNotNull(meta);
+ assertEquals("val2", meta.affinityKeyFieldName());
+
+ meta = ignite1.portables().metadata(TestObject2.class);
+
+ assertNotNull(meta);
+ assertNull(meta.affinityKeyFieldName());
+ }
+
+ BinaryTypeMetadata meta = ignite0.portables().metadata("TestObject3");
+
+ assertNotNull(meta);
+ assertEquals(2, meta.fields().size());
+
+ IgniteCache<Integer, BinaryObject> cache = ignite0.cache(null).withKeepBinary();
+
+ BinaryObject obj = cache.get(1);
+
+ assertEquals(Integer.valueOf(2), obj.field("f2"));
+ assertNull(obj.field("f1"));
+
+ meta = obj.metaData();
+
+ assertNotNull(meta);
+ assertEquals(2, meta.fields().size());
+
+ Collection<BinaryTypeMetadata> meta1 = ignite1.portables().metadata();
+ Collection<BinaryTypeMetadata> meta2 = ignite1.portables().metadata();
+
+ assertEquals(meta1.size(), meta2.size());
+
+ for (BinaryTypeMetadata m1 : meta1) {
+ boolean found = false;
+
+ for (BinaryTypeMetadata m2 : meta1) {
+ if (m1.typeName().equals(m2.typeName())) {
+ assertEquals(m1.affinityKeyFieldName(), m2.affinityKeyFieldName());
+ assertEquals(m1.fields(), m2.fields());
+
+ found = true;
+
+ break;
+ }
+ }
+
+ assertTrue(found);
+ }
+ }
+
+ /**
+ *
+ */
+ static class TestObject1 {
+ /** */
+ private int val1;
+
+ /** */
+ private int val2;
+
+ /**
+ * @param val1 Value 1.
+ * @param val2 Value 2.
+ */
+ public TestObject1(int val1, int val2) {
+ this.val1 = val1;
+ this.val2 = val2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestObject1 that = (TestObject1)o;
+
+ return val1 == that.val1;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val1;
+ }
+ }
+
+ /**
+ *
+ */
+ static class TestObject2 {
+ /** */
+ private int val1;
+
+ /** */
+ private int val2;
+
+ /**
+ * @param val1 Value 1.
+ * @param val2 Value 2.
+ */
+ public TestObject2(int val1, int val2) {
+ this.val1 = val1;
+ this.val2 = val2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestObject2 that = (TestObject2)o;
+
+ return val2 == that.val2;
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val2;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/20f5b9cd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeIgniteObjectMetadataMultinodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeIgniteObjectMetadataMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeIgniteObjectMetadataMultinodeTest.java
deleted file mode 100644
index e86a160..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCacheClientNodeIgniteObjectMetadataMultinodeTest.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.portable;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteObjects;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.portable.PortableMarshaller;
-import org.apache.ignite.igniteobject.IgniteObjectBuilder;
-import org.apache.ignite.igniteobject.IgniteObjectMetadata;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.eclipse.jetty.util.ConcurrentHashSet;
-
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
-/**
- *
- */
-public class GridCacheClientNodeIgniteObjectMetadataMultinodeTest extends GridCommonAbstractTest {
- /** */
- protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private boolean client;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setPeerClassLoadingEnabled(false);
-
- ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder).setForceServerMode(true);
-
- cfg.setMarshaller(new PortableMarshaller());
-
- CacheConfiguration ccfg = new CacheConfiguration();
-
- ccfg.setWriteSynchronizationMode(FULL_SYNC);
-
- cfg.setCacheConfiguration(ccfg);
-
- cfg.setClientMode(client);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- stopAllGrids();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientMetadataInitialization() throws Exception {
- startGrids(2);
-
- final AtomicBoolean stop = new AtomicBoolean();
-
- final ConcurrentHashSet<String> allTypes = new ConcurrentHashSet<>();
-
- IgniteInternalFuture<?> fut;
-
- try {
- // Update portable metadata concurrently with client nodes start.
- fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- IgniteObjects portables = ignite(0).portables();
-
- IgniteCache<Object, Object> cache = ignite(0).cache(null).withKeepBinary();
-
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- for (int i = 0; i < 1000; i++) {
- log.info("Iteration: " + i);
-
- String type = "portable-type-" + i;
-
- allTypes.add(type);
-
- for (int f = 0; f < 10; f++) {
- IgniteObjectBuilder builder = portables.builder(type);
-
- String fieldName = "f" + f;
-
- builder.setField(fieldName, i);
-
- cache.put(rnd.nextInt(0, 100_000), builder.build());
-
- if (f % 100 == 0)
- log.info("Put iteration: " + f);
- }
-
- if (stop.get())
- break;
- }
-
- return null;
- }
- }, 5, "update-thread");
- }
- finally {
- stop.set(true);
- }
-
- client = true;
-
- startGridsMultiThreaded(2, 5);
-
- fut.get();
-
- assertFalse(allTypes.isEmpty());
-
- log.info("Expected portable types: " + allTypes.size());
-
- assertEquals(7, ignite(0).cluster().nodes().size());
-
- for (int i = 0; i < 7; i++) {
- log.info("Check metadata on node: " + i);
-
- boolean client = i > 1;
-
- assertEquals((Object)client, ignite(i).configuration().isClientMode());
-
- IgniteObjects portables = ignite(i).portables();
-
- Collection<IgniteObjectMetadata> metaCol = portables.metadata();
-
- assertEquals(allTypes.size(), metaCol.size());
-
- Set<String> names = new HashSet<>();
-
- for (IgniteObjectMetadata meta : metaCol) {
- assertTrue(names.add(meta.typeName()));
-
- assertNull(meta.affinityKeyFieldName());
-
- assertEquals(10, meta.fields().size());
- }
-
- assertEquals(allTypes.size(), names.size());
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testFailoverOnStart() throws Exception {
- startGrids(4);
-
- IgniteObjects portables = ignite(0).portables();
-
- IgniteCache<Object, Object> cache = ignite(0).cache(null).withKeepBinary();
-
- for (int i = 0; i < 1000; i++) {
- IgniteObjectBuilder builder = portables.builder("type-" + i);
-
- builder.setField("f0", i);
-
- cache.put(i, builder.build());
- }
-
- client = true;
-
- final CyclicBarrier barrier = new CyclicBarrier(6);
-
- final AtomicInteger startIdx = new AtomicInteger(4);
-
- IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- barrier.await();
-
- Ignite ignite = startGrid(startIdx.getAndIncrement());
-
- assertTrue(ignite.configuration().isClientMode());
-
- log.info("Started node: " + ignite.name());
-
- return null;
- }
- }, 5, "start-thread");
-
- barrier.await();
-
- U.sleep(ThreadLocalRandom.current().nextInt(10, 100));
-
- for (int i = 0; i < 3; i++)
- stopGrid(i);
-
- fut.get();
-
- assertEquals(6, ignite(3).cluster().nodes().size());
-
- for (int i = 3; i < 7; i++) {
- log.info("Check metadata on node: " + i);
-
- boolean client = i > 3;
-
- assertEquals((Object) client, ignite(i).configuration().isClientMode());
-
- portables = ignite(i).portables();
-
- final IgniteObjects p0 = portables;
-
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- Collection<IgniteObjectMetadata> metaCol = p0.metadata();
-
- return metaCol.size() == 1000;
- }
- }, getTestTimeout());
-
- Collection<IgniteObjectMetadata> metaCol = portables.metadata();
-
- assertEquals(1000, metaCol.size());
-
- Set<String> names = new HashSet<>();
-
- for (IgniteObjectMetadata meta : metaCol) {
- assertTrue(names.add(meta.typeName()));
-
- assertNull(meta.affinityKeyFieldName());
-
- assertEquals(1, meta.fields().size());
- }
-
- assertEquals(1000, names.size());
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testClientStartsFirst() throws Exception {
- client = true;
-
- Ignite ignite0 = startGrid(0);
-
- assertTrue(ignite0.configuration().isClientMode());
-
- client = false;
-
- Ignite ignite1 = startGrid(1);
-
- assertFalse(ignite1.configuration().isClientMode());
-
- IgniteObjects portables = ignite(1).portables();
-
- IgniteCache<Object, Object> cache = ignite(1).cache(null).withKeepBinary();
-
- for (int i = 0; i < 100; i++) {
- IgniteObjectBuilder builder = portables.builder("type-" + i);
-
- builder.setField("f0", i);
-
- cache.put(i, builder.build());
- }
-
- assertEquals(100, ignite(0).portables().metadata().size());
- }
-}
\ No newline at end of file