You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/24 04:41:00 UTC
[2/4] ignite git commit: Merge branches 'ignite-843' and 'master' of
https://git-wip-us.apache.org/repos/asf/ignite into ignite-843
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
new file mode 100644
index 0000000..955a792
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests {@code invoke} and {@code invokeAll} entry processor calls that add values to cached sets while new nodes
+ * are joining the topology.
+ */
+public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of nodes to test on. */
+ private static final int GRID_CNT = 2;
+
+ /** Number of increment iterations. */
+ private static final int NUM_SETS = 50;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration cache = new CacheConfiguration();
+
+ cache.setCacheMode(PARTITIONED);
+ cache.setAtomicityMode(atomicityMode());
+ cache.setWriteSynchronizationMode(FULL_SYNC);
+ cache.setBackups(1);
+ cache.setRebalanceMode(SYNC);
+
+ cfg.setCacheConfiguration(cache);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+ commSpi.setSharedMemoryPort(-1);
+
+ cfg.setCommunicationSpi(commSpi);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /**
+ * @return Atomicity mode.
+ */
+ protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(GRID_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSingleEntryProcessorNodeJoin() throws Exception {
+ checkEntryProcessorNodeJoin(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAllEntryProcessorNodeJoin() throws Exception {
+ checkEntryProcessorNodeJoin(true);
+ }
+
+ /**
+ * @param invokeAll If {@code true} tests invokeAll operation.
+ * @throws Exception If failed.
+ */
+ private void checkEntryProcessorNodeJoin(boolean invokeAll) throws Exception {
+ // First failure raised by the background node starter, if any.
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final int started = 6;
+
+ try {
+ // Start the additional nodes one by one, one second apart, while entry processors are being invoked.
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ for (int i = 0; i < started; i++) {
+ U.sleep(1_000);
+
+ startGrid(GRID_CNT + i);
+ }
+ }
+ catch (Exception e) {
+ error.compareAndSet(null, e);
+ }
+ }
+ }, 1, "starter");
+
+ try {
+ checkIncrement(invokeAll);
+ }
+ finally {
+ // Wait for the starter thread even if the invocations failed.
+ fut.get(getTestTimeout());
+ }
+
+ // Report a node start failure instead of silently dropping it.
+ assertNull("Failed to start node: " + error.get(), error.get());
+
+ // Every node, including the ones that joined, must see all 100 values in every set.
+ for (int i = 0; i < NUM_SETS; i++) {
+ for (int g = 0; g < GRID_CNT + started; g++) {
+ Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
+
+ assertNotNull(vals);
+ assertEquals(100, vals.size());
+ }
+ }
+ }
+ finally {
+ for (int i = 0; i < started; i++)
+ stopGrid(GRID_CNT + i);
+ }
+ }
+
+ /**
+ * @param invokeAll If {@code true} tests invokeAll operation.
+ * @throws Exception If failed.
+ */
+ private void checkIncrement(boolean invokeAll) throws Exception {
+ for (int k = 0; k < 100; k++) {
+ // Each iteration adds "value-k" to every one of the NUM_SETS sets.
+ String val = "value-" + k;
+
+ if (invokeAll) {
+ IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
+ // Collect one processor per key so that all keys are updated by a single invokeAll() call.
+ Map<String, Processor> procs = new LinkedHashMap<>();
+
+ for (int i = 0; i < NUM_SETS; i++)
+ procs.put("set-" + i, new Processor(val));
+
+ cache.invokeAll(procs);
+ }
+ else {
+ for (int i = 0; i < NUM_SETS; i++) {
+ IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
+ cache.invoke("set-" + i, new Processor(val));
+ }
+ }
+ }
+ }
+
+ /** */
+ private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
+ /** */
+ private String val;
+
+ /**
+ * @param val Value.
+ */
+ private Processor(String val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) {
+ Set<String> vals = e.getValue();
+
+ if (vals == null)
+ vals = new HashSet<>();
+
+ vals.add(val);
+
+ e.setValue(vals);
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java
new file mode 100644
index 0000000..7b69674
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
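+/**
+ * Tests that {@code getTopologySafe()} returns the stored value while a pessimistic transaction is held open
+ * and a new node is joining, optionally after the key's primary node has been stopped.
+ */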
+public class IgniteCacheTopologySafeGetSelfTest extends GridCommonAbstractTest {
+ /** Number of initial grids. */
+ public static final int GRID_CNT = 4;
+
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** TX commit latch. */
+ private CountDownLatch releaseLatch;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCacheConfiguration(
+ cacheCfg("tx", TRANSACTIONAL, false),
+ cacheCfg("atomic", ATOMIC, false),
+ cacheCfg("tx_near", TRANSACTIONAL, true),
+ cacheCfg("atomic_near", ATOMIC, true));
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /**
+ * @param name Cache name.
+ * @param cacheMode Cache atomicity mode.
+ * @param near Near enabled flag.
+ * @return Cache configuration.
+ */
+ @SuppressWarnings("unchecked")
+ private CacheConfiguration cacheCfg(String name, CacheAtomicityMode cacheMode, boolean near) {
+ CacheConfiguration cfg = new CacheConfiguration(name);
+
+ cfg.setAtomicityMode(cacheMode);
+ cfg.setBackups(1);
+
+ if (near)
+ cfg.setNearConfiguration(new NearCacheConfiguration());
+ else
+ cfg.setNearConfiguration(null);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetTopologySafeNodeJoin() throws Exception {
+ checkGetTopologySafeNodeJoin(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetTopologySafeNodeJoinPrimaryLeave() throws Exception {
+ checkGetTopologySafeNodeJoin(true);
+ }
+
+ /**
+ * @param failPrimary If {@code true} stops the key's primary node before reading.
+ * @throws Exception If failed.
+ */
+ public void checkGetTopologySafeNodeJoin(boolean failPrimary) throws Exception {
+ startGrids(GRID_CNT);
+
+ awaitPartitionMapExchange();
+
+ try {
+ ClusterNode targetNode = ignite(1).cluster().localNode();
+
+ info(">>> Target node: " + targetNode.id());
+
+ // Populate caches with a key that does not belong to ignite(0).
+ int key = -1;
+ for (int i = 0; i < 100; i++) {
+ Collection<ClusterNode> nodes = ignite(0).affinity("tx").mapKeyToPrimaryAndBackups(i);
+ ClusterNode primaryNode = F.first(nodes);
+
+ if (!nodes.contains(ignite(0).cluster().localNode()) && primaryNode.id().equals(targetNode.id())) {
+ ignite(1).cache("tx").put(i, i);
+ ignite(1).cache("atomic").put(i, i);
+ ignite(1).cache("tx_near").put(i, i);
+ ignite(1).cache("atomic_near").put(i, i);
+
+ key = i;
+
+
+ break;
+ }
+ }
+
+ assertTrue(key != -1);
+
+ IgniteInternalFuture<?> txFut = startBlockingTxAsync();
+
+ IgniteInternalFuture<?> nodeFut = startNodeAsync();
+
+ if (failPrimary)
+ stopGrid(1);
+
+ assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx").getTopologySafe(key));
+ assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic").getTopologySafe(key));
+ assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx_near").getTopologySafe(key));
+ assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic_near").getTopologySafe(key));
+
+ releaseTx();
+
+ txFut.get();
+ nodeFut.get();
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @return Future.
+ * @throws Exception If failed.
+ */
+ private IgniteInternalFuture<?> startNodeAsync() throws Exception {
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ startGrid(GRID_CNT);
+
+ return null;
+ }
+ });
+
+ U.sleep(1000);
+
+ return fut;
+ }
+
+ /**
+ * @return TX release future.
+ * @throws Exception If failed.
+ */
+ private IgniteInternalFuture<?> startBlockingTxAsync() throws Exception {
+ // Signalled by the worker once the transaction has done its reads (or has failed).
+ final CountDownLatch lockLatch = new CountDownLatch(1);
+
+ // Publish the release latch before the worker starts, so releaseTx() can never find it missing.
+ final CountDownLatch txReleaseLatch = new CountDownLatch(1);
+
+ releaseLatch = txReleaseLatch;
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (Transaction ignore = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int i = 0; i < 30; i++)
+ ignite(0).cache("tx").get("value-" + i);
+
+ lockLatch.countDown();
+
+ // Keep the transaction open until releaseTx() is called.
+ txReleaseLatch.await();
+ }
+ finally {
+ // Wake up the caller even if the transaction failed, otherwise it would block forever;
+ // the failure is then expected to surface when the returned future is awaited.
+ lockLatch.countDown();
+ }
+
+ return null;
+ }
+ });
+
+ lockLatch.await();
+
+ return fut;
+ }
+
+ /**
+ *
+ */
+ private void releaseTx() {
+ assert releaseLatch != null;
+
+ releaseLatch.countDown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java
index 8f107e4..013dd74 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMultiNodeLockAbstractTest.java
@@ -41,6 +41,12 @@ import static org.apache.ignite.events.EventType.*;
* Test cases for multi-threaded tests.
*/
public abstract class GridCacheMultiNodeLockAbstractTest extends GridCommonAbstractTest {
+ /** */
+ private static final String CACHE1 = null;
+
+ /** */
+ private static final String CACHE2 = "cache2";
+
/** Grid 1. */
private static Ignite ignite1;
@@ -70,12 +76,20 @@ public abstract class GridCacheMultiNodeLockAbstractTest extends GridCommonAbstr
cfg.setDiscoverySpi(disco);
- cfg.setCacheConfiguration(defaultCacheConfiguration());
+ CacheConfiguration ccfg1 = cacheConfiguration().setName(CACHE1);
+ CacheConfiguration ccfg2 = cacheConfiguration().setName(CACHE2);
+
+ cfg.setCacheConfiguration(ccfg1, ccfg2);
return cfg;
}
/**
+ * @return Cache configuration.
+ */
+ protected abstract CacheConfiguration cacheConfiguration();
+
+ /**
* @return {@code True} for partitioned caches.
*/
protected boolean partitioned() {
@@ -529,6 +543,31 @@ public abstract class GridCacheMultiNodeLockAbstractTest extends GridCommonAbstr
}
/**
+ * @throws Exception If failed.
+ */
+ public void testTwoCaches() throws Exception {
+ IgniteCache<Integer, String> cache1 = ignite1.cache(CACHE1);
+ IgniteCache<Integer, String> cache2 = ignite1.cache(CACHE2);
+
+ final Integer key = primaryKey(cache1);
+
+ Lock lock = cache1.lock(key);
+
+ lock.lock();
+
+ try {
+ assertTrue(cache1.isLocalLocked(key, true));
+ assertTrue(cache1.isLocalLocked(key, false));
+
+ assertFalse(cache2.isLocalLocked(key, true));
+ assertFalse(cache2.isLocalLocked(key, false));
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
* Cache unlock listener.
*/
private class UnlockListener implements IgnitePredicate<Event> {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
index 459e015..d05764c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java
@@ -165,6 +165,8 @@ public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
startGrids(GRID_CNT);
+ awaitPartitionMapExchange();
+
ignites = new Ignite[GRID_CNT];
ids = new UUID[GRID_CNT];
caches = new IgniteCache[GRID_CNT];
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java
new file mode 100644
index 0000000..5432e76
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCrossCacheTxFailoverTest.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.rendezvous.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Tests transactions spanning two caches while a server node is repeatedly started and stopped.
+ */
+public class IgniteCacheCrossCacheTxFailoverTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final String CACHE1 = "cache1";
+
+ /** */
+ private static final String CACHE2 = "cache2";
+
+ /** */
+ private static final int GRID_CNT = 4;
+
+ /** */
+ private static final int KEY_RANGE = 1000;
+
+ /** */
+ private static final long TEST_TIME = 3 * 60_000;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ if (gridName.equals(getTestGridName(GRID_CNT - 1)))
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ // Use GRID_CNT rather than a literal: getConfiguration() makes node GRID_CNT - 1 a client.
+ startGrids(GRID_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @param name Cache name.
+ * @param cacheMode Cache mode.
+ * @param parts Number of partitions.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String name, CacheMode cacheMode, int parts) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(name);
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(1);
+
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, parts));
+
+ return ccfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return TEST_TIME + 60_000;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCachePessimisticTxFailover() throws Exception {
+ crossCacheTxFailover(PARTITIONED, true, PESSIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCachePessimisticTxFailoverDifferentAffinity() throws Exception {
+ crossCacheTxFailover(PARTITIONED, false, PESSIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheOptimisticTxFailover() throws Exception {
+ crossCacheTxFailover(PARTITIONED, true, OPTIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheOptimisticTxFailoverDifferentAffinity() throws Exception {
+ crossCacheTxFailover(PARTITIONED, false, OPTIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCachePessimisticTxFailoverReplicated() throws Exception {
+ crossCacheTxFailover(REPLICATED, true, PESSIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheOptimisticTxFailoverReplicated() throws Exception {
+ crossCacheTxFailover(REPLICATED, true, OPTIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCrossCachePessimisticTxFailoverDifferentAffinityReplicated() throws Exception {
+ // Replicated variant; PARTITIONED here would repeat testCrossCachePessimisticTxFailoverDifferentAffinity.
+ crossCacheTxFailover(REPLICATED, false, PESSIMISTIC, REPEATABLE_READ);
+ }
+
+ /**
+ * @param cacheMode Cache mode.
+ * @param sameAff If {@code false} uses different number of partitions for caches.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @throws Exception If failed.
+ */
+ private void crossCacheTxFailover(CacheMode cacheMode,
+ boolean sameAff,
+ final TransactionConcurrency concurrency,
+ final TransactionIsolation isolation) throws Exception {
+ IgniteKernal ignite0 = (IgniteKernal)ignite(0);
+
+ // Tells the update threads to finish.
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ try {
+ // The second cache gets a different partition count when different affinity is requested.
+ ignite0.createCache(cacheConfiguration(CACHE1, cacheMode, 256));
+ ignite0.createCache(cacheConfiguration(CACHE2, cacheMode, sameAff ? 256 : 128));
+
+ final AtomicInteger threadIdx = new AtomicInteger();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int idx = threadIdx.getAndIncrement();
+
+ // Spread the update threads over the initially started nodes.
+ Ignite ignite = ignite(idx % GRID_CNT);
+
+ log.info("Started update thread [node=" + ignite.name() +
+ ", client=" + ignite.configuration().isClientMode() + ']');
+
+ IgniteCache<TestKey, TestValue> cache1 = ignite.cache(CACHE1);
+ IgniteCache<TestKey, TestValue> cache2 = ignite.cache(CACHE2);
+
+ assertNotSame(cache1, cache2);
+
+ IgniteTransactions txs = ignite.transactions();
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ long iter = 0;
+
+ while (!stop.get()) {
+ boolean sameKey = rnd.nextBoolean();
+
+ try {
+ try (Transaction tx = txs.txStart(concurrency, isolation)) {
+ if (sameKey) {
+ TestKey key = new TestKey(rnd.nextLong(KEY_RANGE));
+
+ cacheOperation(rnd, cache1, key);
+ cacheOperation(rnd, cache2, key);
+ }
+ else {
+ TestKey key1 = new TestKey(rnd.nextLong(KEY_RANGE));
+ TestKey key2 = new TestKey(key1.key() + 1);
+
+ cacheOperation(rnd, cache1, key1);
+ cacheOperation(rnd, cache2, key2);
+ }
+
+ tx.commit();
+ }
+ }
+ catch (CacheException | IgniteException e) {
+ // Update failures are only logged; the thread keeps going.
+ log.info("Update error: " + e);
+ }
+
+ if (iter++ % 500 == 0)
+ log.info("Iteration: " + iter);
+ }
+
+ return null;
+ }
+
+ /**
+ * Performs a randomly chosen operation (put, remove, invoke or get) on the given key.
+ *
+ * @param rnd Random.
+ * @param cache Cache.
+ * @param key Key.
+ */
+ private void cacheOperation(ThreadLocalRandom rnd, IgniteCache<TestKey, TestValue> cache, TestKey key) {
+ switch (rnd.nextInt(4)) {
+ case 0:
+ cache.put(key, new TestValue(rnd.nextLong()));
+
+ break;
+
+ case 1:
+ cache.remove(key);
+
+ break;
+
+ case 2:
+ cache.invoke(key, new TestEntryProcessor(rnd.nextBoolean() ? 1L : null));
+
+ break;
+
+ case 3:
+ cache.get(key);
+
+ break;
+
+ default:
+ assert false;
+ }
+ }
+ }, 10, "tx-thread");
+
+ // Run for TEST_TIME so that the duration stays in sync with getTestTimeout().
+ long stopTime = System.currentTimeMillis() + TEST_TIME;
+
+ long topVer = ignite0.cluster().topologyVersion();
+
+ boolean failed = false;
+
+ while (System.currentTimeMillis() < stopTime) {
+ log.info("Start node.");
+
+ IgniteKernal ignite = (IgniteKernal)startGrid(GRID_CNT);
+
+ assertFalse(ignite.configuration().isClientMode());
+
+ // Node start is counted as one topology version change.
+ topVer++;
+
+ IgniteInternalFuture<?> affFut = ignite.context().cache().context().exchange().affinityReadyFuture(
+ new AffinityTopologyVersion(topVer));
+
+ try {
+ if (affFut != null)
+ affFut.get(30_000);
+ }
+ catch (IgniteFutureTimeoutCheckedException e) {
+ log.error("Failed to wait for affinity future after start: " + topVer);
+
+ failed = true;
+
+ break;
+ }
+
+ Thread.sleep(500);
+
+ log.info("Stop node.");
+
+ stopGrid(GRID_CNT);
+
+ // Node stop is counted as one more topology version change.
+ topVer++;
+
+ affFut = ignite0.context().cache().context().exchange().affinityReadyFuture(
+ new AffinityTopologyVersion(topVer));
+
+ try {
+ if (affFut != null)
+ affFut.get(30_000);
+ }
+ catch (IgniteFutureTimeoutCheckedException e) {
+ log.error("Failed to wait for affinity future after stop: " + topVer);
+
+ failed = true;
+
+ break;
+ }
+ }
+
+ stop.set(true);
+
+ fut.get();
+
+ assertFalse("Test failed, see log for details.", failed);
+ }
+ finally {
+ // Make sure the update threads stop even when the test body failed.
+ stop.set(true);
+
+ ignite0.destroyCache(CACHE1);
+ ignite0.destroyCache(CACHE2);
+
+ awaitPartitionMapExchange();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestKey implements Serializable {
+ /** */
+ private long key;
+
+ /**
+ * @param key Key.
+ */
+ public TestKey(long key) {
+ this.key = key;
+ }
+
+ /**
+ * @return Key.
+ */
+ public long key() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestKey testKey = (TestKey)o;
+
+ return key == testKey.key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (int)(key ^ (key >>> 32));
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestValue implements Serializable {
+ /** */
+ private long val;
+
+ /**
+ * @param val Value.
+ */
+ public TestValue(long val) {
+ this.val = val;
+ }
+
+ /**
+ * @return Value.
+ */
+ public long value() {
+ return val;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestEntryProcessor implements CacheEntryProcessor<TestKey, TestValue, TestValue> {
+ /** */
+ private Long val;
+
+ /**
+ * @param val Value.
+ */
+ public TestEntryProcessor(@Nullable Long val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public TestValue process(MutableEntry<TestKey, TestValue> e, Object... args) {
+ TestValue old = e.getValue();
+
+ if (val != null)
+ e.setValue(new TestValue(val));
+
+ return old;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index c798369..5d0cacc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -54,6 +54,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
cfg.setAtomicWriteOrderMode(writeOrderMode());
cfg.setBackups(1);
+ cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
return cfg;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 1d14dec..0ab5729 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -19,17 +19,29 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+import javax.cache.*;
+import javax.cache.processor.*;
+import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
/**
*
*/
public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetryAbstractSelfTest {
+ /** */
+ private static final int FACTOR = 1000;
+
/** {@inheritDoc} */
@Override protected CacheAtomicityMode atomicityMode() {
return CacheAtomicityMode.TRANSACTIONAL;
@@ -76,4 +88,179 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
finished.set(true);
}
}
+
+ /**
+ * Runs explicit transactions from several threads against node 0 while another node is repeatedly
+ * stopped and started, then verifies the resulting cache contents through every node.
+ *
+ * @throws Exception If failed.
+ */
+ public void testExplicitTransactionRetries() throws Exception {
+ final AtomicInteger idx = new AtomicInteger();
+ int threads = 8;
+
+ // One slot per worker thread; holds the exception that aborted that worker, if any.
+ final AtomicReferenceArray<Exception> err = new AtomicReferenceArray<>(threads);
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ // Unique worker index; multiplied by FACTOR it gives this worker's own key base.
+ int th = idx.getAndIncrement();
+ int base = th * FACTOR;
+
+ Ignite ignite = ignite(0);
+ final IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ try {
+ // Each iteration is one transaction executed through the retrying helper.
+ for (int i = 0; i < FACTOR; i++) {
+ doInTransaction(ignite, new ProcessCallable(cache, base, i));
+
+ if (i > 0 && i % 500 == 0)
+ info("Done: " + i);
+ }
+ }
+ catch (Exception e) {
+ // Remember the failure; it is rethrown by the test thread after the workers finish.
+ err.set(th, e);
+ }
+
+ return null;
+ }
+ }, threads, "tx-runner");
+
+ // Keep restarting a randomly chosen node until all workers are done.
+ while (!fut.isDone()) {
+ int stopIdx = ThreadLocalRandom.current().nextInt(2, 4); // Random in [2, 3].
+
+ stopGrid(stopIdx);
+
+ U.sleep(500);
+
+ startGrid(stopIdx);
+ }
+
+ // Fail the test with the first exception recorded by a worker.
+ for (int i = 0; i < threads; i++) {
+ Exception error = err.get(i);
+
+ if (error != null)
+ throw error;
+ }
+
+ // Verify contents of the cache.
+ for (int g = 0; g < gridCount(); g++) {
+ IgniteCache<Object, Object> cache = ignite(g).cache(null);
+
+ for (int th = 0; th < threads; th++) {
+ int base = th * FACTOR;
+
+ String key = "key-" + base;
+
+ // The per-thread set must hold exactly one value per committed iteration.
+ Set<String> set = (Set<String>)cache.get(key);
+
+ assertNotNull("Missing set for key: " + key, set);
+ assertEquals(FACTOR, set.size());
+
+ for (int i = 0; i < FACTOR; i++) {
+ assertEquals("value-" + i, cache.get("key-" + base + "-" + i));
+ assertTrue(set.contains("value-" + i));
+ }
+ }
+ }
+ }
+
+ /**
+ * Executes the closure inside a {@code PESSIMISTIC}, {@code REPEATABLE_READ} transaction and commits it.
+ * If the attempt fails with a topology exception or a transaction rollback exception, a new transaction
+ * is started and the closure is called again; there is no limit on the number of attempts.
+ *
+ * @param ignite Ignite instance.
+ * @param clo Closure.
+ * @return Result of closure execution.
+ * @throws Exception If failed with an exception that is not retried.
+ */
+ private <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+ while (true) {
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ T res = clo.call();
+
+ tx.commit();
+
+ return res;
+ }
+ catch (CacheException e) {
+ // Retry only when the cache exception wraps a topology exception; anything else is rethrown.
+ if (e.getCause() instanceof ClusterTopologyException) {
+ ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+ // Block on the retry-ready future before starting the next attempt.
+ topEx.retryReadyFuture().get();
+ }
+ else
+ throw e;
+ }
+ catch (ClusterTopologyException e) {
+ // Same handling for a topology exception that was thrown directly.
+ IgniteFuture<?> fut = e.retryReadyFuture();
+
+ fut.get();
+ }
+ catch (TransactionRollbackException ignore) {
+ // Safe to retry right away.
+ }
+ }
+ }
+
+ /**
+ * Transaction body: stores one value under a per-iteration key and adds the same value
+ * to the set kept under the base key.
+ */
+ private static class ProcessCallable implements Callable<Void> {
+ /** Cache to operate on (kept raw because it is used with two different value types). */
+ private IgniteCache cache;
+
+ /** Base index used to build the keys. */
+ private int base;
+
+ /** Iteration index. */
+ private int i;
+
+ /**
+ * @param cache Cache.
+ * @param base Base index.
+ * @param i Iteration index.
+ */
+ private ProcessCallable(IgniteCache<Object, Object> cache, int base, int i) {
+ this.base = base;
+ this.i = i;
+ this.cache = cache;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Void call() throws Exception {
+ String setKey = "key-" + base;
+ String entryKey = setKey + "-" + i;
+ String val = "value-" + i;
+
+ // The per-iteration key must sort after the base key.
+ assert entryKey.compareTo(setKey) > 0;
+
+ IgniteCache<String, String> entries = cache;
+ IgniteCache<String, Set<String>> sets = cache;
+
+ entries.put(entryKey, val);
+
+ sets.invoke(setKey, new AddEntryProcessor(val));
+
+ return null;
+ }
+ }
+
+ /**
+ * Entry processor that adds a single value to the set stored in the entry,
+ * creating the set when the entry has no value yet.
+ */
+ private static class AddEntryProcessor implements CacheEntryProcessor<String, Set<String>, Void> {
+ /** Value to add to the set. */
+ private String addVal;
+
+ /**
+ * @param addVal Value to add.
+ */
+ private AddEntryProcessor(String addVal) {
+ this.addVal = addVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<String, Set<String>> entry, Object... arguments) throws EntryProcessorException {
+ Set<String> cur = entry.getValue();
+
+ // Start from an empty set when nothing has been stored under the key yet.
+ Set<String> updated = cur != null ? cur : new HashSet<String>();
+
+ updated.add(addVal);
+
+ entry.setValue(updated);
+
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
index b6bc56e..d1d7c02 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java
@@ -198,7 +198,7 @@ public class GridCacheNearOnlyTopologySelfTest extends GridCommonAbstractTest {
}
// Test optimistic transaction.
- GridTestUtils.assertThrows(log, new Callable<Object>() {
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
@Override public Object call() throws Exception {
try (Transaction tx = igniteNearOnly.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
nearOnly.put("key", "val");
@@ -208,7 +208,7 @@ public class GridCacheNearOnlyTopologySelfTest extends GridCommonAbstractTest {
return null;
}
- }, ClusterTopologyException.class, null);
+ }, ClusterTopologyCheckedException.class);
// Test pessimistic transaction.
GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
new file mode 100644
index 0000000..44ef20d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearTxForceKeyTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Test for a put into a transactional cache with near cache enabled, issued right after a second node
+ * has joined while the cache is configured with {@code ASYNC} rebalance mode and a rebalance delay.
+ */
+public class GridCacheNearTxForceKeyTest extends GridCommonAbstractTest {
+ /** IP finder shared by all nodes started by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ // Transactional cache without backups, with near cache, and with delayed asynchronous rebalancing.
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setRebalanceMode(ASYNC);
+ ccfg.setRebalanceDelay(5000);
+ ccfg.setBackups(0);
+ ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /**
+ * Provokes the scenario in which the primary node sends a force key request to the node
+ * that started the transaction.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNearTx() throws Exception {
+ Ignite ignite0 = startGrid(0);
+
+ IgniteCache<Integer, Integer> cache = ignite0.cache(null);
+
+ // The second node joins before the put is issued from the first node.
+ Ignite ignite1 = startGrid(1);
+
+ final Integer key = 2;
+
+ // Nothing was stored under the key before, so the previous value must be null.
+ assertNull(cache.getAndPut(key, key));
+
+ // The test relies on the key being primary on the newly started node.
+ assertTrue(ignite0.affinity(null).isPrimary(ignite1.cluster().localNode(), key));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
index 6138022..6ceded3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
@@ -32,6 +32,8 @@ import java.util.*;
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
/**
* Test for issue GG-3997 Total Hits and Misses display wrong value for in-memory database.
@@ -50,18 +52,18 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- // DiscoverySpi
+ // DiscoverySpi.
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(IP_FINDER);
cfg.setDiscoverySpi(disco);
// Cache.
- cfg.setCacheConfiguration(cacheConfiguration(gridName));
+ cfg.setCacheConfiguration(cacheConfiguration());
TransactionConfiguration tCfg = new TransactionConfiguration();
- tCfg.setDefaultTxConcurrency(TransactionConcurrency.PESSIMISTIC);
- tCfg.setDefaultTxIsolation(TransactionIsolation.REPEATABLE_READ);
+ tCfg.setDefaultTxConcurrency(PESSIMISTIC);
+ tCfg.setDefaultTxIsolation(REPEATABLE_READ);
cfg.setTransactionConfiguration(tCfg);
@@ -71,20 +73,18 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac
/**
* Cache configuration.
*
- * @param gridName Grid name.
* @return Cache configuration.
* @throws Exception In case of error.
*/
- protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ protected CacheConfiguration cacheConfiguration() throws Exception {
CacheConfiguration cfg = defaultCacheConfiguration();
+
cfg.setCacheMode(PARTITIONED);
cfg.setStartSize(700000);
cfg.setWriteSynchronizationMode(FULL_ASYNC);
cfg.setEvictionPolicy(null);
cfg.setBackups(1);
cfg.setNearConfiguration(null);
- cfg.setRebalanceDelay(-1);
- cfg.setBackups(1);
cfg.setStatisticsEnabled(true);
return cfg;
@@ -96,10 +96,10 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac
* @throws Exception If failed.
*/
public void testHitsAndMisses() throws Exception {
- assert(GRID_CNT > 0);
-
startGrids(GRID_CNT);
+ awaitPartitionMapExchange();
+
try {
final Ignite g = grid(0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeLockSelfTest.java
index a782aec..bbc56e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeLockSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeLockSelfTest.java
@@ -29,9 +29,7 @@ import static org.apache.ignite.cache.CacheMode.*;
*/
public class GridCachePartitionedMultiNodeLockSelfTest extends GridCacheMultiNodeLockAbstractTest {
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration c = super.getConfiguration(gridName);
-
+ @Override protected CacheConfiguration cacheConfiguration() {
CacheConfiguration cc = defaultCacheConfiguration();
cc.setCacheMode(PARTITIONED);
@@ -39,9 +37,7 @@ public class GridCachePartitionedMultiNodeLockSelfTest extends GridCacheMultiNod
cc.setAtomicityMode(TRANSACTIONAL);
cc.setNearConfiguration(new NearCacheConfiguration());
- c.setCacheConfiguration(cc);
-
- return c;
+ return cc;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMultiNodeLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMultiNodeLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMultiNodeLockSelfTest.java
index 70e0ad6..bf3620b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMultiNodeLockSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMultiNodeLockSelfTest.java
@@ -32,15 +32,11 @@ public class GridCacheReplicatedMultiNodeLockSelfTest extends GridCacheMultiNode
}
/** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration() throws Exception {
- IgniteConfiguration cfg = super.getConfiguration();
-
+ @Override protected CacheConfiguration cacheConfiguration() {
CacheConfiguration cacheCfg = defaultCacheConfiguration();
cacheCfg.setCacheMode(REPLICATED);
- cfg.setCacheConfiguration(cacheCfg);
-
- return cfg;
+ return cacheCfg;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearEvictionPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearEvictionPolicySelfTest.java
index 218b817..7910e41 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearEvictionPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearEvictionPolicySelfTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.testframework.junits.common.*;
import java.util.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMemoryMode.*;
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CacheRebalanceMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
@@ -49,14 +50,18 @@ public class LruNearEvictionPolicySelfTest extends GridCommonAbstractTest {
/** Cache atomicity mode specified by test. */
private CacheAtomicityMode atomicityMode;
+ /** Memory mode. */
+ private CacheMemoryMode memMode;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration c = super.getConfiguration(gridName);
CacheConfiguration cc = new CacheConfiguration();
- cc.setAtomicityMode(atomicityMode);
cc.setCacheMode(PARTITIONED);
+ cc.setAtomicityMode(atomicityMode);
+ cc.setMemoryMode(memMode);
cc.setWriteSynchronizationMode(PRIMARY_SYNC);
cc.setRebalanceMode(SYNC);
cc.setStartSize(100);
@@ -86,6 +91,17 @@ public class LruNearEvictionPolicySelfTest extends GridCommonAbstractTest {
*/
public void testAtomicNearEvictionMaxSize() throws Exception {
atomicityMode = ATOMIC;
+ memMode = ONHEAP_TIERED;
+
+ checkNearEvictionMaxSize();
+ }
+
+ /**
+ * Runs the near eviction max size check for an {@code ATOMIC} cache in {@code OFFHEAP_TIERED} memory mode.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAtomicOffHeapNearEvictionMaxSize() throws Exception {
+ atomicityMode = ATOMIC;
+ memMode = OFFHEAP_TIERED;
checkNearEvictionMaxSize();
}
@@ -95,6 +111,17 @@ public class LruNearEvictionPolicySelfTest extends GridCommonAbstractTest {
*/
public void testTransactionalNearEvictionMaxSize() throws Exception {
atomicityMode = TRANSACTIONAL;
+ memMode = ONHEAP_TIERED;
+
+ checkNearEvictionMaxSize();
+ }
+
+ /**
+ * Runs the near eviction max size check for a {@code TRANSACTIONAL} cache in {@code OFFHEAP_TIERED} memory mode.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTransactionalOffHeapNearEvictionMaxSize() throws Exception {
+ atomicityMode = TRANSACTIONAL;
+ memMode = OFFHEAP_TIERED;
checkNearEvictionMaxSize();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearOnlyNearEvictionPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearOnlyNearEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearOnlyNearEvictionPolicySelfTest.java
index 0d3c692..6bf343b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearOnlyNearEvictionPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/LruNearOnlyNearEvictionPolicySelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.junits.common.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMemoryMode.*;
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CacheRebalanceMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
@@ -53,6 +54,9 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes
/** Cache atomicity mode specified by test. */
private CacheAtomicityMode atomicityMode;
+ /** Memory mode. */
+ private CacheMemoryMode memMode;
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
@@ -69,8 +73,9 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes
else {
CacheConfiguration cc = new CacheConfiguration();
- cc.setAtomicityMode(atomicityMode);
cc.setCacheMode(cacheMode);
+ cc.setAtomicityMode(atomicityMode);
+ cc.setMemoryMode(memMode);
cc.setWriteSynchronizationMode(PRIMARY_SYNC);
cc.setRebalanceMode(SYNC);
cc.setStartSize(100);
@@ -92,6 +97,18 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes
public void testPartitionedAtomicNearEvictionMaxSize() throws Exception {
atomicityMode = ATOMIC;
cacheMode = PARTITIONED;
+ memMode = ONHEAP_TIERED;
+
+ checkNearEvictionMaxSize();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionedAtomicOffHeapNearEvictionMaxSize() throws Exception {
+ atomicityMode = ATOMIC;
+ cacheMode = PARTITIONED;
+ memMode = OFFHEAP_TIERED;
checkNearEvictionMaxSize();
}
@@ -102,6 +119,18 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes
public void testPartitionedTransactionalNearEvictionMaxSize() throws Exception {
atomicityMode = TRANSACTIONAL;
cacheMode = PARTITIONED;
+ memMode = ONHEAP_TIERED;
+
+ checkNearEvictionMaxSize();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionedTransactionalOffHeapNearEvictionMaxSize() throws Exception {
+ atomicityMode = TRANSACTIONAL;
+ cacheMode = PARTITIONED;
+ memMode = OFFHEAP_TIERED;
checkNearEvictionMaxSize();
}
@@ -112,6 +141,18 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes
public void testReplicatedAtomicNearEvictionMaxSize() throws Exception {
atomicityMode = ATOMIC;
cacheMode = REPLICATED;
+ memMode = ONHEAP_TIERED;
+
+ checkNearEvictionMaxSize();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedAtomicOffHeapNearEvictionMaxSize() throws Exception {
+ atomicityMode = ATOMIC;
+ cacheMode = REPLICATED;
+ memMode = OFFHEAP_TIERED;
checkNearEvictionMaxSize();
}
@@ -122,6 +163,18 @@ public class LruNearOnlyNearEvictionPolicySelfTest extends GridCommonAbstractTes
public void testReplicatedTransactionalNearEvictionMaxSize() throws Exception {
atomicityMode = TRANSACTIONAL;
cacheMode = REPLICATED;
+ memMode = ONHEAP_TIERED;
+
+ checkNearEvictionMaxSize();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedTransactionalOffHeapNearEvictionMaxSize() throws Exception {
+ atomicityMode = TRANSACTIONAL;
+ cacheMode = REPLICATED;
+ memMode = OFFHEAP_TIERED;
checkNearEvictionMaxSize();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java
index 35abf7e..65d9f36 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java
@@ -33,6 +33,7 @@ import java.util.*;
import java.util.concurrent.*;
import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
/**
*
@@ -65,8 +66,8 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setCacheMode(PARTITIONED);
-
ccfg.setBackups(1);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
cfg.setCacheConfiguration(ccfg);
}
@@ -79,16 +80,31 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest
/**
* @throws Exception If failed.
*/
- public void testClassCacheUpdateFailover() throws Exception {
+ public void testClassCacheUpdateFailover1() throws Exception {
+ classCacheUpdateFailover(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClassCacheUpdateFailover2() throws Exception {
+ classCacheUpdateFailover(true);
+ }
+
+ /**
+ * @param stopSrv If {@code true} restarts server node, otherwise client node.
+ * @throws Exception If failed.
+ */
+ private void classCacheUpdateFailover(boolean stopSrv) throws Exception {
cache = true;
startGridsMultiThreaded(2);
- cache = false;
+ cache = stopSrv;
IgniteCache<Integer, Object> cache0 = ignite(0).cache(null);
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 20; i++) {
log.info("Iteration: " + i);
Map<Integer, Object> map = new HashMap<>();
@@ -106,7 +122,7 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest
}
});
- cache0.putAll(map); // Do not stop cache node, so put should not fail.
+ cache0.putAll(map);
fut.get();
}
@@ -210,6 +226,26 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest
case 9: return new TestClass9();
case 10: return new TestClass10();
+
+ case 11: return new TestClass11();
+
+ case 12: return new TestClass12();
+
+ case 13: return new TestClass13();
+
+ case 14: return new TestClass14();
+
+ case 15: return new TestClass15();
+
+ case 16: return new TestClass16();
+
+ case 17: return new TestClass17();
+
+ case 18: return new TestClass18();
+
+ case 19: return new TestClass19();
+
+ case 20: return new TestClass20();
}
fail();
@@ -221,6 +257,7 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest
*
*/
static class TestClass1 implements Serializable {
+ /** */
int val;
}
@@ -268,4 +305,54 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest
*
*/
static class TestClass10 implements Serializable {}
+
+ /**
+ *
+ */
+ static class TestClass11 implements Serializable {}
+
+ /**
+ *
+ */
+ static class TestClass12 implements Serializable {}
+
+ /**
+ *
+ */
+ static class TestClass13 implements Serializable {}
+
+ /**
+ *
+ */
+ static class TestClass14 implements Serializable {}
+
+ /**
+ *
+ */
+ static class TestClass15 implements Serializable {}
+
+ /**
+ *
+ */
+ static class TestClass16 implements Serializable {}
+
+ /**
+ *
+ */
+ static class TestClass17 implements Serializable {}
+
+ /**
+ *
+ */
+ static class TestClass18 implements Serializable {}
+
+ /**
+ *
+ */
+ static class TestClass19 implements Serializable {}
+
+ /**
+ *
+ */
+ static class TestClass20 implements Serializable {}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index af2b85c..b64471b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -80,6 +80,8 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheSizeFailoverTest.class);
+ suite.addTestSuite(IgniteCacheTopologySafeGetSelfTest.class);
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
index 97c558a..f3fac23 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
@@ -46,6 +46,8 @@ public class IgniteCacheFailoverTestSuite2 {
suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class);
suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class);
+ suite.addTestSuite(IgniteCacheCrossCacheTxFailoverTest.class);
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 2efdb82..4926590 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -140,6 +140,11 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(IgniteCacheClientNodeChangingTopologyTest.class));
suite.addTest(new TestSuite(IgniteCacheServerNodeConcurrentStart.class));
+ suite.addTest(new TestSuite(IgniteCacheEntryProcessorNodeJoinTest.class));
+ suite.addTest(new TestSuite(IgniteAtomicCacheEntryProcessorNodeJoinTest.class));
+ suite.addTest(new TestSuite(GridCacheNearTxForceKeyTest.class));
+ suite.addTest(new TestSuite(CrossCacheTxRandomOperationsTest.class));
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/DbMetadataReader.java
----------------------------------------------------------------------
diff --git a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/DbMetadataReader.java b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/DbMetadataReader.java
index 31466b5..eb447b9 100644
--- a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/DbMetadataReader.java
+++ b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/DbMetadataReader.java
@@ -46,12 +46,49 @@ public class DbMetadataReader {
}
/**
+ * Resolves the metadata dialect that matches the database behind the given connection.
+ * <p>
+ * Falls back to {@link JdbcMetadataDialect} when the product name is not recognized,
+ * is not reported, or cannot be read from the connection metadata.
+ *
+ * @param conn Connection to database.
+ * @return Specific dialect object.
+ */
+ private DatabaseMetadataDialect dialect(Connection conn) {
+ try {
+ String dbProductName = conn.getMetaData().getDatabaseProductName();
+
+ // A driver may report no product name; use the generic dialect instead of failing with NPE below.
+ if (dbProductName == null)
+ return new JdbcMetadataDialect();
+
+ if ("Oracle".equals(dbProductName))
+ return new OracleMetadataDialect();
+
+ if (dbProductName.startsWith("DB2/"))
+ return new DB2MetadataDialect();
+
+ if ("MySQL".equals(dbProductName))
+ return new MySQLMetadataDialect();
+
+ return new JdbcMetadataDialect();
+ }
+ catch (SQLException e) {
+ log.log(Level.SEVERE, "Failed to resolve dialect (JdbcMetadataDialect will be used).", e);
+
+ return new JdbcMetadataDialect();
+ }
+ }
+ /**
+ * Get list of schemas from database.
+ *
+ * @param conn Connection to database.
+ * @return List of schema names.
+ * @throws SQLException If schemas loading failed.
+ */
+ public List<String> schemas(Connection conn) throws SQLException {
+ return dialect(conn).schemas(conn);
+ }
+
+ /**
* Extract DB metadata.
*
* @param conn Connection.
+ * @param schemas List of database schemas to process. In case of empty list all schemas will be processed.
* @param tblsOnly Tables only flag.
*/
- public Collection<DbTable> extractMetadata(Connection conn, boolean tblsOnly) throws SQLException {
+ public Collection<DbTable> extractMetadata(Connection conn, List<String> schemas, boolean tblsOnly) throws SQLException {
DatabaseMetadataDialect dialect;
try {
@@ -70,7 +107,7 @@ public class DbMetadataReader {
dialect = new JdbcMetadataDialect();
}
- return dialect.tables(conn, tblsOnly);
+ return dialect.tables(conn, schemas, tblsOnly);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DB2MetadataDialect.java
----------------------------------------------------------------------
diff --git a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DB2MetadataDialect.java b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DB2MetadataDialect.java
index 17eb8b2..15063e2 100644
--- a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DB2MetadataDialect.java
+++ b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DB2MetadataDialect.java
@@ -25,6 +25,7 @@ import java.util.*;
public class DB2MetadataDialect extends JdbcMetadataDialect {
/** {@inheritDoc} */
@Override public Set<String> systemSchemas() {
- return new HashSet<>(Arrays.asList("SYSIBM", "SYSCAT", "SYSSTAT", "SYSTOOLS"));
+ return new HashSet<>(Arrays.asList("SYSIBM", "SYSCAT", "SYSSTAT", "SYSTOOLS", "SYSFUN", "SYSIBMADM",
+ "SYSIBMINTERNAL", "SYSIBMTS", "SYSPROC", "SYSPUBLIC"));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DatabaseMetadataDialect.java
----------------------------------------------------------------------
diff --git a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DatabaseMetadataDialect.java b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DatabaseMetadataDialect.java
index 0d17567..9c059b8 100644
--- a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DatabaseMetadataDialect.java
+++ b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/DatabaseMetadataDialect.java
@@ -27,14 +27,25 @@ import java.util.*;
*/
public abstract class DatabaseMetadataDialect {
/**
+ * Gets schemas from database.
+ *
+ * @param conn Database connection.
+ * @return List of schema names.
+ * @throws SQLException If failed to get schemas.
+ */
+ public abstract List<String> schemas(Connection conn) throws SQLException;
+
+ /**
* Gets tables from database.
*
* @param conn Database connection.
+ * @param schemas Collection of schema names to load.
* @param tblsOnly If {@code true} then gets only tables otherwise gets tables and views.
* @return Collection of table descriptors.
* @throws SQLException If failed to get tables.
*/
- public abstract Collection<DbTable> tables(Connection conn, boolean tblsOnly) throws SQLException;
+ public abstract Collection<DbTable> tables(Connection conn, List<String> schemas, boolean tblsOnly)
+ throws SQLException;
/**
* @return Collection of database system schemas.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/JdbcMetadataDialect.java
----------------------------------------------------------------------
diff --git a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/JdbcMetadataDialect.java b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/JdbcMetadataDialect.java
index ab65e7a..1bb6840 100644
--- a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/JdbcMetadataDialect.java
+++ b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/JdbcMetadataDialect.java
@@ -63,76 +63,117 @@ public class JdbcMetadataDialect extends DatabaseMetadataDialect {
private static final int IDX_ASC_OR_DESC_IDX = 10;
/** {@inheritDoc} */
- @Override public Collection<DbTable> tables(Connection conn, boolean tblsOnly) throws SQLException {
+ @Override public List<String> schemas(Connection conn) throws SQLException {
+     List<String> schemas = new ArrayList<>();
+
+     // try-with-resources: the metadata result set is closed even if iteration fails.
+     try (ResultSet rs = conn.getMetaData().getSchemas()) {
+         Set<String> sys = systemSchemas();
+         while (rs.next()) {
+             String schema = rs.getString(1);
+
+             // Skip system schemas.
+             if (sys.contains(schema))
+                 continue;
+
+             schemas.add(schema);
+         }
+     }
+
+     return schemas;
+ }
+
+ /**
+ * @return {@code true} if requested schema names must be passed to {@code DatabaseMetaData.getTables} as catalogs.
+ */
+ protected boolean useCatalog() {
+ return false;
+ }
+
+ /**
+ * @return {@code true} if requested schema names must be passed to {@code DatabaseMetaData.getTables} as schema patterns.
+ */
+ protected boolean useSchema() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<DbTable> tables(Connection conn, List<String> schemas, boolean tblsOnly)
+ throws SQLException {
DatabaseMetaData dbMeta = conn.getMetaData();
Set<String> sys = systemSchemas();
Collection<DbTable> tbls = new ArrayList<>();
- try (ResultSet tblsRs = dbMeta.getTables(null, null, "%",
- tblsOnly ? TABLES_ONLY : TABLES_AND_VIEWS)) {
- while (tblsRs.next()) {
- String tblCatalog = tblsRs.getString(TBL_CATALOG_IDX);
- String tblSchema = tblsRs.getString(TBL_SCHEMA_IDX);
- String tblName = tblsRs.getString(TBL_NAME_IDX);
+ if (schemas.size() == 0)
+ schemas.add(null);
- // In case of MySql we should use catalog.
- String schema = tblSchema != null ? tblSchema : tblCatalog;
+ for (String toSchema: schemas) {
+ try (ResultSet tblsRs = dbMeta.getTables(useCatalog() ? toSchema : null, useSchema() ? toSchema : null, "%",
+ tblsOnly ? TABLES_ONLY : TABLES_AND_VIEWS)) {
+ while (tblsRs.next()) {
+ String tblCatalog = tblsRs.getString(TBL_CATALOG_IDX);
+ String tblSchema = tblsRs.getString(TBL_SCHEMA_IDX);
+ String tblName = tblsRs.getString(TBL_NAME_IDX);
- // Skip system schemas.
- if (sys.contains(schema))
- continue;
+ // In case of MySql we should use catalog.
+ String schema = tblSchema != null ? tblSchema : tblCatalog;
- Set<String> pkCols = new HashSet<>();
+ // Skip system schemas.
+ if (sys.contains(schema))
+ continue;
- try (ResultSet pkRs = dbMeta.getPrimaryKeys(tblCatalog, tblSchema, tblName)) {
- while (pkRs.next())
- pkCols.add(pkRs.getString(PK_COL_NAME_IDX));
- }
+ Set<String> pkCols = new HashSet<>();
+
+ try (ResultSet pkRs = dbMeta.getPrimaryKeys(tblCatalog, tblSchema, tblName)) {
+ while (pkRs.next())
+ pkCols.add(pkRs.getString(PK_COL_NAME_IDX));
+ }
- List<DbColumn> cols = new ArrayList<>();
+ List<DbColumn> cols = new ArrayList<>();
- try (ResultSet colsRs = dbMeta.getColumns(tblCatalog, tblSchema, tblName, null)) {
- while (colsRs.next()) {
- String colName = colsRs.getString(COL_NAME_IDX);
+ try (ResultSet colsRs = dbMeta.getColumns(tblCatalog, tblSchema, tblName, null)) {
+ while (colsRs.next()) {
+ String colName = colsRs.getString(COL_NAME_IDX);
- cols.add(new DbColumn(
- colName,
- colsRs.getInt(COL_DATA_TYPE_IDX),
- pkCols.contains(colName),
- colsRs.getInt(COL_NULLABLE_IDX) == DatabaseMetaData.columnNullable));
+ cols.add(new DbColumn(
+ colName,
+ colsRs.getInt(COL_DATA_TYPE_IDX),
+ pkCols.contains(colName),
+ colsRs.getInt(COL_NULLABLE_IDX) == DatabaseMetaData.columnNullable));
+ }
}
- }
- Map<String, Map<String, Boolean>> idxs = new LinkedHashMap<>();
+ Map<String, Map<String, Boolean>> idxs = new LinkedHashMap<>();
- try (ResultSet idxRs = dbMeta.getIndexInfo(tblCatalog, tblSchema, tblName, false, true)) {
- while (idxRs.next()) {
- String idxName = idxRs.getString(IDX_NAME_IDX);
+ try (ResultSet idxRs = dbMeta.getIndexInfo(tblCatalog, tblSchema, tblName, false, true)) {
+ while (idxRs.next()) {
+ String idxName = idxRs.getString(IDX_NAME_IDX);
- String colName = idxRs.getString(IDX_COL_NAME_IDX);
+ String colName = idxRs.getString(IDX_COL_NAME_IDX);
- if (idxName == null || colName == null)
- continue;
+ if (idxName == null || colName == null)
+ continue;
- Map<String, Boolean> idx = idxs.get(idxName);
+ Map<String, Boolean> idx = idxs.get(idxName);
- if (idx == null) {
- idx = new LinkedHashMap<>();
+ if (idx == null) {
+ idx = new LinkedHashMap<>();
- idxs.put(idxName, idx);
- }
+ idxs.put(idxName, idx);
+ }
- String askOrDesc = idxRs.getString(IDX_ASC_OR_DESC_IDX);
+ String askOrDesc = idxRs.getString(IDX_ASC_OR_DESC_IDX);
- Boolean desc = askOrDesc != null ? "D".equals(askOrDesc) : null;
+ Boolean desc = askOrDesc != null ? "D".equals(askOrDesc) : null;
- idx.put(colName, desc);
+ idx.put(colName, desc);
+ }
}
- }
- tbls.add(table(schema, tblName, cols, idxs));
+ tbls.add(table(schema, tblName, cols, idxs));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/MySQLMetadataDialect.java
----------------------------------------------------------------------
diff --git a/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/MySQLMetadataDialect.java b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/MySQLMetadataDialect.java
new file mode 100644
index 0000000..b592321
--- /dev/null
+++ b/modules/schema-import-db/src/main/java/org/apache/ignite/schema/parser/dialect/MySQLMetadataDialect.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.schema.parser.dialect;
+
+import java.sql.*;
+import java.util.*;
+
+/**
+ * MySQL specific metadata dialect: catalogs are listed as schemas and tables are filtered by catalog.
+ */
+public class MySQLMetadataDialect extends JdbcMetadataDialect {
+    /** {@inheritDoc} */
+    @Override public List<String> schemas(Connection conn) throws SQLException {
+        List<String> schemas = new ArrayList<>();
+
+        // try-with-resources: the catalogs result set is closed even if iteration fails.
+        try (ResultSet rs = conn.getMetaData().getCatalogs()) {
+            Set<String> sys = systemSchemas();
+            while (rs.next()) {
+                String schema = rs.getString(1);
+
+                // Skip system schemas.
+                if (sys.contains(schema))
+                    continue;
+
+                schemas.add(schema);
+            }
+        }
+
+        return schemas;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean useCatalog() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean useSchema() {
+        return false;
+    }
+}