You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/22 19:02:24 UTC

[6/6] incubator-ignite git commit: # ignite-648: cache proxy; add events proxy; delete transaction support for multiJvm; skip some tests for multi jvm

# ignite-648: cache proxy; add events proxy; delete transaction support for multiJvm; skip some tests for multi jvm


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/470e48b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/470e48b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/470e48b5

Branch: refs/heads/ignite-648
Commit: 470e48b54debab29b7fd1fc39e306876ae379e50
Parents: 2e6fd9b
Author: ashutak <as...@gridgain.com>
Authored: Mon Jun 22 20:02:33 2015 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Mon Jun 22 20:02:33 2015 +0300

----------------------------------------------------------------------
 .../cache/GridCacheAbstractFullApiSelfTest.java |  64 +++++---
 .../framework/AffinityProcessProxy.java         |   5 +-
 .../framework/IgniteCacheProcessProxy.java      |  43 ++++--
 .../framework/IgniteEventsProcessProxy.java     | 147 +++++++++++++++++++
 .../multijvm/framework/IgniteProcessProxy.java  |  16 +-
 .../IgniteTransactionsProcessProxy.java         |  78 ----------
 6 files changed, 237 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 75cba54..85eb08e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -105,7 +105,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         };
 
     /** Dflt grid. */
-    protected Ignite dfltIgnite;
+    protected transient Ignite dfltIgnite;
 
     /** */
     private Map<String, CacheConfiguration[]> cacheCfgMap;
@@ -291,15 +291,20 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         }
 
         for (int i = 0; i < gridCount(); i++) {
-            GridCacheContext<String, Integer> ctx = context(i);
+            if (!isMultiJvmAndNodeIsRemote(i)) {
+                GridCacheContext<String, Integer> ctx = context(i);
 
-            int sum = 0;
+                int sum = 0;
 
-            for (String key : map.keySet())
-                if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
-                    sum++;
+                for (String key : map.keySet())
+                    if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion())))
+                        sum++;
 
-            assertEquals("Incorrect key size on cache #" + i, sum, jcache(i).localSize(ALL));
+                assertEquals("Incorrect key size on cache #" + i, sum, jcache(i).localSize(ALL));
+            }
+            else {
+                // TODO add multi jvm support.
+            }
         }
 
         for (int i = 0; i < gridCount(); i++) {
@@ -2949,14 +2954,16 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @throws Exception If failed.
      */
     public void testPeekTxRemoveOptimistic() throws Exception {
-        checkPeekTxRemove(OPTIMISTIC);
+        if (!isMultiJvm()) // Transactions are not supported in multi JVM mode.
+            checkPeekTxRemove(OPTIMISTIC);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPeekTxRemovePessimistic() throws Exception {
-        checkPeekTxRemove(PESSIMISTIC);
+        if (!isMultiJvm()) // Transactions are not supported in multi JVM mode.
+            checkPeekTxRemove(PESSIMISTIC);
     }
 
     /**
@@ -3098,7 +3105,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @throws Exception If failed.
      */
     public void testTtlTx() throws Exception {
-        if (txEnabled())
+        if (txEnabled() && !isMultiJvm())
             checkTtl(true, false);
     }
 
@@ -3106,6 +3113,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @throws Exception If failed.
      */
     public void testTtlNoTx() throws Exception {
+        if (isMultiJvm())
+            fail("TODO implement multi jvm support.");
+
         checkTtl(false, false);
     }
 
@@ -3113,6 +3123,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @throws Exception If failed.
      */
     public void testTtlNoTxOldEntry() throws Exception {
+        if (isMultiJvm())
+            fail("TODO implement multi jvm support.");
+
         checkTtl(false, true);
     }
 
@@ -3805,26 +3818,31 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL));
         else {
             for (int i = 0; i < gridCount(); i++) {
-                GridCacheContext<String, Integer> ctx = context(i);
+                if (!isMultiJvmAndNodeIsRemote(i)) {
+                    GridCacheContext<String, Integer> ctx = context(i);
 
-                if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
-                    continue;
+                    if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED)
+                        continue;
 
-                int size = 0;
+                    int size = 0;
 
-                for (String key : keys) {
-                    if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
-                        GridCacheEntryEx e =
-                            ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
+                    for (String key : keys) {
+                        if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) {
+                            GridCacheEntryEx e =
+                                ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);
 
-                        assert e != null : "Entry is null [idx=" + i + ", key=" + key + ", ctx=" + ctx + ']';
-                        assert !e.deleted() : "Entry is deleted: " + e;
+                            assert e != null : "Entry is null [idx=" + i + ", key=" + key + ", ctx=" + ctx + ']';
+                            assert !e.deleted() : "Entry is deleted: " + e;
 
-                        size++;
+                            size++;
+                        }
                     }
-                }
 
-                assertEquals("Incorrect size on cache #" + i, size, jcache(i).localSize(ALL));
+                    assertEquals("Incorrect size on cache #" + i, size, jcache(i).localSize(ALL));
+                }
+                else {
+                    // TODO add multi jvm support.
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java
index 07a5a5f..7168534 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java
@@ -45,10 +45,7 @@ public class AffinityProcessProxy<K> implements Affinity<K> {
     public AffinityProcessProxy(String cacheName, IgniteProcessProxy proxy) {
         this.cacheName = cacheName;
         gridId = proxy.getId();
-
-        ClusterGroup grp = proxy.localJvmGrid().cluster().forNodeId(proxy.getId());
-
-        compute = proxy.localJvmGrid().compute(grp);
+        compute = proxy.remoteCompute();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java
index 61cbdc3..f8cd8a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java
@@ -71,10 +71,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
         isAsync = async;
         gridId = proxy.getId();
         igniteProxy = proxy;
-
-        ClusterGroup grp = proxy.localJvmGrid().cluster().forNodeId(proxy.getId());
-
-        compute = proxy.localJvmGrid().compute(grp);
+        compute = proxy.remoteCompute();
     }
 
     /** {@inheritDoc} */
@@ -89,6 +86,15 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
 
     /** {@inheritDoc} */
     @Override public <R> IgniteFuture<R> future() {
+        // TODO implement.
+//        R futureRes = (R)compute.call(new IgniteCallable<Object>() {
+//            @Override public Object call() throws Exception {
+//                return cache().future().get();
+//            }
+//        });
+//
+//        return new IgniteFinishedFutureImpl<R>(futureRes);
+
         throw new UnsupportedOperationException("Method should be supported.");
     }
 
@@ -124,8 +130,14 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException {
-        throw new UnsupportedOperationException("Method should be supported.");
+    @Override public void localLoadCache(@Nullable final IgniteBiPredicate<K, V> p, @Nullable final Object... args) throws CacheException {
+        final IgniteBiPredicate pCopy = p;
+
+        compute.run(new IgniteRunnable() {
+            @Override public void run() {
+                cache().localLoadCache(pCopy, args);
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -178,8 +190,12 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void localEvict(Collection<? extends K> keys) {
-        throw new UnsupportedOperationException("Method should be supported.");
+    @Override public void localEvict(final Collection<? extends K> keys) {
+        compute.run(new IgniteRunnable() {
+            @Override public void run() {
+                cache().localEvict(keys);
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -501,7 +517,16 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
 
     /** {@inheritDoc} */
     @Override public <T> T unwrap(final Class<T> clazz) {
-        throw new UnsupportedOperationException("Method cannot be supported because T can be unmarshallable.");
+        try {
+            return (T)compute.call(new IgniteCallable<Object>() {
+                @Override public Object call() throws Exception {
+                    return cache().unwrap(clazz);
+                }
+            });
+        }
+        catch (Exception e) {
+            throw new IllegalArgumentException("Looks like class " + clazz + " is unmarshallable. Exception type:" + e.getClass(), e);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java
new file mode 100644
index 0000000..b9a260b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.multijvm.framework;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ignite events proxy for ignite instance at another JVM.
+ */
+public class IgniteEventsProcessProxy implements IgniteEvents {
+    /** Ignite proxy. */
+    private final transient IgniteProcessProxy igniteProxy;
+
+    /** Grid id. */
+    private final UUID gridId;
+
+    /**
+     * @param igniteProxy Ignite proxy.
+     */
+    public IgniteEventsProcessProxy(IgniteProcessProxy igniteProxy) {
+        this.igniteProxy = igniteProxy;
+
+        gridId = igniteProxy.getId();
+    }
+
+    /**
+     * @return Events instance.
+     */
+    private IgniteEvents events() {
+        return Ignition.ignite(gridId).events();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup clusterGroup() {
+        return null; // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends Event> List<T> remoteQuery(IgnitePredicate<T> p, long timeout,
+        @Nullable int... types) throws IgniteException {
+        return null; // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends Event> UUID remoteListen(@Nullable IgniteBiPredicate<UUID, T> locLsnr,
+        @Nullable IgnitePredicate<T> rmtFilter, @Nullable int... types) throws IgniteException {
+        return null; // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends Event> UUID remoteListen(int bufSize, long interval, boolean autoUnsubscribe,
+        @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter,
+        @Nullable int... types) throws IgniteException {
+        return null; // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stopRemoteListen(UUID opId) throws IgniteException {
+        // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends Event> T waitForLocal(@Nullable IgnitePredicate<T> filter,
+        @Nullable int... types) throws IgniteException {
+        return null; // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends Event> Collection<T> localQuery(IgnitePredicate<T> p, @Nullable int... types) {
+        return null; // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void recordLocal(Event evt) {
+        // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void localListen(final IgnitePredicate<? extends Event> lsnr, final int... types) {
+        igniteProxy.remoteCompute().run(new IgniteRunnable() {
+            @Override public void run() {
+                events().localListen(lsnr, types);
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopLocalListen(IgnitePredicate<? extends Event> lsnr, @Nullable int... types) {
+        return false; // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void enableLocal(int... types) {
+        // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void disableLocal(int... types) {
+        // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public int[] enabledEvents() {
+        return new int[0]; // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isEnabled(int type) {
+        return false; // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteEvents withAsync() {
+        return null; // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAsync() {
+        return false; // TODO: CODE: implement.
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> IgniteFuture<R> future() {
+        return null; // TODO: CODE: implement.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java
index 1d11dc3..e534478 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java
@@ -277,7 +277,7 @@ public class IgniteProcessProxy implements IgniteEx {
 
     /** {@inheritDoc} */
     @Override public IgniteEvents events() {
-        return null; // TODO: CODE: implement.
+        return new IgniteEventsProcessProxy(this);
     }
 
     /** {@inheritDoc} */
@@ -375,7 +375,7 @@ public class IgniteProcessProxy implements IgniteEx {
 
     /** {@inheritDoc} */
     @Override public IgniteTransactions transactions() {
-        return new IgniteTransactionsProcessProxy(this);
+        throw new UnsupportedOperationException("Transactions are not supported in multi JVM mode.");
     }
 
     /** {@inheritDoc} */
@@ -459,6 +459,18 @@ public class IgniteProcessProxy implements IgniteEx {
         return proc;
     }
 
+    /**
+     * @return {@link IgniteCompute} instance to communicate with remote node.
+     */
+    public IgniteCompute remoteCompute() {
+        ClusterGroup grp = localJvmGrid().cluster().forNodeId(id);
+
+        if (grp.nodes().isEmpty())
+            throw new IllegalStateException("Could not found node with id=" + id + ".");
+
+        return locJvmGrid.compute(grp);
+    }
+
     // TODO delete or use.
 //    public <K, V> GridCacheAdapter<K, V> remoteInternalCache() {
 //        return (GridCacheAdapter<K, V>)compute.call(new MyCallable(id));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java
deleted file mode 100644
index 6464838..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.multijvm.framework;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.transactions.*;
-
-import java.util.*;
-
-/**
- * Ignite transactions proxy for ignite instance at another JVM.
- */
-public class IgniteTransactionsProcessProxy implements IgniteTransactions {
-    /** Compute. */
-    private final transient IgniteCompute compute;
-
-    /** Grid id. */
-    private final UUID gridId;
-
-    /**
-     * @param proxy Ignite process proxy.
-     */
-    public IgniteTransactionsProcessProxy(IgniteProcessProxy proxy) {
-        gridId = proxy.getId();
-
-        ClusterGroup grp = proxy.localJvmGrid().cluster().forNodeId(proxy.getId());
-
-        compute = proxy.localJvmGrid().compute(grp);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Transaction txStart() throws IllegalStateException {
-        return null; // TODO: CODE: implement.
-    }
-
-    /** {@inheritDoc} */
-    @Override public Transaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation) {
-        return null; // TODO: CODE: implement.
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Transaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout,
-        int txSize) {
-        return null; // TODO: CODE: implement.
-    }
-
-    /** {@inheritDoc} */
-    @Override public Transaction tx() {
-        return null; // TODO: CODE: implement.
-    }
-
-    /** {@inheritDoc} */
-    @Override public TransactionMetrics metrics() {
-        return null; // TODO: CODE: implement.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void resetMetrics() {
-        // TODO: CODE: implement.
-    }
-}