You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/06/05 04:20:16 UTC
[23/29] incubator-ignite git commit: # ignite-981 fixed wait for cache initialization on clients
# ignite-981 fixed wait for cache initialization on clients
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ddcb2a3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ddcb2a3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ddcb2a3f
Branch: refs/heads/ignite-389-ipc
Commit: ddcb2a3f6932fe8d3f86d3e1c16a3c4a4610959f
Parents: 1603fe5
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 09:25:42 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 11:50:36 2015 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 117 +++++++------
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../IgniteMessagingWithClientTest.java | 164 +++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
4 files changed, 232 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6e8d457..4382731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1722,68 +1722,83 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
- Object msgBody = ioMsg.body();
-
- assert msgBody != null || ioMsg.bodyBytes() != null;
+ busyLock.readLock();
try {
- byte[] msgTopicBytes = ioMsg.topicBytes();
-
- Object msgTopic = ioMsg.topic();
-
- GridDeployment dep = ioMsg.deployment();
-
- if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
- ioMsg.deploymentClassName() != null) {
- dep = ctx.deploy().getGlobalDeployment(
- ioMsg.deploymentMode(),
- ioMsg.deploymentClassName(),
- ioMsg.deploymentClassName(),
- ioMsg.userVersion(),
- nodeId,
- ioMsg.classLoaderId(),
- ioMsg.loaderParticipants(),
- null);
-
- if (dep == null)
- throw new IgniteDeploymentCheckedException(
- "Failed to obtain deployment information for user message. " +
- "If you are using custom message or topic class, try implementing " +
- "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
-
- ioMsg.deployment(dep); // Cache deployment.
+ if (stopping) {
+ if (log.isDebugEnabled())
+ log.debug("Received user message while stopping (will ignore) [nodeId=" +
+ nodeId + ", msg=" + msg + ']');
+
+ return;
}
- // Unmarshall message topic if needed.
- if (msgTopic == null && msgTopicBytes != null) {
- msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
+ Object msgBody = ioMsg.body();
- ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
- }
+ assert msgBody != null || ioMsg.bodyBytes() != null;
- if (!F.eq(topic, msgTopic))
- return;
+ try {
+ byte[] msgTopicBytes = ioMsg.topicBytes();
+
+ Object msgTopic = ioMsg.topic();
+
+ GridDeployment dep = ioMsg.deployment();
+
+ if (dep == null && ctx.config().isPeerClassLoadingEnabled() &&
+ ioMsg.deploymentClassName() != null) {
+ dep = ctx.deploy().getGlobalDeployment(
+ ioMsg.deploymentMode(),
+ ioMsg.deploymentClassName(),
+ ioMsg.deploymentClassName(),
+ ioMsg.userVersion(),
+ nodeId,
+ ioMsg.classLoaderId(),
+ ioMsg.loaderParticipants(),
+ null);
+
+ if (dep == null)
+ throw new IgniteDeploymentCheckedException(
+ "Failed to obtain deployment information for user message. " +
+ "If you are using custom message or topic class, try implementing " +
+ "GridPeerDeployAware interface. [msg=" + ioMsg + ']');
+
+ ioMsg.deployment(dep); // Cache deployment.
+ }
- if (msgBody == null) {
- msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+ // Unmarshall message topic if needed.
+ if (msgTopic == null && msgTopicBytes != null) {
+ msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null);
- ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
- }
+ ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings.
+ }
- // Resource injection.
- if (dep != null)
- ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
- msg + ']', e);
- }
+ if (!F.eq(topic, msgTopic))
+ return;
+
+ if (msgBody == null) {
+ msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null);
+
+ ioMsg.body(msgBody); // Save body to avoid future unmarshallings.
+ }
- if (msgBody != null) {
- if (predLsnr != null) {
- if (!predLsnr.apply(nodeId, msgBody))
- removeMessageListener(TOPIC_COMM_USER, this);
+ // Resource injection.
+ if (dep != null)
+ ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody);
}
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" +
+ msg + ']', e);
+ }
+
+ if (msgBody != null) {
+ if (predLsnr != null) {
+ if (!predLsnr.apply(nodeId, msgBody))
+ removeMessageListener(TOPIC_COMM_USER, this);
+ }
+ }
+ }
+ finally {
+ busyLock.readUnlock();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 1aef18c..51010ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -274,7 +274,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
- return demandPool.syncFuture();
+ return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
new file mode 100644
index 0000000..855a4f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.messaging;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Tests ordered user messaging between nodes while a client node (grid index 2) is started and stopped in a loop.
+ */
+public class IgniteMessagingWithClientTest extends GridCommonAbstractTest implements Serializable {
+ /** IP finder set on the discovery SPI of every node configured by this test. */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Message topic. */
+ private enum TOPIC {
+ /** The only topic used by this test, for both listening and sending. */
+ ORDERED
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(new OptimizedMarshaller(false));
+
+ if (gridName.equals(getTestGridName(2))) { // Only the grid with index 2 is configured as a client.
+ cfg.setClientMode(true);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+ }
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMessageSendWithClientJoin() throws Exception {
+ startGrid(0);
+
+ Ignite ignite1 = startGrid(1);
+
+ ClusterGroup rmts = ignite1.cluster().forRemotes();
+
+ IgniteMessaging msg = ignite1.message(rmts);
+
+ msg.localListen(TOPIC.ORDERED, new LocalListener());
+
+ msg.remoteListen(TOPIC.ORDERED, new RemoteListener());
+
+ final AtomicBoolean stop = new AtomicBoolean(); // Set to true to end the client start/stop loop below.
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int iter = 0;
+
+ while (!stop.get()) {
+ if (iter % 10 == 0)
+ log.info("Client start/stop iteration: " + iter);
+
+ iter++;
+
+ try (Ignite ignite = startGrid(2)) {
+ assertTrue(ignite.configuration().isClientMode());
+ }
+ }
+
+ return null;
+ }
+ }, 1, "client-start-stop");
+
+ try {
+ long stopTime = U.currentTimeMillis() + 30_000; // Keep sending for 30 seconds.
+
+ int iter = 0;
+
+ while (System.currentTimeMillis() < stopTime) { // NOTE(review): deadline comes from U.currentTimeMillis() - confirm both clocks agree.
+ try {
+ ignite1.message(rmts).sendOrdered(TOPIC.ORDERED, Integer.toString(iter), 0);
+ }
+ catch (IgniteException e) { // Send failures are only logged; they do not fail the test.
+ log.info("Message send failed: " + e);
+ }
+
+ iter++;
+
+ if (iter % 100 == 0)
+ Thread.sleep(5);
+ }
+ }
+ finally {
+ stop.set(true);
+ }
+
+ fut.get(); // Wait for the client start/stop thread to finish.
+ }
+
+ /**
+ * Local listener that accepts every message and always returns {@code true}.
+ */
+ private static class LocalListener implements IgniteBiPredicate<UUID, String> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, String s) {
+ return true;
+ }
+ }
+
+ /**
+ * Remote listener that sends each received message to the node with the given {@code nodeId} on the same topic.
+ */
+ private static class RemoteListener implements IgniteBiPredicate<UUID, String> {
+ /** Ignite instance, marked for injection with {@link IgniteInstanceResource}. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID nodeId, String msg) {
+ ignite.message(ignite.cluster().forNodeId(nodeId)).send(TOPIC.ORDERED, msg);
+
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ddcb2a3f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 9eb31f1..e0a1e6e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -53,6 +53,7 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridSelfTest.class));
suite.addTest(new TestSuite(GridProjectionSelfTest.class));
suite.addTest(new TestSuite(GridMessagingSelfTest.class));
+ suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class));
suite.addTest(new TestSuite(GridMessagingNoPeerClassLoadingSelfTest.class));
if (U.isLinux() || U.isMacOs())