You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/04 14:44:06 UTC
[2/5] incubator-ignite git commit: # gg-9869
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
deleted file mode 100644
index 23a46e5..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
+++ /dev/null
@@ -1,924 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.configuration.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.events.EventType.*;
-
-/**
- *
- */
-public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest {
- /** */
- private static ConcurrentHashMap<Object, Object> storeMap;
-
- /** */
- private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private CacheMode mode = PARTITIONED;
-
- /** */
- private boolean nearEnabled = true;
-
- /** */
- private boolean useCache;
-
- /** */
- private TestStore store;
-
- /** {@inheritDoc} */
- @Override public void afterTest() throws Exception {
- super.afterTest();
-
- useCache = false;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"IfMayBeConditional", "unchecked"})
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
- spi.setIpFinder(ipFinder);
-
- cfg.setDiscoverySpi(spi);
-
- cfg.setIncludeProperties();
-
- cfg.setMarshaller(new OptimizedMarshaller(false));
-
- if (useCache) {
- CacheConfiguration cc = defaultCacheConfiguration();
-
- cc.setCacheMode(mode);
- cc.setAtomicityMode(TRANSACTIONAL);
- cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
- cc.setWriteSynchronizationMode(FULL_SYNC);
-
- cc.setEvictSynchronized(false);
- cc.setEvictNearSynchronized(false);
-
- if (store != null) {
- cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
- cc.setReadThrough(true);
- cc.setWriteThrough(true);
- }
-
- cfg.setCacheConfiguration(cc);
- }
- else
- cfg.setCacheConfiguration();
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitioned() throws Exception {
- mode = PARTITIONED;
-
- checkDataStreamer();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testColocated() throws Exception {
- mode = PARTITIONED;
- nearEnabled = false;
-
- checkDataStreamer();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReplicated() throws Exception {
- mode = REPLICATED;
-
- checkDataStreamer();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testLocal() throws Exception {
- mode = LOCAL;
-
- try {
- checkDataStreamer();
-
- assert false;
- }
- catch (IgniteCheckedException e) {
- // Cannot load local cache configured remotely.
- info("Caught expected exception: " + e);
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- @SuppressWarnings("ErrorNotRethrown")
- private void checkDataStreamer() throws Exception {
- try {
- Ignite g1 = startGrid(1);
-
- useCache = true;
-
- Ignite g2 = startGrid(2);
- startGrid(3);
-
- final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
-
- ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
-
- final AtomicInteger idxGen = new AtomicInteger();
- final int cnt = 400;
- final int threads = 10;
-
- final CountDownLatch l1 = new CountDownLatch(threads);
-
- IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
-
- for (int i = 0; i < cnt; i++) {
- int idx = idxGen.getAndIncrement();
-
- futs.add(ldr.addData(idx, idx));
- }
-
- l1.countDown();
-
- for (IgniteFuture<?> fut : futs)
- fut.get();
-
- return null;
- }
- }, threads);
-
- l1.await();
-
- // This will wait until data streamer finishes loading.
- stopGrid(getTestGridName(1), false);
-
- f1.get();
-
- int s2 = internalCache(2).primaryKeySet().size();
- int s3 = internalCache(3).primaryKeySet().size();
- int total = threads * cnt;
-
- assertEquals(total, s2 + s3);
-
- final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null);
-
- rmvLdr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
-
- final CountDownLatch l2 = new CountDownLatch(threads);
-
- IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
-
- for (int i = 0; i < cnt; i++) {
- final int key = idxGen.decrementAndGet();
-
- futs.add(rmvLdr.removeData(key));
- }
-
- l2.countDown();
-
- for (IgniteFuture<?> fut : futs)
- fut.get();
-
- return null;
- }
- }, threads);
-
- l2.await();
-
- rmvLdr.close(false);
-
- f2.get();
-
- s2 = internalCache(2).primaryKeySet().size();
- s3 = internalCache(3).primaryKeySet().size();
-
- assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']';
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionedIsolated() throws Exception {
- mode = PARTITIONED;
-
- checkIsolatedDataStreamer();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReplicatedIsolated() throws Exception {
- mode = REPLICATED;
-
- checkIsolatedDataStreamer();
- }
-
- /**
- * @throws Exception If failed.
- */
- private void checkIsolatedDataStreamer() throws Exception {
- try {
- useCache = true;
-
- Ignite g1 = startGrid(0);
- startGrid(1);
- startGrid(2);
-
- awaitPartitionMapExchange();
-
- GridCache<Integer, Integer> cache = ((IgniteKernal)grid(0)).cache(null);
-
- for (int i = 0; i < 100; i++)
- cache.put(i, -1);
-
- final int cnt = 40_000;
- final int threads = 10;
-
- try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) {
- final AtomicInteger idxGen = new AtomicInteger();
-
- IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- for (int i = 0; i < cnt; i++) {
- int idx = idxGen.getAndIncrement();
-
- ldr.addData(idx, idx);
- }
-
- return null;
- }
- }, threads);
-
- f1.get();
- }
-
- for (int g = 0; g < 3; g++) {
- ClusterNode locNode = grid(g).localNode();
-
- GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null);
-
- if (cache0.isNear())
- cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht();
-
- CacheAffinity<Integer> aff = cache0.affinity();
-
- for (int key = 0; key < cnt * threads; key++) {
- if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) {
- GridCacheEntryEx<Integer, Integer> entry = cache0.peekEx(key);
-
- assertNotNull("Missing entry for key: " + key, entry);
- assertEquals((Integer)(key < 100 ? -1 : key), entry.rawGetOrUnmarshal(false));
- }
- }
- }
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * Test primitive arrays can be passed into data streamer.
- *
- * @throws Exception If failed.
- */
- public void testPrimitiveArrays() throws Exception {
- try {
- useCache = true;
- mode = PARTITIONED;
-
- Ignite g1 = startGrid(1);
- startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used).
-
- List<Object> arrays = Arrays.<Object>asList(
- new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
- new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
-
- IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null);
-
- for (int i = 0, size = arrays.size(); i < 1000; i++) {
- Object arr = arrays.get(i % size);
-
- dataLdr.addData(i, arr);
- dataLdr.addData(i, fixedClosure(arr));
- }
-
- dataLdr.close(false);
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReplicatedMultiThreaded() throws Exception {
- mode = REPLICATED;
-
- checkLoaderMultithreaded(1, 2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionedMultiThreaded() throws Exception {
- mode = PARTITIONED;
-
- checkLoaderMultithreaded(1, 3);
- }
-
- /**
- * Tests loader in multithreaded environment with various count of grids started.
- *
- * @param nodesCntNoCache How many nodes should be started without cache.
- * @param nodesCntCache How many nodes should be started with cache.
- * @throws Exception If failed.
- */
- protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache)
- throws Exception {
- try {
- // Start all required nodes.
- int idx = 1;
-
- for (int i = 0; i < nodesCntNoCache; i++)
- startGrid(idx++);
-
- useCache = true;
-
- for (int i = 0; i < nodesCntCache; i++)
- startGrid(idx++);
-
- Ignite g1 = grid(1);
-
- // Get and configure loader.
- final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
-
- ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>individual());
- ldr.perNodeBufferSize(2);
-
- // Define count of puts.
- final AtomicInteger idxGen = new AtomicInteger();
-
- final AtomicBoolean done = new AtomicBoolean();
-
- try {
- final int totalPutCnt = 50000;
-
- IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Collection<IgniteFuture<?>> futs = new ArrayList<>();
-
- while (!done.get()) {
- int idx = idxGen.getAndIncrement();
-
- if (idx >= totalPutCnt) {
- info(">>> Stopping producer thread since maximum count of puts reached.");
-
- break;
- }
-
- futs.add(ldr.addData(idx, idx));
- }
-
- ldr.flush();
-
- for (IgniteFuture<?> fut : futs)
- fut.get();
-
- return null;
- }
- }, 5, "producer");
-
- IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- while (!done.get()) {
- ldr.flush();
-
- U.sleep(100);
- }
-
- return null;
- }
- }, 1, "flusher");
-
- // Define index of node being restarted.
- final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
-
- IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- try {
- for (int i = 0; i < 5; i++) {
- Ignite g = startGrid(restartNodeIdx);
-
- UUID id = g.cluster().localNode().id();
-
- info(">>>>>>> Started node: " + id);
-
- U.sleep(1000);
-
- stopGrid(getTestGridName(restartNodeIdx), true);
-
- info(">>>>>>> Stopped node: " + id);
- }
- }
- finally {
- done.set(true);
-
- info("Start stop thread finished.");
- }
-
- return null;
- }
- }, 1, "start-stop-thread");
-
- fut1.get();
- fut2.get();
- fut3.get();
- }
- finally {
- ldr.close(false);
- }
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testLoaderApi() throws Exception {
- useCache = true;
-
- try {
- Ignite g1 = startGrid(1);
-
- IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null);
-
- ldr.close(false);
-
- try {
- ldr.addData(0, 0);
-
- assert false;
- }
- catch (IllegalStateException e) {
- info("Caught expected exception: " + e);
- }
-
- assert ldr.future().isDone();
-
- ldr.future().get();
-
- try {
- // Create another loader.
- ldr = g1.dataStreamer("UNKNOWN_CACHE");
-
- assert false;
- }
- catch (IllegalStateException e) {
- info("Caught expected exception: " + e);
- }
-
- ldr.close(true);
-
- assert ldr.future().isDone();
-
- ldr.future().get();
-
- // Create another loader.
- ldr = g1.dataStreamer(null);
-
- // Cancel with future.
- ldr.future().cancel();
-
- try {
- ldr.addData(0, 0);
-
- assert false;
- }
- catch (IllegalStateException e) {
- info("Caught expected exception: " + e);
- }
-
- assert ldr.future().isDone();
-
- try {
- ldr.future().get();
-
- assert false;
- }
- catch (IgniteFutureCancelledException e) {
- info("Caught expected exception: " + e);
- }
-
- // Create another loader.
- ldr = g1.dataStreamer(null);
-
- // This will close loader.
- stopGrid(getTestGridName(1), false);
-
- try {
- ldr.addData(0, 0);
-
- assert false;
- }
- catch (IllegalStateException e) {
- info("Caught expected exception: " + e);
- }
-
- assert ldr.future().isDone();
-
- ldr.future().get();
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * Wraps integer to closure returning it.
- *
- * @param i Value to wrap.
- * @return Callable.
- */
- private static Callable<Integer> callable(@Nullable final Integer i) {
- return new Callable<Integer>() {
- @Override public Integer call() throws Exception {
- return i;
- }
- };
- }
-
- /**
- * Wraps integer to closure returning it.
- *
- * @param i Value to wrap.
- * @return Closure.
- */
- private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) {
- return new IgniteClosure<Integer, Integer>() {
- @Override public Integer apply(Integer e) {
- return e == null ? i : e + i;
- }
- };
- }
-
- /**
- * Wraps object to closure returning it.
- *
- * @param obj Value to wrap.
- * @return Closure.
- */
- private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) {
- return new IgniteClosure<T, T>() {
- @Override public T apply(T e) {
- assert e == null || obj == null || e.getClass() == obj.getClass() :
- "Expects the same types [e=" + e + ", obj=" + obj + ']';
-
- return obj;
- }
- };
- }
-
- /**
- * Wraps integer to closure expecting it and returning {@code null}.
- *
- * @param exp Expected closure value.
- * @return Remove expected cache value closure.
- */
- private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) {
- return new IgniteClosure<T, T>() {
- @Override public T apply(T act) {
- if (exp == null ? act == null : exp.equals(act))
- return null;
-
- throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']');
- }
- };
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testFlush() throws Exception {
- mode = LOCAL;
-
- useCache = true;
-
- try {
- Ignite g = startGrid();
-
- final IgniteCache<Integer, Integer> c = g.jcache(null);
-
- final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
- ldr.perNodeBufferSize(10);
-
- for (int i = 0; i < 9; i++)
- ldr.addData(i, i);
-
- assertTrue(c.localSize() == 0);
-
- multithreaded(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- ldr.flush();
-
- assertEquals(9, c.size());
-
- return null;
- }
- }, 5, "flush-checker");
-
- ldr.addData(100, 100);
-
- ldr.flush();
-
- assertEquals(10, c.size());
-
- ldr.addData(200, 200);
-
- ldr.close(false);
-
- ldr.future().get();
-
- assertEquals(11, c.size());
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTryFlush() throws Exception {
- mode = LOCAL;
-
- useCache = true;
-
- try {
- Ignite g = startGrid();
-
- IgniteCache<Integer, Integer> c = g.jcache(null);
-
- IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
- ldr.perNodeBufferSize(10);
-
- for (int i = 0; i < 9; i++)
- ldr.addData(i, i);
-
- assertTrue(c.localSize() == 0);
-
- ldr.tryFlush();
-
- Thread.sleep(100);
-
- assertEquals(9, c.size());
-
- ldr.close(false);
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testFlushTimeout() throws Exception {
- mode = LOCAL;
-
- useCache = true;
-
- try {
- Ignite g = startGrid();
-
- final CountDownLatch latch = new CountDownLatch(9);
-
- g.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- latch.countDown();
-
- return true;
- }
- }, EVT_CACHE_OBJECT_PUT);
-
- IgniteCache<Integer, Integer> c = g.jcache(null);
-
- assertTrue(c.localSize() == 0);
-
- IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
- ldr.perNodeBufferSize(10);
- ldr.autoFlushFrequency(3000);
- ldr.allowOverwrite(true);
-
- for (int i = 0; i < 9; i++)
- ldr.addData(i, i);
-
- assertTrue(c.localSize() == 0);
-
- assertFalse(latch.await(1000, MILLISECONDS));
-
- assertTrue(c.localSize() == 0);
-
- assertTrue(latch.await(3000, MILLISECONDS));
-
- assertEquals(9, c.size());
-
- ldr.close(false);
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testUpdateStore() throws Exception {
- storeMap = new ConcurrentHashMap<>();
-
- try {
- store = new TestStore();
-
- useCache = true;
-
- Ignite ignite = startGrid(1);
-
- startGrid(2);
- startGrid(3);
-
- for (int i = 0; i < 1000; i++)
- storeMap.put(i, i);
-
- try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
- ldr.allowOverwrite(true);
-
- assertFalse(ldr.skipStore());
-
- for (int i = 0; i < 1000; i++)
- ldr.removeData(i);
-
- for (int i = 1000; i < 2000; i++)
- ldr.addData(i, i);
- }
-
- for (int i = 0; i < 1000; i++)
- assertNull(storeMap.get(i));
-
- for (int i = 1000; i < 2000; i++)
- assertEquals(i, storeMap.get(i));
-
- try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
- ldr.allowOverwrite(true);
-
- ldr.skipStore(true);
-
- for (int i = 0; i < 1000; i++)
- ldr.addData(i, i);
-
- for (int i = 1000; i < 2000; i++)
- ldr.removeData(i);
- }
-
- IgniteCache<Object, Object> cache = ignite.jcache(null);
-
- for (int i = 0; i < 1000; i++) {
- assertNull(storeMap.get(i));
-
- assertEquals(i, cache.get(i));
- }
-
- for (int i = 1000; i < 2000; i++) {
- assertEquals(i, storeMap.get(i));
-
- assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));
- }
- }
- finally {
- storeMap = null;
- }
- }
-
- /**
- *
- */
- private static class TestObject {
- /** Value. */
- private final int val;
-
- /**
- * @param val Value.
- */
- private TestObject(int val) {
- this.val = val;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- TestObject obj = (TestObject)o;
-
- return val == obj.val;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return val;
- }
- }
-
- /**
- *
- */
- private static class TestStore extends CacheStoreAdapter<Object, Object> {
- /** {@inheritDoc} */
- @Nullable @Override public Object load(Object key) {
- return storeMap.get(key);
- }
-
- /** {@inheritDoc} */
- @Override public void write(Cache.Entry<?, ?> entry) {
- storeMap.put(entry.getKey(), entry.getValue());
- }
-
- /** {@inheritDoc} */
- @Override public void delete(Object key) {
- storeMap.remove(key);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java
new file mode 100644
index 0000000..63b2c248
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests for {@code IgniteDataStreamerImpl}.
+ */
+public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of keys to load via data streamer. */
+ private static final int KEYS_COUNT = 1000;
+
+ /** Started grid counter. */
+ private static int cnt;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ // Forth node goes without cache.
+ if (cnt < 4)
+ cfg.setCacheConfiguration(cacheConfiguration());
+
+ cnt++;
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
+ try {
+ startGrids(5);
+
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ U.awaitQuiet(barrier);
+
+ G.stopAll(true);
+
+ return null;
+ }
+ }, 1);
+
+ Ignite g4 = grid(4);
+
+ IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
+
+ dataLdr.perNodeBufferSize(32);
+
+ for (int i = 0; i < 100000; i += 2) {
+ dataLdr.addData(i, i);
+ dataLdr.removeData(i + 1);
+ }
+
+ U.awaitQuiet(barrier);
+
+ info("Closing data streamer.");
+
+ try {
+ dataLdr.close(true);
+ }
+ catch (IllegalStateException ignore) {
+ // This is ok to ignore this exception as test is racy by it's nature -
+ // grid is stopping in different thread.
+ }
+ }
+ finally {
+ G.stopAll(true);
+ }
+ }
+
+ /**
+ * Data streamer should correctly load entries from HashMap in case of grids with more than one node
+ * and with GridOptimizedMarshaller that requires serializable.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAddDataFromMap() throws Exception {
+ try {
+ cnt = 0;
+
+ startGrids(2);
+
+ Ignite g0 = grid(0);
+
+ Marshaller marsh = g0.configuration().getMarshaller();
+
+ if (marsh instanceof OptimizedMarshaller)
+ assertTrue(((OptimizedMarshaller)marsh).isRequireSerializable());
+ else
+ fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName());
+
+ IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
+
+ Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
+
+ for (int i = 0; i < KEYS_COUNT; i ++)
+ map.put(i, String.valueOf(i));
+
+ dataLdr.addData(map);
+
+ dataLdr.close();
+
+ Random rnd = new Random();
+
+ IgniteCache<Integer, String> c = g0.jcache(null);
+
+ for (int i = 0; i < KEYS_COUNT; i ++) {
+ Integer k = rnd.nextInt(KEYS_COUNT);
+
+ String v = c.get(k);
+
+ assertEquals(k.toString(), v);
+ }
+ }
+ finally {
+ G.stopAll(true);
+ }
+ }
+
+ /**
+ * Gets cache configuration.
+ *
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration() {
+ CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setCacheMode(PARTITIONED);
+ cacheCfg.setBackups(1);
+ cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ return cacheCfg;
+ }
+
+ /**
+ *
+ */
+ private static class TestObject implements Serializable {
+ /** */
+ private int val;
+
+ /**
+ */
+ private TestObject() {
+ // No-op.
+ }
+
+ /**
+ * @param val Value.
+ */
+ private TestObject(int val) {
+ this.val = val;
+ }
+
+ public Integer val() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return obj instanceof TestObject && ((TestObject)obj).val == val;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
new file mode 100644
index 0000000..33fe310
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jdk8.backport.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Data streamer performance test. Compares group lock data streamer to traditional lock.
+ * <p>
+ * Disable assertions and give at least 2 GB heap to run this test.
+ */
+public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int GRID_CNT = 3;
+
+ /** */
+ private static final int ENTRY_CNT = 80000;
+
+ /** */
+ private boolean useCache;
+
+ /** */
+ private String[] vals = new String[2048];
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+ spi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(spi);
+
+ cfg.setIncludeProperties();
+
+ cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+
+ cfg.setConnectorConfiguration(null);
+
+ cfg.setPeerClassLoadingEnabled(true);
+
+ if (useCache) {
+ CacheConfiguration cc = defaultCacheConfiguration();
+
+ cc.setCacheMode(PARTITIONED);
+
+ cc.setDistributionMode(PARTITIONED_ONLY);
+ cc.setWriteSynchronizationMode(FULL_SYNC);
+ cc.setStartSize(ENTRY_CNT / GRID_CNT);
+ cc.setSwapEnabled(false);
+
+ cc.setBackups(1);
+
+ cc.setStoreValueBytes(true);
+
+ cfg.setCacheSanityCheckEnabled(false);
+ cfg.setCacheConfiguration(cc);
+ }
+ else
+ cfg.setCacheConfiguration();
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ for (int i = 0; i < vals.length; i++) {
+ int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
+
+ StringBuilder sb = new StringBuilder();
+
+ for (int j = 0; j < valLen; j++)
+ sb.append('a' + ThreadLocalRandom8.current().nextInt(20));
+
+ vals[i] = sb.toString();
+
+ info("Value: " + vals[i]);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPerformance() throws Exception {
+ doTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTest() throws Exception {
+ System.gc();
+ System.gc();
+ System.gc();
+
+ try {
+ useCache = true;
+
+ startGridsMultiThreaded(GRID_CNT);
+
+ useCache = false;
+
+ Ignite ignite = startGrid();
+
+ final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
+
+ ldr.perNodeBufferSize(8192);
+ ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, String>batchedSorted());
+ ldr.autoFlushFrequency(0);
+
+ final LongAdder cnt = new LongAdder();
+
+ long start = U.currentTimeMillis();
+
+ Thread t = new Thread(new Runnable() {
+ @SuppressWarnings("BusyWait")
+ @Override public void run() {
+ while (true) {
+ try {
+ Thread.sleep(10000);
+ }
+ catch (InterruptedException ignored) {
+ break;
+ }
+
+ info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
+ }
+ }
+ });
+
+ t.setDaemon(true);
+
+ t.start();
+
+ int threadNum = 2;//Runtime.getRuntime().availableProcessors();
+
+ multithreaded(new Callable<Object>() {
+ @SuppressWarnings("InfiniteLoopStatement")
+ @Override public Object call() throws Exception {
+ ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
+
+ while (true) {
+ int i = rnd.nextInt(ENTRY_CNT);
+
+ ldr.addData(i, vals[rnd.nextInt(vals.length)]);
+
+ cnt.increment();
+ }
+ }
+ }, threadNum, "loader");
+
+ info("Closing loader...");
+
+ ldr.close(false);
+
+ long duration = U.currentTimeMillis() - start;
+
+ info("Finished performance test. Duration: " + duration + "ms.");
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java
new file mode 100644
index 0000000..9402c0c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static ConcurrentHashMap<Object, Object> storeMap;
+
+ /** */
+ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private CacheMode mode = PARTITIONED;
+
+ /** */
+ private boolean nearEnabled = true;
+
+ /** */
+ private boolean useCache;
+
+ /** */
+ private TestStore store;
+
+ /** {@inheritDoc} */
+ @Override public void afterTest() throws Exception {
+ super.afterTest();
+
+ useCache = false;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"IfMayBeConditional", "unchecked"})
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+ spi.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(spi);
+
+ cfg.setIncludeProperties();
+
+ cfg.setMarshaller(new OptimizedMarshaller(false));
+
+ if (useCache) {
+ CacheConfiguration cc = defaultCacheConfiguration();
+
+ cc.setCacheMode(mode);
+ cc.setAtomicityMode(TRANSACTIONAL);
+ cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
+ cc.setWriteSynchronizationMode(FULL_SYNC);
+
+ cc.setEvictSynchronized(false);
+ cc.setEvictNearSynchronized(false);
+
+ if (store != null) {
+ cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+ cc.setReadThrough(true);
+ cc.setWriteThrough(true);
+ }
+
+ cfg.setCacheConfiguration(cc);
+ }
+ else
+ cfg.setCacheConfiguration();
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitioned() throws Exception {
+ mode = PARTITIONED;
+
+ checkDataStreamer();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testColocated() throws Exception {
+ mode = PARTITIONED;
+ nearEnabled = false;
+
+ checkDataStreamer();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicated() throws Exception {
+ mode = REPLICATED;
+
+ checkDataStreamer();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocal() throws Exception {
+ mode = LOCAL;
+
+ try {
+ checkDataStreamer();
+
+ assert false;
+ }
+ catch (IgniteCheckedException e) {
+ // Cannot load local cache configured remotely.
+ info("Caught expected exception: " + e);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ErrorNotRethrown")
+ private void checkDataStreamer() throws Exception {
+ try {
+ Ignite g1 = startGrid(1);
+
+ useCache = true;
+
+ Ignite g2 = startGrid(2);
+ startGrid(3);
+
+ final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
+
+ ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
+
+ final AtomicInteger idxGen = new AtomicInteger();
+ final int cnt = 400;
+ final int threads = 10;
+
+ final CountDownLatch l1 = new CountDownLatch(threads);
+
+ IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ int idx = idxGen.getAndIncrement();
+
+ futs.add(ldr.addData(idx, idx));
+ }
+
+ l1.countDown();
+
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
+
+ return null;
+ }
+ }, threads);
+
+ l1.await();
+
+ // This will wait until data streamer finishes loading.
+ stopGrid(getTestGridName(1), false);
+
+ f1.get();
+
+ int s2 = internalCache(2).primaryKeySet().size();
+ int s3 = internalCache(3).primaryKeySet().size();
+ int total = threads * cnt;
+
+ assertEquals(total, s2 + s3);
+
+ final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null);
+
+ rmvLdr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
+
+ final CountDownLatch l2 = new CountDownLatch(threads);
+
+ IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ final int key = idxGen.decrementAndGet();
+
+ futs.add(rmvLdr.removeData(key));
+ }
+
+ l2.countDown();
+
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
+
+ return null;
+ }
+ }, threads);
+
+ l2.await();
+
+ rmvLdr.close(false);
+
+ f2.get();
+
+ s2 = internalCache(2).primaryKeySet().size();
+ s3 = internalCache(3).primaryKeySet().size();
+
+ assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']';
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionedIsolated() throws Exception {
+ mode = PARTITIONED;
+
+ checkIsolatedDataStreamer();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedIsolated() throws Exception {
+ mode = REPLICATED;
+
+ checkIsolatedDataStreamer();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkIsolatedDataStreamer() throws Exception {
+ try {
+ useCache = true;
+
+ Ignite g1 = startGrid(0);
+ startGrid(1);
+ startGrid(2);
+
+ awaitPartitionMapExchange();
+
+ GridCache<Integer, Integer> cache = ((IgniteKernal)grid(0)).cache(null);
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, -1);
+
+ final int cnt = 40_000;
+ final int threads = 10;
+
+ try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) {
+ final AtomicInteger idxGen = new AtomicInteger();
+
+ IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < cnt; i++) {
+ int idx = idxGen.getAndIncrement();
+
+ ldr.addData(idx, idx);
+ }
+
+ return null;
+ }
+ }, threads);
+
+ f1.get();
+ }
+
+ for (int g = 0; g < 3; g++) {
+ ClusterNode locNode = grid(g).localNode();
+
+ GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null);
+
+ if (cache0.isNear())
+ cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht();
+
+ CacheAffinity<Integer> aff = cache0.affinity();
+
+ for (int key = 0; key < cnt * threads; key++) {
+ if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) {
+ GridCacheEntryEx<Integer, Integer> entry = cache0.peekEx(key);
+
+ assertNotNull("Missing entry for key: " + key, entry);
+ assertEquals((Integer)(key < 100 ? -1 : key), entry.rawGetOrUnmarshal(false));
+ }
+ }
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Test primitive arrays can be passed into data streamer.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPrimitiveArrays() throws Exception {
+ try {
+ useCache = true;
+ mode = PARTITIONED;
+
+ Ignite g1 = startGrid(1);
+ startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used).
+
+ List<Object> arrays = Arrays.<Object>asList(
+ new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
+ new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
+
+ IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null);
+
+ for (int i = 0, size = arrays.size(); i < 1000; i++) {
+ Object arr = arrays.get(i % size);
+
+ dataLdr.addData(i, arr);
+ dataLdr.addData(i, fixedClosure(arr));
+ }
+
+ dataLdr.close(false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedMultiThreaded() throws Exception {
+ mode = REPLICATED;
+
+ checkLoaderMultithreaded(1, 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionedMultiThreaded() throws Exception {
+ mode = PARTITIONED;
+
+ checkLoaderMultithreaded(1, 3);
+ }
+
+ /**
+ * Tests loader in multithreaded environment with various count of grids started.
+ *
+ * @param nodesCntNoCache How many nodes should be started without cache.
+ * @param nodesCntCache How many nodes should be started with cache.
+ * @throws Exception If failed.
+ */
+ protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache)
+ throws Exception {
+ try {
+ // Start all required nodes.
+ int idx = 1;
+
+ for (int i = 0; i < nodesCntNoCache; i++)
+ startGrid(idx++);
+
+ useCache = true;
+
+ for (int i = 0; i < nodesCntCache; i++)
+ startGrid(idx++);
+
+ Ignite g1 = grid(1);
+
+ // Get and configure loader.
+ final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
+
+ ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>individual());
+ ldr.perNodeBufferSize(2);
+
+ // Define count of puts.
+ final AtomicInteger idxGen = new AtomicInteger();
+
+ final AtomicBoolean done = new AtomicBoolean();
+
+ try {
+ final int totalPutCnt = 50000;
+
+ IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Collection<IgniteFuture<?>> futs = new ArrayList<>();
+
+ while (!done.get()) {
+ int idx = idxGen.getAndIncrement();
+
+ if (idx >= totalPutCnt) {
+ info(">>> Stopping producer thread since maximum count of puts reached.");
+
+ break;
+ }
+
+ futs.add(ldr.addData(idx, idx));
+ }
+
+ ldr.flush();
+
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
+
+ return null;
+ }
+ }, 5, "producer");
+
+ IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ while (!done.get()) {
+ ldr.flush();
+
+ U.sleep(100);
+ }
+
+ return null;
+ }
+ }, 1, "flusher");
+
+ // Define index of node being restarted.
+ final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
+
+ IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ for (int i = 0; i < 5; i++) {
+ Ignite g = startGrid(restartNodeIdx);
+
+ UUID id = g.cluster().localNode().id();
+
+ info(">>>>>>> Started node: " + id);
+
+ U.sleep(1000);
+
+ stopGrid(getTestGridName(restartNodeIdx), true);
+
+ info(">>>>>>> Stopped node: " + id);
+ }
+ }
+ finally {
+ done.set(true);
+
+ info("Start stop thread finished.");
+ }
+
+ return null;
+ }
+ }, 1, "start-stop-thread");
+
+ fut1.get();
+ fut2.get();
+ fut3.get();
+ }
+ finally {
+ ldr.close(false);
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLoaderApi() throws Exception {
+ useCache = true;
+
+ try {
+ Ignite g1 = startGrid(1);
+
+ IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null);
+
+ ldr.close(false);
+
+ try {
+ ldr.addData(0, 0);
+
+ assert false;
+ }
+ catch (IllegalStateException e) {
+ info("Caught expected exception: " + e);
+ }
+
+ assert ldr.future().isDone();
+
+ ldr.future().get();
+
+ try {
+ // Create another loader.
+ ldr = g1.dataStreamer("UNKNOWN_CACHE");
+
+ assert false;
+ }
+ catch (IllegalStateException e) {
+ info("Caught expected exception: " + e);
+ }
+
+ ldr.close(true);
+
+ assert ldr.future().isDone();
+
+ ldr.future().get();
+
+ // Create another loader.
+ ldr = g1.dataStreamer(null);
+
+ // Cancel with future.
+ ldr.future().cancel();
+
+ try {
+ ldr.addData(0, 0);
+
+ assert false;
+ }
+ catch (IllegalStateException e) {
+ info("Caught expected exception: " + e);
+ }
+
+ assert ldr.future().isDone();
+
+ try {
+ ldr.future().get();
+
+ assert false;
+ }
+ catch (IgniteFutureCancelledException e) {
+ info("Caught expected exception: " + e);
+ }
+
+ // Create another loader.
+ ldr = g1.dataStreamer(null);
+
+ // This will close loader.
+ stopGrid(getTestGridName(1), false);
+
+ try {
+ ldr.addData(0, 0);
+
+ assert false;
+ }
+ catch (IllegalStateException e) {
+ info("Caught expected exception: " + e);
+ }
+
+ assert ldr.future().isDone();
+
+ ldr.future().get();
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Wraps integer to closure returning it.
+ *
+ * @param i Value to wrap.
+ * @return Callable.
+ */
+ private static Callable<Integer> callable(@Nullable final Integer i) {
+ return new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ return i;
+ }
+ };
+ }
+
+ /**
+ * Wraps integer to closure returning it.
+ *
+ * @param i Value to wrap.
+ * @return Closure.
+ */
+ private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) {
+ return new IgniteClosure<Integer, Integer>() {
+ @Override public Integer apply(Integer e) {
+ return e == null ? i : e + i;
+ }
+ };
+ }
+
+ /**
+ * Wraps object to closure returning it.
+ *
+ * @param obj Value to wrap.
+ * @return Closure.
+ */
+ private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) {
+ return new IgniteClosure<T, T>() {
+ @Override public T apply(T e) {
+ assert e == null || obj == null || e.getClass() == obj.getClass() :
+ "Expects the same types [e=" + e + ", obj=" + obj + ']';
+
+ return obj;
+ }
+ };
+ }
+
+ /**
+ * Wraps integer to closure expecting it and returning {@code null}.
+ *
+ * @param exp Expected closure value.
+ * @return Remove expected cache value closure.
+ */
+ private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) {
+ return new IgniteClosure<T, T>() {
+ @Override public T apply(T act) {
+ if (exp == null ? act == null : exp.equals(act))
+ return null;
+
+ throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']');
+ }
+ };
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFlush() throws Exception {
+ mode = LOCAL;
+
+ useCache = true;
+
+ try {
+ Ignite g = startGrid();
+
+ final IgniteCache<Integer, Integer> c = g.jcache(null);
+
+ final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+
+ ldr.perNodeBufferSize(10);
+
+ for (int i = 0; i < 9; i++)
+ ldr.addData(i, i);
+
+ assertTrue(c.localSize() == 0);
+
+ multithreaded(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ ldr.flush();
+
+ assertEquals(9, c.size());
+
+ return null;
+ }
+ }, 5, "flush-checker");
+
+ ldr.addData(100, 100);
+
+ ldr.flush();
+
+ assertEquals(10, c.size());
+
+ ldr.addData(200, 200);
+
+ ldr.close(false);
+
+ ldr.future().get();
+
+ assertEquals(11, c.size());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTryFlush() throws Exception {
+ mode = LOCAL;
+
+ useCache = true;
+
+ try {
+ Ignite g = startGrid();
+
+ IgniteCache<Integer, Integer> c = g.jcache(null);
+
+ IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+
+ ldr.perNodeBufferSize(10);
+
+ for (int i = 0; i < 9; i++)
+ ldr.addData(i, i);
+
+ assertTrue(c.localSize() == 0);
+
+ ldr.tryFlush();
+
+ Thread.sleep(100);
+
+ assertEquals(9, c.size());
+
+ ldr.close(false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFlushTimeout() throws Exception {
+ mode = LOCAL;
+
+ useCache = true;
+
+ try {
+ Ignite g = startGrid();
+
+ final CountDownLatch latch = new CountDownLatch(9);
+
+ g.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ latch.countDown();
+
+ return true;
+ }
+ }, EVT_CACHE_OBJECT_PUT);
+
+ IgniteCache<Integer, Integer> c = g.jcache(null);
+
+ assertTrue(c.localSize() == 0);
+
+ IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+
+ ldr.perNodeBufferSize(10);
+ ldr.autoFlushFrequency(3000);
+ ldr.allowOverwrite(true);
+
+ for (int i = 0; i < 9; i++)
+ ldr.addData(i, i);
+
+ assertTrue(c.localSize() == 0);
+
+ assertFalse(latch.await(1000, MILLISECONDS));
+
+ assertTrue(c.localSize() == 0);
+
+ assertTrue(latch.await(3000, MILLISECONDS));
+
+ assertEquals(9, c.size());
+
+ ldr.close(false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testUpdateStore() throws Exception {
+ storeMap = new ConcurrentHashMap<>();
+
+ try {
+ store = new TestStore();
+
+ useCache = true;
+
+ Ignite ignite = startGrid(1);
+
+ startGrid(2);
+ startGrid(3);
+
+ for (int i = 0; i < 1000; i++)
+ storeMap.put(i, i);
+
+ try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
+ ldr.allowOverwrite(true);
+
+ assertFalse(ldr.skipStore());
+
+ for (int i = 0; i < 1000; i++)
+ ldr.removeData(i);
+
+ for (int i = 1000; i < 2000; i++)
+ ldr.addData(i, i);
+ }
+
+ for (int i = 0; i < 1000; i++)
+ assertNull(storeMap.get(i));
+
+ for (int i = 1000; i < 2000; i++)
+ assertEquals(i, storeMap.get(i));
+
+ try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
+ ldr.allowOverwrite(true);
+
+ ldr.skipStore(true);
+
+ for (int i = 0; i < 1000; i++)
+ ldr.addData(i, i);
+
+ for (int i = 1000; i < 2000; i++)
+ ldr.removeData(i);
+ }
+
+ IgniteCache<Object, Object> cache = ignite.jcache(null);
+
+ for (int i = 0; i < 1000; i++) {
+ assertNull(storeMap.get(i));
+
+ assertEquals(i, cache.get(i));
+ }
+
+ for (int i = 1000; i < 2000; i++) {
+ assertEquals(i, storeMap.get(i));
+
+ assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));
+ }
+ }
+ finally {
+ storeMap = null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestObject {
+ /** Value. */
+ private final int val;
+
+ /**
+ * @param val Value.
+ */
+ private TestObject(int val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestObject obj = (TestObject)o;
+
+ return val == obj.val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestStore extends CacheStoreAdapter<Object, Object> {
+ /** {@inheritDoc} */
+ @Nullable @Override public Object load(Object key) {
+ return storeMap.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<?, ?> entry) {
+ storeMap.put(entry.getKey(), entry.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) {
+ storeMap.remove(key);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 30285a8..aea55df 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.preloa
import org.apache.ignite.internal.processors.cache.expiry.*;
import org.apache.ignite.internal.processors.cache.integration.*;
import org.apache.ignite.internal.processors.cache.local.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
/**
* Test suite.