You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/01 16:50:12 UTC

[38/40] incubator-ignite git commit: # ignite-648: loadAll fix and IgniteProcessProxy.java refactoring (constructor, close)

# ignite-648: loadAll fix and IgniteProcessProxy.java refactoring (constructor, close)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ec660e53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ec660e53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ec660e53

Branch: refs/heads/ignite-648-failover
Commit: ec660e537e55b8f983fbe11b00cc6590125094a5
Parents: ae4e791
Author: ashutak <as...@gridgain.com>
Authored: Wed Jul 1 17:44:15 2015 +0300
Committer: ashutak <as...@gridgain.com>
Committed: Wed Jul 1 17:44:15 2015 +0300

----------------------------------------------------------------------
 ...heAtomicNearOnlyMultiJvmFullApiSelfTest.java |  2 +-
 .../junits/common/GridCommonAbstractTest.java   | 35 +++++++-
 .../multijvm/IgniteCacheProcessProxy.java       |  4 +-
 .../junits/multijvm/IgniteProcessProxy.java     | 88 ++++++++++++++++----
 4 files changed, 109 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec660e53/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest.java
index 4a2dcdd..ba01611 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest.java
@@ -22,7 +22,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.*;
 /**
  * Multy Jvm tests.
  */
-public class GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest extends 
+public class GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest extends
     GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest {
     /** {@inheritDoc} */
     protected boolean isMultiJvm() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec660e53/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 1fc4415..01cb240 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -254,7 +254,40 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
      * @param replaceExistingValues Replace existing values.
      * @throws Exception If failed.
      */
-    protected static <K> void loadAll(Cache<K, ?> cache, Set<K> keys, boolean replaceExistingValues) throws Exception {
+    protected static <K> void loadAll(Cache<K, ?> cache, final Set<K> keys, final boolean replaceExistingValues) throws Exception {
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        if (!(ignite instanceof IgniteProcessProxy))
+            loadAll0(cache, keys, replaceExistingValues);
+        else {
+            IgniteProcessProxy proxy = (IgniteProcessProxy)ignite;
+
+            final UUID id = proxy.getId();
+
+            final String cacheName = cache.getName();
+
+            final Set<Object> keysCp = (Set<Object>)keys;
+
+            proxy.remoteCompute().run(new CAX() {
+                @Override public void applyx() throws IgniteCheckedException {
+                    try {
+                        loadAll0(Ignition.ignite(id).cache(cacheName), keysCp, replaceExistingValues);
+                    }
+                    catch (Exception e) {
+                        throw new IgniteCheckedException(e);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param keys Keys.
+     * @param replaceExistingValues Replace existing values.
+     * @throws Exception If failed.
+     */
+    private static <K> void loadAll0(Cache<K, ?> cache, Set<K> keys, boolean replaceExistingValues) throws Exception {
         final AtomicReference<Exception> ex = new AtomicReference<>();
 
         final CountDownLatch latch = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec660e53/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index a478db0..e23739d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -280,8 +280,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override  public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionListener) {
-        throw new UnsupportedOperationException("Method should be supported.");
+    @Override  public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionLsnr) {
+        throw new UnsupportedOperationException("Oparetion can't be supported automatically.");
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec660e53/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index b96fe5f..6da478d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.*;
@@ -59,9 +60,6 @@ public class IgniteProcessProxy implements IgniteEx {
     /** Grid id. */
     private final UUID id = UUID.randomUUID();
 
-    /** Remote ignite instance started latch. */
-    private final transient CountDownLatch rmtNodeStartedLatch = new CountDownLatch(1);
-
     /**
      * @param cfg Configuration.
      * @param log Logger.
@@ -84,17 +82,9 @@ public class IgniteProcessProxy implements IgniteEx {
                 filteredJvmArgs.add(arg);
         }
 
-        locJvmGrid.events().localListen(new IgnitePredicateX<Event>() {
-            @Override public boolean applyx(Event e) {
-                if (((DiscoveryEvent)e).eventNode().id().equals(id)) {
-                    rmtNodeStartedLatch.countDown();
+        final CountDownLatch rmtNodeStartedLatch = new CountDownLatch(1);
 
-                    return false;
-                }
-
-                return true;
-            }
-        }, EventType.EVT_NODE_JOINED);
+        locJvmGrid.events().localListen(new NodeStartedListener(id, rmtNodeStartedLatch), EventType.EVT_NODE_JOINED);
 
         proc = GridJavaProcess.exec(
             IgniteNodeRunner.class,
@@ -117,6 +107,36 @@ public class IgniteProcessProxy implements IgniteEx {
     }
 
     /**
+     */
+    private static class NodeStartedListener extends IgnitePredicateX<Event> {
+        /** Id. */
+        private final UUID id;
+
+        /** Remote node started latch. */
+        private final CountDownLatch rmtNodeStartedLatch;
+
+        /**
+         * @param id Id.
+         * @param rmtNodeStartedLatch Remote node started latch.
+         */
+        NodeStartedListener(UUID id, CountDownLatch rmtNodeStartedLatch) {
+            this.id = id;
+            this.rmtNodeStartedLatch = rmtNodeStartedLatch;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean applyx(Event e) {
+            if (((DiscoveryEvent)e).eventNode().id().equals(id)) {
+                rmtNodeStartedLatch.countDown();
+
+                return false;
+            }
+
+            return true;
+        }
+    }
+
+    /**
      * @param gridName Grid name.
      * @return Instance by name or exception wiil be thrown.
      */
@@ -131,6 +151,15 @@ public class IgniteProcessProxy implements IgniteEx {
     }
 
     /**
+     * For usage in closures.
+     *
+     * @return Ignite instance.
+     */
+    private Ignite igniteById() {
+        return Ignition.ignite(id);
+    }
+
+    /**
      * @param locNodeId ID of local node the requested grid instance is managing.
      * @return An instance of named grid. This method never returns {@code null}.
      * @throws IgniteIllegalStateException Thrown if grid was not properly initialized or grid instance was stopped or
@@ -378,7 +407,7 @@ public class IgniteProcessProxy implements IgniteEx {
 
     /** {@inheritDoc} */
     @Override public void destroyCache(String cacheName) {
-        // TODO: CODE: implement.
+        throw new UnsupportedOperationException("Operation isn't supported yet.");
     }
 
     /** {@inheritDoc} */
@@ -388,7 +417,7 @@ public class IgniteProcessProxy implements IgniteEx {
 
     /** {@inheritDoc} */
     @Override public IgniteTransactions transactions() {
-        throw new UnsupportedOperationException("Transactions are not supported in multi JVM mode.");
+        throw new UnsupportedOperationException("Transactions can't be supported automatically in multi JVM mode.");
     }
 
     /** {@inheritDoc} */
@@ -452,11 +481,38 @@ public class IgniteProcessProxy implements IgniteEx {
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteException {
+        final CountDownLatch rmtNodeStoppedLatch = new CountDownLatch(1);
+
+        locJvmGrid.events().localListen(new IgnitePredicateX<Event>() {
+            @Override public boolean applyx(Event e) {
+                if (((DiscoveryEvent)e).eventNode().id().equals(id)) {
+                    rmtNodeStoppedLatch.countDown();
+
+                    return false;
+                }
+
+                return true;
+            }
+        }, EventType.EVT_NODE_LEFT, EventType.EVT_NODE_FAILED);
+
+        compute().run(new IgniteRunnable() {
+            @Override public void run() {
+                igniteById().close();
+            }
+        });
+
+        try {
+            assert U.await(rmtNodeStoppedLatch, 15, TimeUnit.SECONDS) : "NodeId=" + id;
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw new IgniteException(e);
+        }
+
         try {
             getProcess().kill();
         }
         catch (Exception e) {
-            e.printStackTrace();
+            X.printerr("Could not kill process after close.", e);
         }
     }