You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/22 19:02:24 UTC
[6/6] incubator-ignite git commit: # ignite-648: cache proxy;
add events proxy; delete transaction support for multiJvm;
skip some tests for multi jvm
# ignite-648: cache proxy; add events proxy; delete transaction support for multiJvm; skip some tests for multi jvm
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/470e48b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/470e48b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/470e48b5
Branch: refs/heads/ignite-648
Commit: 470e48b54debab29b7fd1fc39e306876ae379e50
Parents: 2e6fd9b
Author: ashutak <as...@gridgain.com>
Authored: Mon Jun 22 20:02:33 2015 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Mon Jun 22 20:02:33 2015 +0300
----------------------------------------------------------------------
.../cache/GridCacheAbstractFullApiSelfTest.java | 64 +++++---
.../framework/AffinityProcessProxy.java | 5 +-
.../framework/IgniteCacheProcessProxy.java | 43 ++++--
.../framework/IgniteEventsProcessProxy.java | 147 +++++++++++++++++++
.../multijvm/framework/IgniteProcessProxy.java | 16 +-
.../IgniteTransactionsProcessProxy.java | 78 ----------
6 files changed, 237 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 75cba54..85eb08e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -105,7 +105,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
};
/** Dflt grid. */
- protected Ignite dfltIgnite;
+ protected transient Ignite dfltIgnite;
/** */
private Map<String, CacheConfiguration[]> cacheCfgMap;
@@ -291,15 +291,20 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
}
for (int i = 0; i < gridCount(); i++) {
- GridCacheContext<String, Integer> ctx = context(i);
+ if (!isMultiJvmAndNodeIsRemote(i)) {
+ GridCacheContext<String, Integer> ctx = context(i);
- int sum = 0;
+ int sum = 0;
- for (String key : map.keySet())
- if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
- sum++;
+ for (String key : map.keySet())
+ if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
+ sum++;
- assertEquals("Incorrect key size on cache #" + i, sum, jcache(i).localSize(ALL));
+ assertEquals("Incorrect key size on cache #" + i, sum, jcache(i).localSize(ALL));
+ }
+ else {
+ // TODO add multi jvm support.
+ }
}
for (int i = 0; i < gridCount(); i++) {
@@ -2949,14 +2954,16 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
public void testPeekTxRemoveOptimistic() throws Exception {
- checkPeekTxRemove(OPTIMISTIC);
+ if (!isMultiJvm()) // Transactions are not supported in multi JVM mode.
+ checkPeekTxRemove(OPTIMISTIC);
}
/**
* @throws Exception If failed.
*/
public void testPeekTxRemovePessimistic() throws Exception {
- checkPeekTxRemove(PESSIMISTIC);
+ if (!isMultiJvm()) // Transactions are not supported in multi JVM mode.
+ checkPeekTxRemove(PESSIMISTIC);
}
/**
@@ -3098,7 +3105,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
public void testTtlTx() throws Exception {
- if (txEnabled())
+ if (txEnabled() && !isMultiJvm())
checkTtl(true, false);
}
@@ -3106,6 +3113,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
public void testTtlNoTx() throws Exception {
+ if (isMultiJvm())
+ fail("TODO implement multi jvm support.");
+
checkTtl(false, false);
}
@@ -3113,6 +3123,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
* @throws Exception If failed.
*/
public void testTtlNoTxOldEntry() throws Exception {
+ if (isMultiJvm())
+ fail("TODO implement multi jvm support.");
+
checkTtl(false, true);
}
@@ -3805,26 +3818,31 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL));
else {
for (int i = 0; i < gridCount(); i++) {
- GridCacheContext<String, Integer> ctx = context(i);
+ if (!isMultiJvmAndNodeIsRemote(i)) {
+ GridCacheContext<String, Integer> ctx = context(i);
- if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
- continue;
+ if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
+ continue;
- int size = 0;
+ int size = 0;
- for (String key : keys) {
- if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
- GridCacheEntryEx e =
- ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+ for (String key : keys) {
+ if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+ GridCacheEntryEx e =
+ ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
- assert e != null : "Entry is null [idx=" + i + ", key=" + key + ", ctx=" + ctx + ']';
- assert !e.deleted() : "Entry is deleted: " + e;
+ assert e != null : "Entry is null [idx=" + i + ", key=" + key + ", ctx=" + ctx + ']';
+ assert !e.deleted() : "Entry is deleted: " + e;
- size++;
+ size++;
+ }
}
- }
- assertEquals("Incorrect size on cache #" + i, size, jcache(i).localSize(ALL));
+ assertEquals("Incorrect size on cache #" + i, size, jcache(i).localSize(ALL));
+ }
+ else {
+ // TODO add multi jvm support.
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java
index 07a5a5f..7168534 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java
@@ -45,10 +45,7 @@ public class AffinityProcessProxy<K> implements Affinity<K> {
public AffinityProcessProxy(String cacheName, IgniteProcessProxy proxy) {
this.cacheName = cacheName;
gridId = proxy.getId();
-
- ClusterGroup grp = proxy.localJvmGrid().cluster().forNodeId(proxy.getId());
-
- compute = proxy.localJvmGrid().compute(grp);
+ compute = proxy.remoteCompute();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java
index 61cbdc3..f8cd8a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java
@@ -71,10 +71,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
isAsync = async;
gridId = proxy.getId();
igniteProxy = proxy;
-
- ClusterGroup grp = proxy.localJvmGrid().cluster().forNodeId(proxy.getId());
-
- compute = proxy.localJvmGrid().compute(grp);
+ compute = proxy.remoteCompute();
}
/** {@inheritDoc} */
@@ -89,6 +86,15 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
/** {@inheritDoc} */
@Override public <R> IgniteFuture<R> future() {
+ // TODO implement.
+// R futureRes = (R)compute.call(new IgniteCallable<Object>() {
+// @Override public Object call() throws Exception {
+// return cache().future().get();
+// }
+// });
+//
+// return new IgniteFinishedFutureImpl<R>(futureRes);
+
throw new UnsupportedOperationException("Method should be supported.");
}
@@ -124,8 +130,14 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException {
- throw new UnsupportedOperationException("Method should be supported.");
+ @Override public void localLoadCache(@Nullable final IgniteBiPredicate<K, V> p, @Nullable final Object... args) throws CacheException {
+ final IgniteBiPredicate pCopy = p;
+
+ compute.run(new IgniteRunnable() {
+ @Override public void run() {
+ cache().localLoadCache(pCopy, args);
+ }
+ });
}
/** {@inheritDoc} */
@@ -178,8 +190,12 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
}
/** {@inheritDoc} */
- @Override public void localEvict(Collection<? extends K> keys) {
- throw new UnsupportedOperationException("Method should be supported.");
+ @Override public void localEvict(final Collection<? extends K> keys) {
+ compute.run(new IgniteRunnable() {
+ @Override public void run() {
+ cache().localEvict(keys);
+ }
+ });
}
/** {@inheritDoc} */
@@ -501,7 +517,16 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
/** {@inheritDoc} */
@Override public <T> T unwrap(final Class<T> clazz) {
- throw new UnsupportedOperationException("Method cannot be supported because T can be unmarshallable.");
+ try {
+ return (T)compute.call(new IgniteCallable<Object>() {
+ @Override public Object call() throws Exception {
+ return cache().unwrap(clazz);
+ }
+ });
+ }
+ catch (Exception e) {
+ throw new IllegalArgumentException("Looks like class " + clazz + " is unmarshallable. Exception type:" + e.getClass(), e);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java
new file mode 100644
index 0000000..b9a260b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.multijvm.framework;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ignite events proxy for ignite instance at another JVM.
+ */
+public class IgniteEventsProcessProxy implements IgniteEvents {
+ /** Ignite proxy. */
+ private final transient IgniteProcessProxy igniteProxy;
+
+ /** Grid id. */
+ private final UUID gridId;
+
+ /**
+ * @param igniteProxy Ignite proxy.
+ */
+ public IgniteEventsProcessProxy(IgniteProcessProxy igniteProxy) {
+ this.igniteProxy = igniteProxy;
+
+ gridId = igniteProxy.getId();
+ }
+
+ /**
+ * @return Events instance.
+ */
+ private IgniteEvents events() {
+ return Ignition.ignite(gridId).events();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterGroup clusterGroup() {
+ return null; // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends Event> List<T> remoteQuery(IgnitePredicate<T> p, long timeout,
+ @Nullable int... types) throws IgniteException {
+ return null; // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends Event> UUID remoteListen(@Nullable IgniteBiPredicate<UUID, T> locLsnr,
+ @Nullable IgnitePredicate<T> rmtFilter, @Nullable int... types) throws IgniteException {
+ return null; // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends Event> UUID remoteListen(int bufSize, long interval, boolean autoUnsubscribe,
+ @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter,
+ @Nullable int... types) throws IgniteException {
+ return null; // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stopRemoteListen(UUID opId) throws IgniteException {
+ // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends Event> T waitForLocal(@Nullable IgnitePredicate<T> filter,
+ @Nullable int... types) throws IgniteException {
+ return null; // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends Event> Collection<T> localQuery(IgnitePredicate<T> p, @Nullable int... types) {
+ return null; // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void recordLocal(Event evt) {
+ // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void localListen(final IgnitePredicate<? extends Event> lsnr, final int... types) {
+ igniteProxy.remoteCompute().run(new IgniteRunnable() {
+ @Override public void run() {
+ events().localListen(lsnr, types);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean stopLocalListen(IgnitePredicate<? extends Event> lsnr, @Nullable int... types) {
+ return false; // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void enableLocal(int... types) {
+ // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void disableLocal(int... types) {
+ // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] enabledEvents() {
+ return new int[0]; // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isEnabled(int type) {
+ return false; // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteEvents withAsync() {
+ return null; // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isAsync() {
+ return false; // TODO: CODE: implement.
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> IgniteFuture<R> future() {
+ return null; // TODO: CODE: implement.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java
index 1d11dc3..e534478 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java
@@ -277,7 +277,7 @@ public class IgniteProcessProxy implements IgniteEx {
/** {@inheritDoc} */
@Override public IgniteEvents events() {
- return null; // TODO: CODE: implement.
+ return new IgniteEventsProcessProxy(this);
}
/** {@inheritDoc} */
@@ -375,7 +375,7 @@ public class IgniteProcessProxy implements IgniteEx {
/** {@inheritDoc} */
@Override public IgniteTransactions transactions() {
- return new IgniteTransactionsProcessProxy(this);
+ throw new UnsupportedOperationException("Transactions are not supported in multi JVM mode.");
}
/** {@inheritDoc} */
@@ -459,6 +459,18 @@ public class IgniteProcessProxy implements IgniteEx {
return proc;
}
+ /**
+ * @return {@link IgniteCompute} instance to communicate with remote node.
+ */
+ public IgniteCompute remoteCompute() {
+ ClusterGroup grp = localJvmGrid().cluster().forNodeId(id);
+
+ if (grp.nodes().isEmpty())
+ throw new IllegalStateException("Could not found node with id=" + id + ".");
+
+ return locJvmGrid.compute(grp);
+ }
+
// TODO delete or use.
// public <K, V> GridCacheAdapter<K, V> remoteInternalCache() {
// return (GridCacheAdapter<K, V>)compute.call(new MyCallable(id));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java
deleted file mode 100644
index 6464838..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.multijvm.framework;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.transactions.*;
-
-import java.util.*;
-
-/**
- * Ignite transactions proxy for ignite instance at another JVM.
- */
-public class IgniteTransactionsProcessProxy implements IgniteTransactions {
- /** Compute. */
- private final transient IgniteCompute compute;
-
- /** Grid id. */
- private final UUID gridId;
-
- /**
- * @param proxy Ignite process proxy.
- */
- public IgniteTransactionsProcessProxy(IgniteProcessProxy proxy) {
- gridId = proxy.getId();
-
- ClusterGroup grp = proxy.localJvmGrid().cluster().forNodeId(proxy.getId());
-
- compute = proxy.localJvmGrid().compute(grp);
- }
-
- /** {@inheritDoc} */
- @Override public Transaction txStart() throws IllegalStateException {
- return null; // TODO: CODE: implement.
- }
-
- /** {@inheritDoc} */
- @Override public Transaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation) {
- return null; // TODO: CODE: implement.
- }
-
- /** {@inheritDoc} */
- @Override
- public Transaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout,
- int txSize) {
- return null; // TODO: CODE: implement.
- }
-
- /** {@inheritDoc} */
- @Override public Transaction tx() {
- return null; // TODO: CODE: implement.
- }
-
- /** {@inheritDoc} */
- @Override public TransactionMetrics metrics() {
- return null; // TODO: CODE: implement.
- }
-
- /** {@inheritDoc} */
- @Override public void resetMetrics() {
- // TODO: CODE: implement.
- }
-}