You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:43:50 UTC
[02/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire) to org.apache.geode (for
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandPauseResumeDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandPauseResumeDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandPauseResumeDUnitTest.java
new file mode 100644
index 0000000..609885d
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandPauseResumeDUnitTest.java
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.wancommand;
+
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.management.cli.Result;
+import com.gemstone.gemfire.management.internal.cli.i18n.CliStrings;
+import com.gemstone.gemfire.management.internal.cli.result.CommandResult;
+import com.gemstone.gemfire.management.internal.cli.result.TabularResultData;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Properties;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static com.gemstone.gemfire.test.dunit.Assert.*;
+import static com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter;
+import static com.gemstone.gemfire.test.dunit.Wait.pause;
+
+@Category(DistributedTest.class)
+public class WanCommandPauseResumeDUnitTest extends WANCommandTestBase {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+  * Verifies that the {@code PAUSE_GATEWAYSENDER} command is rejected when both a
+  * member and a group are supplied: the result must have ERROR status and
+  * contain {@link CliStrings#PROVIDE_EITHER_MEMBER_OR_GROUP_MESSAGE}.
+  */
+ @Test
+ public void testPauseGatewaySender_ErrorConditions() {
+
+   Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));
+
+   Properties props = getDistributedSystemProperties();
+   props.setProperty(MCAST_PORT, "0");
+   props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
+   props.setProperty(LOCATORS, "localhost[" + punePort + "]");
+   setUpJmxManagerOnVm0ThenConnect(props);
+
+   // The value returned for the remote locator is never used here, so it is not kept.
+   vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
+
+   vm3.invoke(() -> createCache(punePort));
+   vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+   // The member id is taken from vm3, the VM on which the sender was created.
+   final DistributedMember vm3Member = (DistributedMember) vm3.invoke(() -> getMember());
+
+   // Passing the member and group options together is the error under test.
+   String command = CliStrings.PAUSE_GATEWAYSENDER
+       + " --" + CliStrings.PAUSE_GATEWAYSENDER__ID + "=ln"
+       + " --" + CliStrings.PAUSE_GATEWAYSENDER__MEMBER + "=" + vm3Member.getId()
+       + " --" + CliStrings.PAUSE_GATEWAYSENDER__GROUP + "=SenderGroup1";
+   CommandResult cmdResult = executeCommand(command);
+   if (cmdResult == null) {
+     fail("testPauseGatewaySender_ErrorConditions failed as did not get CommandResult");
+   }
+
+   String strCmdResult = commandResultToString(cmdResult);
+   getLogWriter().info(
+       "testPauseGatewaySender_ErrorConditions stringResult : " + strCmdResult + ">>>>");
+   assertEquals(Result.Status.ERROR, cmdResult.getStatus());
+   assertTrue(strCmdResult.contains(CliStrings.PROVIDE_EITHER_MEMBER_OR_GROUP_MESSAGE));
+ }
+
+ /**
+  * Verifies that the {@code PAUSE_GATEWAYSENDER} command, addressed to a single
+  * member, returns OK and leaves sender "ln" paused on that member.
+  */
+ @Test
+ public void testPauseGatewaySender_onMember() {
+
+   Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));
+
+   Properties props = getDistributedSystemProperties();
+   props.setProperty(MCAST_PORT, "0");
+   props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
+   props.setProperty(LOCATORS, "localhost[" + punePort + "]");
+   setUpJmxManagerOnVm0ThenConnect(props);
+
+   // The value returned for the remote locator is never used here, so it is not kept.
+   vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
+
+   vm3.invoke(() -> createCache(punePort));
+   vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+   vm3.invoke(() -> startSender("ln"));
+
+   // Before the command: third argument (presumably "paused" - inferred from usage) is false.
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+
+   // The member id is taken from vm3, the VM that hosts the sender.
+   final DistributedMember vm3Member = (DistributedMember) vm3.invoke(() -> getMember());
+
+   // NOTE(review): fixed 10 s wait before issuing the command; presumably it lets the
+   // manager learn about the sender - confirm, and consider a condition-based wait.
+   pause(10000);
+
+   String command = CliStrings.PAUSE_GATEWAYSENDER
+       + " --" + CliStrings.PAUSE_GATEWAYSENDER__ID + "=ln"
+       + " --" + CliStrings.PAUSE_GATEWAYSENDER__MEMBER + "=" + vm3Member.getId();
+   CommandResult cmdResult = executeCommand(command);
+   if (cmdResult == null) {
+     fail("testPauseGatewaySender_onMember failed as did not get CommandResult");
+   }
+
+   String strCmdResult = commandResultToString(cmdResult);
+   getLogWriter().info(
+       "testPauseGatewaySender_onMember stringResult : " + strCmdResult + ">>>>");
+   assertEquals(Result.Status.OK, cmdResult.getStatus());
+   assertTrue(strCmdResult.contains("is paused on member"));
+
+   // After the command the same check expects the third argument to be true.
+   vm3.invoke(() -> verifySenderState("ln", true, true));
+ }
+
+ /**
+  * Verifies that the {@code PAUSE_GATEWAYSENDER} command issued with only the
+  * sender id (no member, no group) returns OK and leaves sender "ln" paused on
+  * vm3, vm4 and vm5.
+  */
+ @Test
+ public void testPauseGatewaySender() {
+
+   Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));
+
+   Properties props = getDistributedSystemProperties();
+   props.setProperty(MCAST_PORT, "0");
+   props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
+   props.setProperty(LOCATORS, "localhost[" + punePort + "]");
+   setUpJmxManagerOnVm0ThenConnect(props);
+
+   // The value returned for the remote locator is never used here, so it is not kept.
+   vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
+
+   vm3.invoke(() -> createCache(punePort));
+   vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm4.invoke(() -> createCache(punePort));
+   vm4.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm5.invoke(() -> createCache(punePort));
+   vm5.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+   vm3.invoke(() -> startSender("ln"));
+   vm4.invoke(() -> startSender("ln"));
+   vm5.invoke(() -> startSender("ln"));
+
+   // Before the command: third argument (presumably "paused" - inferred from usage) is false.
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+   vm4.invoke(() -> verifySenderState("ln", true, false));
+   vm5.invoke(() -> verifySenderState("ln", true, false));
+
+   // NOTE(review): fixed 10 s wait before issuing the command; presumably it lets the
+   // manager learn about the senders - confirm, and consider a condition-based wait.
+   pause(10000);
+
+   String command = CliStrings.PAUSE_GATEWAYSENDER
+       + " --" + CliStrings.PAUSE_GATEWAYSENDER__ID + "=ln";
+   CommandResult cmdResult = executeCommand(command);
+   if (cmdResult == null) {
+     fail("testPauseGatewaySender failed as did not get CommandResult");
+   }
+
+   String strCmdResult = commandResultToString(cmdResult);
+   getLogWriter().info(
+       "testPauseGatewaySender stringResult : " + strCmdResult + ">>>>");
+   assertEquals(Result.Status.OK, cmdResult.getStatus());
+
+   // Five rows with both "Error" and "OK" present. Presumably the Error rows come from
+   // members that do not host sender "ln" (e.g. locator / manager) - TODO confirm.
+   TabularResultData resultData = (TabularResultData) cmdResult.getResultData();
+   List<String> status = resultData.retrieveAllValues("Result");
+   assertEquals(5, status.size());
+   assertTrue(status.contains("Error"));
+   assertTrue(status.contains("OK"));
+
+   // After the command the same check expects the third argument to be true.
+   vm3.invoke(() -> verifySenderState("ln", true, true));
+   vm4.invoke(() -> verifySenderState("ln", true, true));
+   vm5.invoke(() -> verifySenderState("ln", true, true));
+ }
+
+ /**
+  * Verifies that the {@code PAUSE_GATEWAYSENDER} command addressed to group
+  * "SenderGroup1" returns OK with three non-error result rows and leaves
+  * sender "ln" paused on vm3, vm4 and vm5, which are all created in that group.
+  */
+ @Test
+ public void testPauseGatewaySender_Group() {
+
+   Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));
+
+   Properties props = getDistributedSystemProperties();
+   props.setProperty(MCAST_PORT, "0");
+   props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
+   props.setProperty(LOCATORS, "localhost[" + punePort + "]");
+   setUpJmxManagerOnVm0ThenConnect(props);
+
+   // The value returned for the remote locator is never used here, so it is not kept.
+   vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
+
+   vm3.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
+   vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm4.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
+   vm4.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm5.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
+   vm5.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+   vm3.invoke(() -> startSender("ln"));
+   vm4.invoke(() -> startSender("ln"));
+   vm5.invoke(() -> startSender("ln"));
+
+   // Before the command: third argument (presumably "paused" - inferred from usage) is false.
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+   vm4.invoke(() -> verifySenderState("ln", true, false));
+   vm5.invoke(() -> verifySenderState("ln", true, false));
+
+   // NOTE(review): fixed 10 s wait before issuing the command; presumably it lets the
+   // manager learn about the senders - confirm, and consider a condition-based wait.
+   pause(10000);
+
+   String command = CliStrings.PAUSE_GATEWAYSENDER
+       + " --" + CliStrings.PAUSE_GATEWAYSENDER__ID + "=ln"
+       + " --" + CliStrings.PAUSE_GATEWAYSENDER__GROUP + "=SenderGroup1";
+   CommandResult cmdResult = executeCommand(command);
+   if (cmdResult == null) {
+     fail("testPauseGatewaySender_Group failed as did not get CommandResult");
+   }
+
+   String strCmdResult = commandResultToString(cmdResult);
+   getLogWriter().info(
+       "testPauseGatewaySender_Group stringResult : " + strCmdResult + ">>>>");
+   assertEquals(Result.Status.OK, cmdResult.getStatus());
+
+   // One row per group member, none of them an error.
+   TabularResultData resultData = (TabularResultData) cmdResult.getResultData();
+   List<String> status = resultData.retrieveAllValues("Result");
+   assertEquals(3, status.size());
+   assertFalse(status.contains("Error"));
+   assertTrue(status.contains("OK"));
+
+   // After the command the same check expects the third argument to be true.
+   vm3.invoke(() -> verifySenderState("ln", true, true));
+   vm4.invoke(() -> verifySenderState("ln", true, true));
+   vm5.invoke(() -> verifySenderState("ln", true, true));
+ }
+
+ /**
+  * Verifies the {@code PAUSE_GATEWAYSENDER} command when it names several groups
+  * and one member belongs to more than one of them. vm3 and vm4 are created in
+  * "SenderGroup1", vm5 in "SenderGroup1, SenderGroup2", vm6 in "SenderGroup2"
+  * and vm7 in "SenderGroup3". Pausing "SenderGroup1,SenderGroup2" must yield
+  * four non-error result rows, pause the sender on vm3-vm6 and leave vm7
+  * unpaused.
+  */
+ @Test
+ public void testPauseGatewaySender_MultipleGroup() {
+
+   Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));
+
+   Properties props = getDistributedSystemProperties();
+   props.setProperty(MCAST_PORT, "0");
+   props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
+   props.setProperty(LOCATORS, "localhost[" + punePort + "]");
+   setUpJmxManagerOnVm0ThenConnect(props);
+
+   // The value returned for the remote locator is never used here, so it is not kept.
+   vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
+
+   vm3.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
+   vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm4.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
+   vm4.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm5.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1, SenderGroup2"));
+   vm5.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm6.invoke(() -> createCacheWithGroups(punePort, "SenderGroup2"));
+   vm6.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm7.invoke(() -> createCacheWithGroups(punePort, "SenderGroup3"));
+   vm7.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+   vm3.invoke(() -> startSender("ln"));
+   vm4.invoke(() -> startSender("ln"));
+   vm5.invoke(() -> startSender("ln"));
+   vm6.invoke(() -> startSender("ln"));
+   vm7.invoke(() -> startSender("ln"));
+
+   // Before the command: third argument (presumably "paused" - inferred from usage) is false.
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+   vm4.invoke(() -> verifySenderState("ln", true, false));
+   vm5.invoke(() -> verifySenderState("ln", true, false));
+   vm6.invoke(() -> verifySenderState("ln", true, false));
+   vm7.invoke(() -> verifySenderState("ln", true, false));
+
+   // NOTE(review): fixed 10 s wait before issuing the command; presumably it lets the
+   // manager learn about the senders - confirm, and consider a condition-based wait.
+   pause(10000);
+
+   String command = CliStrings.PAUSE_GATEWAYSENDER
+       + " --" + CliStrings.PAUSE_GATEWAYSENDER__ID + "=ln"
+       + " --" + CliStrings.PAUSE_GATEWAYSENDER__GROUP + "=SenderGroup1,SenderGroup2";
+   CommandResult cmdResult = executeCommand(command);
+   if (cmdResult == null) {
+     fail("testPauseGatewaySender_MultipleGroup failed as did not get CommandResult");
+   }
+
+   String strCmdResult = commandResultToString(cmdResult);
+   getLogWriter().info(
+       "testPauseGatewaySender_MultipleGroup stringResult : " + strCmdResult + ">>>>");
+   assertEquals(Result.Status.OK, cmdResult.getStatus());
+
+   // Four rows, none an error: vm5 is in both groups yet the count is 4, not 5.
+   TabularResultData resultData = (TabularResultData) cmdResult.getResultData();
+   List<String> status = resultData.retrieveAllValues("Result");
+   assertEquals(4, status.size());
+   assertFalse(status.contains("Error"));
+   assertTrue(status.contains("OK"));
+
+   vm3.invoke(() -> verifySenderState("ln", true, true));
+   vm4.invoke(() -> verifySenderState("ln", true, true));
+   vm5.invoke(() -> verifySenderState("ln", true, true));
+   vm6.invoke(() -> verifySenderState("ln", true, true));
+   // vm7 is only in "SenderGroup3", which the command did not name, so its state is unchanged.
+   vm7.invoke(() -> verifySenderState("ln", true, false));
+ }
+
+ /**
+  * Verifies that the {@code RESUME_GATEWAYSENDER} command is rejected when both
+  * a member and a group are supplied: the result must have ERROR status and
+  * contain {@link CliStrings#PROVIDE_EITHER_MEMBER_OR_GROUP_MESSAGE}.
+  */
+ @Test
+ public void testResumeGatewaySender_ErrorConditions() {
+
+   Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));
+
+   Properties props = getDistributedSystemProperties();
+   props.setProperty(MCAST_PORT, "0");
+   props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
+   props.setProperty(LOCATORS, "localhost[" + punePort + "]");
+   setUpJmxManagerOnVm0ThenConnect(props);
+
+   // The value returned for the remote locator is never used here, so it is not kept.
+   vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
+
+   vm3.invoke(() -> createCache(punePort));
+   vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+   // The member id is taken from vm3, the VM on which the sender was created.
+   final DistributedMember vm3Member = (DistributedMember) vm3.invoke(() -> getMember());
+
+   // Passing the member and group options together is the error under test.
+   String command = CliStrings.RESUME_GATEWAYSENDER
+       + " --" + CliStrings.RESUME_GATEWAYSENDER__ID + "=ln"
+       + " --" + CliStrings.RESUME_GATEWAYSENDER__MEMBER + "=" + vm3Member.getId()
+       + " --" + CliStrings.RESUME_GATEWAYSENDER__GROUP + "=SenderGroup1";
+   CommandResult cmdResult = executeCommand(command);
+   if (cmdResult == null) {
+     fail("testResumeGatewaySender_ErrorConditions failed as did not get CommandResult");
+   }
+
+   String strCmdResult = commandResultToString(cmdResult);
+   getLogWriter().info(
+       "testResumeGatewaySender_ErrorConditions stringResult : " + strCmdResult + ">>>>");
+   assertEquals(Result.Status.ERROR, cmdResult.getStatus());
+   assertTrue(strCmdResult.contains(CliStrings.PROVIDE_EITHER_MEMBER_OR_GROUP_MESSAGE));
+ }
+
+ /**
+  * Verifies that the {@code RESUME_GATEWAYSENDER} command issued with only the
+  * sender id (no member, no group) returns OK and leaves the previously paused
+  * sender "ln" unpaused on vm3, vm4 and vm5.
+  */
+ @Test
+ public void testResumeGatewaySender() {
+
+   Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));
+
+   Properties props = getDistributedSystemProperties();
+   props.setProperty(MCAST_PORT, "0");
+   props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
+   props.setProperty(LOCATORS, "localhost[" + punePort + "]");
+   setUpJmxManagerOnVm0ThenConnect(props);
+
+   // The value returned for the remote locator is never used here, so it is not kept.
+   vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
+
+   vm3.invoke(() -> createCache(punePort));
+   vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm4.invoke(() -> createCache(punePort));
+   vm4.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm5.invoke(() -> createCache(punePort));
+   vm5.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+   vm3.invoke(() -> startSender("ln"));
+   vm4.invoke(() -> startSender("ln"));
+   vm5.invoke(() -> startSender("ln"));
+
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+   vm4.invoke(() -> verifySenderState("ln", true, false));
+   vm5.invoke(() -> verifySenderState("ln", true, false));
+
+   // Pause directly on each VM so that the command under test has something to resume.
+   vm3.invoke(() -> pauseSender("ln"));
+   vm4.invoke(() -> pauseSender("ln"));
+   vm5.invoke(() -> pauseSender("ln"));
+
+   vm3.invoke(() -> verifySenderState("ln", true, true));
+   vm4.invoke(() -> verifySenderState("ln", true, true));
+   vm5.invoke(() -> verifySenderState("ln", true, true));
+
+   // NOTE(review): fixed 10 s wait before issuing the command; presumably it lets the
+   // manager learn about the senders - confirm, and consider a condition-based wait.
+   pause(10000);
+
+   String command = CliStrings.RESUME_GATEWAYSENDER
+       + " --" + CliStrings.RESUME_GATEWAYSENDER__ID + "=ln";
+   CommandResult cmdResult = executeCommand(command);
+   if (cmdResult == null) {
+     fail("testResumeGatewaySender failed as did not get CommandResult");
+   }
+
+   String strCmdResult = commandResultToString(cmdResult);
+   getLogWriter().info(
+       "testResumeGatewaySender stringResult : " + strCmdResult + ">>>>");
+   assertEquals(Result.Status.OK, cmdResult.getStatus());
+
+   // Five rows with both "Error" and "OK" present. Presumably the Error rows come from
+   // members that do not host sender "ln" (e.g. locator / manager) - TODO confirm.
+   TabularResultData resultData = (TabularResultData) cmdResult.getResultData();
+   List<String> status = resultData.retrieveAllValues("Result");
+   assertEquals(5, status.size());
+   assertTrue(status.contains("Error"));
+   assertTrue(status.contains("OK"));
+
+   // After the command the third argument is expected to be false again.
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+   vm4.invoke(() -> verifySenderState("ln", true, false));
+   vm5.invoke(() -> verifySenderState("ln", true, false));
+ }
+
+ /**
+  * Verifies that the {@code RESUME_GATEWAYSENDER} command, addressed to a single
+  * member, returns OK and leaves the previously paused sender "ln" unpaused on
+  * that member.
+  */
+ @Test
+ public void testResumeGatewaySender_onMember() {
+
+   Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));
+
+   Properties props = getDistributedSystemProperties();
+   props.setProperty(MCAST_PORT, "0");
+   props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
+   props.setProperty(LOCATORS, "localhost[" + punePort + "]");
+   setUpJmxManagerOnVm0ThenConnect(props);
+
+   // The value returned for the remote locator is never used here, so it is not kept.
+   vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
+
+   vm3.invoke(() -> createCache(punePort));
+   vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+   vm3.invoke(() -> startSender("ln"));
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+
+   // Pause directly on the VM so that the command under test has something to resume.
+   vm3.invoke(() -> pauseSender("ln"));
+   vm3.invoke(() -> verifySenderState("ln", true, true));
+
+   // The member id is taken from vm3, the VM that hosts the sender.
+   final DistributedMember vm3Member = (DistributedMember) vm3.invoke(() -> getMember());
+
+   // NOTE(review): fixed 10 s wait before issuing the command; presumably it lets the
+   // manager learn about the sender - confirm, and consider a condition-based wait.
+   pause(10000);
+
+   String command = CliStrings.RESUME_GATEWAYSENDER
+       + " --" + CliStrings.RESUME_GATEWAYSENDER__ID + "=ln"
+       + " --" + CliStrings.RESUME_GATEWAYSENDER__MEMBER + "=" + vm3Member.getId();
+   CommandResult cmdResult = executeCommand(command);
+   if (cmdResult == null) {
+     fail("testResumeGatewaySender_onMember failed as did not get CommandResult");
+   }
+
+   String strCmdResult = commandResultToString(cmdResult);
+   getLogWriter().info(
+       "testResumeGatewaySender_onMember stringResult : " + strCmdResult + ">>>>");
+   assertEquals(Result.Status.OK, cmdResult.getStatus());
+   assertTrue(strCmdResult.contains("is resumed on member"));
+
+   // After the command the third argument is expected to be false again.
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+ }
+
+ /**
+  * Verifies that the {@code RESUME_GATEWAYSENDER} command addressed to group
+  * "SenderGroup1" returns OK with three non-error result rows and leaves the
+  * previously paused sender "ln" unpaused on vm3, vm4 and vm5, which are all
+  * created in that group.
+  */
+ @Test
+ public void testResumeGatewaySender_Group() {
+
+   Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));
+
+   Properties props = getDistributedSystemProperties();
+   props.setProperty(MCAST_PORT, "0");
+   props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
+   props.setProperty(LOCATORS, "localhost[" + punePort + "]");
+   setUpJmxManagerOnVm0ThenConnect(props);
+
+   // The value returned for the remote locator is never used here, so it is not kept.
+   vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
+
+   vm3.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
+   vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm4.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
+   vm4.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm5.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
+   vm5.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+   vm3.invoke(() -> startSender("ln"));
+   vm4.invoke(() -> startSender("ln"));
+   vm5.invoke(() -> startSender("ln"));
+
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+   vm4.invoke(() -> verifySenderState("ln", true, false));
+   vm5.invoke(() -> verifySenderState("ln", true, false));
+
+   // Pause directly on each VM so that the command under test has something to resume.
+   vm3.invoke(() -> pauseSender("ln"));
+   vm4.invoke(() -> pauseSender("ln"));
+   vm5.invoke(() -> pauseSender("ln"));
+
+   vm3.invoke(() -> verifySenderState("ln", true, true));
+   vm4.invoke(() -> verifySenderState("ln", true, true));
+   vm5.invoke(() -> verifySenderState("ln", true, true));
+
+   // NOTE(review): fixed 10 s wait before issuing the command; presumably it lets the
+   // manager learn about the senders - confirm, and consider a condition-based wait.
+   pause(10000);
+
+   String command = CliStrings.RESUME_GATEWAYSENDER
+       + " --" + CliStrings.RESUME_GATEWAYSENDER__ID + "=ln"
+       + " --" + CliStrings.RESUME_GATEWAYSENDER__GROUP + "=SenderGroup1";
+   CommandResult cmdResult = executeCommand(command);
+   if (cmdResult == null) {
+     fail("testResumeGatewaySender_Group failed as did not get CommandResult");
+   }
+
+   String strCmdResult = commandResultToString(cmdResult);
+   getLogWriter().info(
+       "testResumeGatewaySender_Group stringResult : " + strCmdResult + ">>>>");
+   assertEquals(Result.Status.OK, cmdResult.getStatus());
+
+   // One row per group member, none of them an error.
+   TabularResultData resultData = (TabularResultData) cmdResult.getResultData();
+   List<String> status = resultData.retrieveAllValues("Result");
+   assertEquals(3, status.size());
+   assertFalse(status.contains("Error"));
+   assertTrue(status.contains("OK"));
+
+   // After the command the third argument is expected to be false again.
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+   vm4.invoke(() -> verifySenderState("ln", true, false));
+   vm5.invoke(() -> verifySenderState("ln", true, false));
+ }
+
+ /**
+  * Verifies the {@code RESUME_GATEWAYSENDER} command when it names several
+  * groups and one member belongs to more than one of them. vm3 and vm4 are
+  * created in "SenderGroup1", vm5 in "SenderGroup1, SenderGroup2", vm6 in
+  * "SenderGroup2" and vm7 in "SenderGroup3"; all five senders are paused first.
+  * Resuming "SenderGroup1,SenderGroup2" must yield four non-error result rows,
+  * resume the sender on vm3-vm6 and leave vm7 paused.
+  */
+ @Test
+ public void testResumeGatewaySender_MultipleGroup() {
+
+   Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));
+
+   Properties props = getDistributedSystemProperties();
+   props.setProperty(MCAST_PORT, "0");
+   props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
+   props.setProperty(LOCATORS, "localhost[" + punePort + "]");
+   setUpJmxManagerOnVm0ThenConnect(props);
+
+   // The value returned for the remote locator is never used here, so it is not kept.
+   vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
+
+   vm3.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
+   vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm4.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
+   vm4.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm5.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1, SenderGroup2"));
+   vm5.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm6.invoke(() -> createCacheWithGroups(punePort, "SenderGroup2"));
+   vm6.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+   vm7.invoke(() -> createCacheWithGroups(punePort, "SenderGroup3"));
+   vm7.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
+
+   vm3.invoke(() -> startSender("ln"));
+   vm4.invoke(() -> startSender("ln"));
+   vm5.invoke(() -> startSender("ln"));
+   vm6.invoke(() -> startSender("ln"));
+   vm7.invoke(() -> startSender("ln"));
+
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+   vm4.invoke(() -> verifySenderState("ln", true, false));
+   vm5.invoke(() -> verifySenderState("ln", true, false));
+   vm6.invoke(() -> verifySenderState("ln", true, false));
+   vm7.invoke(() -> verifySenderState("ln", true, false));
+
+   // Pause directly on each VM so that the command under test has something to resume.
+   vm3.invoke(() -> pauseSender("ln"));
+   vm4.invoke(() -> pauseSender("ln"));
+   vm5.invoke(() -> pauseSender("ln"));
+   vm6.invoke(() -> pauseSender("ln"));
+   vm7.invoke(() -> pauseSender("ln"));
+
+   vm3.invoke(() -> verifySenderState("ln", true, true));
+   vm4.invoke(() -> verifySenderState("ln", true, true));
+   vm5.invoke(() -> verifySenderState("ln", true, true));
+   vm6.invoke(() -> verifySenderState("ln", true, true));
+   vm7.invoke(() -> verifySenderState("ln", true, true));
+
+   // NOTE(review): fixed 10 s wait before issuing the command; presumably it lets the
+   // manager learn about the senders - confirm, and consider a condition-based wait.
+   pause(10000);
+
+   String command = CliStrings.RESUME_GATEWAYSENDER
+       + " --" + CliStrings.RESUME_GATEWAYSENDER__ID + "=ln"
+       + " --" + CliStrings.RESUME_GATEWAYSENDER__GROUP + "=SenderGroup1,SenderGroup2";
+   CommandResult cmdResult = executeCommand(command);
+   if (cmdResult == null) {
+     fail("testResumeGatewaySender_MultipleGroup failed as did not get CommandResult");
+   }
+
+   String strCmdResult = commandResultToString(cmdResult);
+   getLogWriter().info(
+       "testResumeGatewaySender_MultipleGroup stringResult : " + strCmdResult + ">>>>");
+   assertEquals(Result.Status.OK, cmdResult.getStatus());
+
+   // Four rows, none an error: vm5 is in both groups yet the count is 4, not 5.
+   TabularResultData resultData = (TabularResultData) cmdResult.getResultData();
+   List<String> status = resultData.retrieveAllValues("Result");
+   assertEquals(4, status.size());
+   assertFalse(status.contains("Error"));
+   assertTrue(status.contains("OK"));
+
+   vm3.invoke(() -> verifySenderState("ln", true, false));
+   vm4.invoke(() -> verifySenderState("ln", true, false));
+   vm5.invoke(() -> verifySenderState("ln", true, false));
+   vm6.invoke(() -> verifySenderState("ln", true, false));
+   // vm7 is only in "SenderGroup3", which the command did not name, so it stays paused.
+   vm7.invoke(() -> verifySenderState("ln", true, true));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandStatusDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandStatusDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandStatusDUnitTest.java
new file mode 100644
index 0000000..9271614
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandStatusDUnitTest.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.wancommand;
+
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.management.cli.Result;
+import com.gemstone.gemfire.management.internal.cli.i18n.CliStrings;
+import com.gemstone.gemfire.management.internal.cli.result.CommandResult;
+import com.gemstone.gemfire.management.internal.cli.result.CompositeResultData;
+import com.gemstone.gemfire.management.internal.cli.result.TabularResultData;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Properties;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.LOCATORS;
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.MCAST_PORT;
+import static com.gemstone.gemfire.test.dunit.Assert.*;
+import static com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter;
+import static com.gemstone.gemfire.test.dunit.Wait.pause;
+
+@Category(DistributedTest.class)
+public class WanCommandStatusDUnitTest extends WANCommandTestBase{
+
+ private static final long serialVersionUID = 1L;
+
+ @Test
+ public void testGatewaySenderStatus(){
+ // Queries sender "ln_Serial" without member/group filters: once before startSender is called and once after.
+ Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 ));
+
+ Properties props = getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + lnPort + "]");
+ setUpJmxManagerOnVm0ThenConnect(props);
+
+ Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort ));
+
+ vm6.invoke(() -> createAndStartReceiver( nyPort ));
+ // vm3, vm4 and vm5 each get a cache plus the senders "ln_Serial" and "ln_Parallel".
+ vm3.invoke(() -> createCache( lnPort ));
+ vm3.invoke(() -> createSender(
+ "ln_Serial", 2, false, 100, 400, false, false, null, true ));
+ vm3.invoke(() -> createSender(
+ "ln_Parallel", 2, true, 100, 400, false, false, null, true));
+
+ vm4.invoke(() -> createCache( lnPort ));
+ vm4.invoke(() -> createSender(
+ "ln_Serial", 2, false, 100, 400, false, false, null, true));
+ vm4.invoke(() -> createSender(
+ "ln_Parallel", 2, true, 100, 400, false, false, null, true));
+
+ vm5.invoke(() -> createCache( lnPort ));
+ vm5.invoke(() -> createSender(
+ "ln_Serial", 2, false, 100, 400, false, false, null, true));
+ vm5.invoke(() -> createSender(
+ "ln_Parallel", 2, true, 100, 400, false, false, null, true));
+
+ pause(10000);
+ String command = CliStrings.STATUS_GATEWAYSENDER + " --"
+ + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial";
+ CommandResult cmdResult = executeCommand(command);
+ if (cmdResult != null) {
+ // Log and assert the overall status first so a failed command is reported before the result data is cast.
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info(
+ "testGatewaySenderStatus : " + strCmdResult + ">>>>> ");
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER);
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(3, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING));
+
+ tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER);
+ List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER);
+ assertEquals(2, result_hosts.size());
+ } else {
+ fail("testGatewaySenderStatus failed as did not get CommandResult");
+ }
+ // Start both senders on all three members, then repeat the same query.
+ vm3.invoke(() -> startSender( "ln_Serial" ));
+ vm3.invoke(() -> startSender( "ln_Parallel" ));
+
+ vm4.invoke(() -> startSender( "ln_Serial" ));
+ vm4.invoke(() -> startSender( "ln_Parallel" ));
+
+ vm5.invoke(() -> startSender( "ln_Serial" ));
+ vm5.invoke(() -> startSender( "ln_Parallel" ));
+
+ pause(10000);
+ command = CliStrings.STATUS_GATEWAYSENDER + " --"
+ + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial";
+ cmdResult = executeCommand(command);
+ if (cmdResult != null) {
+ // Same ordering as in the first query: overall status before the tables are read.
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info(
+ "testGatewaySenderStatus : " + strCmdResult + ">>>>> ");
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER);
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(3, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING));
+
+ tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER);
+ List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER);
+ assertEquals(2, result_hosts.size());
+ } else {
+ fail("testGatewaySenderStatus failed as did not get CommandResult");
+ }
+ }
+
+ /**
+ * Runs the STATUS_GATEWAYSENDER command for sender "ln_Serial" restricted to one member at a time:
+ * vm3 before and after startSender is called, and finally vm5, on which this test creates a cache
+ * but no sender.
+ */
+ @Test
+ public void testGatewaySenderStatus_OnMember(){
+
+ Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 ));
+
+ Properties props = getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + lnPort + "]");
+ setUpJmxManagerOnVm0ThenConnect(props);
+
+ Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort ));
+
+ vm6.invoke(() -> createAndStartReceiver( nyPort ));
+
+ vm3.invoke(() -> createCache( lnPort ));
+ vm3.invoke(() -> createSender(
+ "ln_Serial", 2, false, 100, 400, false, false, null, true ));
+ vm3.invoke(() -> createSender(
+ "ln_Parallel", 2, true, 100, 400, false, false, null, true));
+
+ vm4.invoke(() -> createCache( lnPort ));
+ vm4.invoke(() -> createSender(
+ "ln_Serial", 2, false, 100, 400, false, false, null, true));
+ vm4.invoke(() -> createSender(
+ "ln_Parallel", 2, true, 100, 400, false, false, null, true));
+
+ // vm5 only creates a cache; no sender is created on it.
+ vm5.invoke(() -> createCache( lnPort ));
+
+ final DistributedMember vm3Member = (DistributedMember) vm3.invoke(() -> getMember());
+
+ pause(10000);
+ String command = CliStrings.STATUS_GATEWAYSENDER + " --"
+ + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial --"
+ + CliStrings.STATUS_GATEWAYSENDER__MEMBER + "=" + vm3Member.getId();
+
+ CommandResult cmdResult = executeCommand(command);
+ if (cmdResult != null) {
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info("testGatewaySenderStatus_OnMember : " + strCmdResult + ">>>>> ");
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+
+ // Exactly one row is expected for the selected member, and it must not report running yet.
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER);
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(1, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING));
+ } else {
+ fail("testGatewaySenderStatus_OnMember failed as did not get CommandResult");
+ }
+
+ // Start the senders on vm3 and vm4, then query vm3 again.
+ vm3.invoke(() -> startSender( "ln_Serial" ));
+ vm3.invoke(() -> startSender( "ln_Parallel" ));
+
+ vm4.invoke(() -> startSender( "ln_Serial" ));
+ vm4.invoke(() -> startSender( "ln_Parallel" ));
+
+ pause(10000);
+ command = CliStrings.STATUS_GATEWAYSENDER + " --"
+ + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial --"
+ + CliStrings.STATUS_GATEWAYSENDER__MEMBER + "=" + vm3Member.getId();
+
+ cmdResult = executeCommand(command);
+ if (cmdResult != null) {
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info("testGatewaySenderStatus_OnMember : " + strCmdResult + ">>>>> ");
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+
+ // Still a single row, but it must no longer report not-running.
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER);
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(1, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING));
+ } else {
+ fail("testGatewaySenderStatus_OnMember failed as did not get CommandResult");
+ }
+
+ final DistributedMember vm5Member = (DistributedMember) vm5.invoke(() -> getMember());
+
+ command = CliStrings.STATUS_GATEWAYSENDER + " --"
+ + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial --"
+ + CliStrings.STATUS_GATEWAYSENDER__MEMBER + "=" + vm5Member.getId();
+ cmdResult = executeCommand(command);
+ if (cmdResult != null) {
+ // No "ln_Serial" sender was created on vm5, so only the overall command status is checked here.
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info("testGatewaySenderStatus_OnMember : " + strCmdResult + ">>>>> ");
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+ } else {
+ fail("testGatewaySenderStatus_OnMember failed as did not get CommandResult");
+ }
+ }
+
+ @Test
+ public void testGatewaySenderStatus_OnGroups(){
+ // Queries sender "ln_Serial" restricted to group "Serial_Sender", before and after startSender is called.
+ Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 ));
+
+ Properties props = getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + lnPort + "]");
+ setUpJmxManagerOnVm0ThenConnect(props);
+
+ Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort ));
+
+ vm7.invoke(() -> createAndStartReceiver( nyPort ));
+ // Group layout: vm3 and vm4 are created with both groups, vm5 with Parallel_Sender only, vm6 with Serial_Sender only.
+ vm3.invoke(() -> createCacheWithGroups( lnPort, "Serial_Sender, Parallel_Sender"));
+ vm3.invoke(() -> createSender(
+ "ln_Serial", 2, false, 100, 400, false, false, null, true ));
+ vm3.invoke(() -> createSender(
+ "ln_Parallel", 2, true, 100, 400, false, false, null, true));
+
+ vm4.invoke(() -> createCacheWithGroups( lnPort,"Serial_Sender, Parallel_Sender"));
+ vm4.invoke(() -> createSender(
+ "ln_Serial", 2, false, 100, 400, false, false, null, true));
+ vm4.invoke(() -> createSender(
+ "ln_Parallel", 2, true, 100, 400, false, false, null, true));
+
+ vm5.invoke(() -> createCacheWithGroups( lnPort,"Parallel_Sender"));
+ vm5.invoke(() -> createSender(
+ "ln_Serial", 2, false, 100, 400, false, false, null, true));
+ vm5.invoke(() -> createSender(
+ "ln_Parallel", 2, true, 100, 400, false, false, null, true));
+
+ vm6.invoke(() -> createCacheWithGroups( lnPort,"Serial_Sender"));
+ // NOTE(review): vm1Member (actually vm3's member) is never read; kept so the remote getMember() call is not dropped - confirm it can go.
+ final DistributedMember vm1Member = (DistributedMember) vm3.invoke(() -> getMember());
+
+ pause(10000);
+ String command = CliStrings.STATUS_GATEWAYSENDER + " --"
+ + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial --" + CliStrings.STATUS_GATEWAYSENDER__GROUP + "=Serial_Sender";
+
+ CommandResult cmdResult = executeCommand(command);
+ if (cmdResult != null) {
+ // Log and assert the overall status first so a failed command is reported before the result data is cast.
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info(
+ "testGatewaySenderStatus_OnGroups : " + strCmdResult + ">>>>> ");
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER);
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(2, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING));
+
+ // One not-available row is expected - presumably vm6, which is in Serial_Sender but has no sender (confirm).
+ tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER);
+ List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER);
+ assertEquals(1, result_hosts.size());
+ } else {
+ fail("testGatewaySenderStatus_OnGroups failed as did not get CommandResult");
+ }
+ // Start the senders on vm3 and vm4 (the members in Serial_Sender that have senders) and query again.
+ vm3.invoke(() -> startSender( "ln_Serial" ));
+ vm3.invoke(() -> startSender( "ln_Parallel" ));
+
+ vm4.invoke(() -> startSender( "ln_Serial" ));
+ vm4.invoke(() -> startSender( "ln_Parallel" ));
+
+ pause(10000);
+ command = CliStrings.STATUS_GATEWAYSENDER + " --"
+ + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial --" + CliStrings.STATUS_GATEWAYSENDER__GROUP + "=Serial_Sender";
+
+ cmdResult = executeCommand(command);
+ if (cmdResult != null) {
+ // Same ordering as in the first query: overall status before the tables are read.
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info(
+ "testGatewaySenderStatus_OnGroups : " + strCmdResult + ">>>>> ");
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER);
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(2, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING));
+
+ tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER);
+ List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER);
+ assertEquals(1, result_hosts.size());
+ } else {
+ fail("testGatewaySenderStatus_OnGroups failed as did not get CommandResult");
+ }
+ }
+
+ @Test
+ public void testGatewayReceiverStatus(){
+ // Runs STATUS_GATEWAYRECEIVER without filters: once after createAndStartReceiver, once after stopReceiver on vm3-vm5.
+ Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 ));
+
+ Properties props = getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + lnPort + "]");
+ setUpJmxManagerOnVm0ThenConnect(props);
+
+ Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort ));
+ // vm6 starts its receiver with nyPort; vm3, vm4 and vm5 start theirs with lnPort.
+ vm6.invoke(() -> createAndStartReceiver( nyPort ));
+
+ vm3.invoke(() -> createAndStartReceiver( lnPort ));
+ vm4.invoke(() -> createAndStartReceiver( lnPort ));
+ vm5.invoke(() -> createAndStartReceiver( lnPort ));
+ // Fixed 10 s wait before querying - presumably to let the management layer catch up (TODO confirm).
+ pause(10000);
+ String command = CliStrings.STATUS_GATEWAYRECEIVER;
+ CommandResult cmdResult = executeCommand(command);
+
+ if (cmdResult != null) {
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info(
+ "testGatewayReceiverStatus : " + strCmdResult + ">>>>> ");
+
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+ // SECTION_GATEWAY_RECEIVER_AVAILABLE must hold three status rows, none of them GATEWAY_NOT_RUNNING.
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(3, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING));
+ // Two members are expected in SECTION_GATEWAY_RECEIVER_NOT_AVAILABLE - presumably those without a receiver (confirm).
+ tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+ List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER);
+ assertEquals(2, result_hosts.size());
+
+ } else {
+ fail("testGatewayReceiverStatus failed as did not get CommandResult");
+ }
+ // Stop the receivers on vm3, vm4 and vm5, then run the same command again.
+ vm3.invoke(() -> stopReceiver());
+ vm4.invoke(() -> stopReceiver());
+ vm5.invoke(() -> stopReceiver());
+
+ pause(10000);
+
+ command = CliStrings.STATUS_GATEWAYRECEIVER;
+ cmdResult = executeCommand(command);
+ if (cmdResult != null) {
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info(
+ "testGatewayReceiverStatus : " + strCmdResult + ">>>>> ");
+
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+ // Still three status rows, but now none of them may be GATEWAY_RUNNING.
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(3, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING));
+
+ tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+ List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER);
+ assertEquals(2, result_hosts.size());
+
+ } else {
+ fail("testGatewayReceiverStatus failed as did not get CommandResult");
+ }
+ }
+
+ /**
+ * Runs the STATUS_GATEWAYRECEIVER command restricted to vm3's member id, first after
+ * createAndStartReceiver has been invoked on vm3, vm4 and vm5, and again after stopReceiver()
+ * has been invoked on the same three VMs.
+ */
+ @Test
+ public void testGatewayReceiverStatus_OnMember(){
+
+ Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 ));
+
+ Properties props = getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + lnPort + "]");
+ setUpJmxManagerOnVm0ThenConnect(props);
+
+ Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort ));
+
+ vm6.invoke(() -> createAndStartReceiver( nyPort ));
+
+ vm3.invoke(() -> createAndStartReceiver( lnPort ));
+ vm4.invoke(() -> createAndStartReceiver( lnPort ));
+ vm5.invoke(() -> createAndStartReceiver( lnPort ));
+
+ // Both queries below are restricted to this member via STATUS_GATEWAYRECEIVER__MEMBER.
+ final DistributedMember vm3Member = (DistributedMember) vm3.invoke(() -> getMember());
+
+ pause(10000);
+ String command = CliStrings.STATUS_GATEWAYRECEIVER+ " --"
+ + CliStrings.STATUS_GATEWAYRECEIVER__MEMBER + "=" + vm3Member.getId();
+
+ CommandResult cmdResult = executeCommand(command);
+
+ if (cmdResult != null) {
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info("testGatewayReceiverStatus_OnMember : " + strCmdResult + ">>>>> ");
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+ // Exactly one row is expected for the selected member, and it must not report not-running.
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(1, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING));
+ } else {
+ fail("testGatewayReceiverStatus_OnMember failed as did not get CommandResult");
+ }
+
+ // Stop the receivers on vm3, vm4 and vm5, then query vm3 again.
+ vm3.invoke(() -> stopReceiver());
+ vm4.invoke(() -> stopReceiver());
+ vm5.invoke(() -> stopReceiver());
+
+ pause(10000);
+
+ command = CliStrings.STATUS_GATEWAYRECEIVER+ " --"
+ + CliStrings.STATUS_GATEWAYRECEIVER__MEMBER + "=" + vm3Member.getId();
+
+ cmdResult = executeCommand(command);
+ if (cmdResult != null) {
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info(
+ "testGatewayReceiverStatus_OnMember : " + strCmdResult + ">>>>> ");
+
+ // NOTE(review): unlike the first query, the overall Result.Status is not asserted here - confirm that is intended.
+ // Still a single row for vm3, but it must no longer report running.
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(1, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING));
+ } else {
+ fail("testGatewayReceiverStatus_OnMember failed as did not get CommandResult");
+ }
+ }
+
+ @Test
+ public void testGatewayReceiverStatus_OnGroups(){
+ // Runs STATUS_GATEWAYRECEIVER restricted to group "RG1", once after the receivers are started and once after vm3-vm5 stop theirs.
+ Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 ));
+
+ Properties props = getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + lnPort + "]");
+ setUpJmxManagerOnVm0ThenConnect(props);
+
+ Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort ));
+
+ vm7.invoke(() -> createAndStartReceiver( nyPort ));
+ // Group argument per VM: vm3 and vm4 get "RG1, RG2", vm5 gets "RG1", vm6 gets "RG2" - so RG1 presumably matches vm3, vm4 and vm5.
+ vm3.invoke(() -> createAndStartReceiverWithGroup( lnPort, "RG1, RG2" ));
+ vm4.invoke(() -> createAndStartReceiverWithGroup( lnPort, "RG1, RG2" ));
+ vm5.invoke(() -> createAndStartReceiverWithGroup( lnPort, "RG1" ));
+ vm6.invoke(() -> createAndStartReceiverWithGroup( lnPort, "RG2" ));
+
+ pause(10000);
+ String command = CliStrings.STATUS_GATEWAYRECEIVER + " --"
+ + CliStrings.STATUS_GATEWAYRECEIVER__GROUP + "=RG1";
+
+ CommandResult cmdResult = executeCommand(command);
+ if (cmdResult != null) {
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info(
+ "testGatewayReceiverStatus_OnGroups : " + strCmdResult + ">>>>> ");
+
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+ // Three status rows are expected for the group, none of them GATEWAY_NOT_RUNNING.
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(3, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING));
+
+ } else {
+ fail("testGatewayReceiverStatus_OnGroups failed as did not get CommandResult");
+ }
+ // Stop the receivers on vm3, vm4 and vm5 (vm6's receiver is left untouched), then query the group again.
+ vm3.invoke(() -> stopReceiver());
+ vm4.invoke(() -> stopReceiver());
+ vm5.invoke(() -> stopReceiver());
+
+ pause(10000);
+ command = CliStrings.STATUS_GATEWAYRECEIVER + " --"+ CliStrings.STATUS_GATEWAYRECEIVER__GROUP + "=RG1";
+
+ cmdResult = executeCommand(command);
+ if (cmdResult != null) {
+ String strCmdResult = commandResultToString(cmdResult);
+ getLogWriter().info(
+ "testGatewayReceiverStatus_OnGroups : " + strCmdResult + ">>>>> ");
+ assertEquals(Result.Status.OK, cmdResult.getStatus());
+ // Still three status rows, but now none of them may be GATEWAY_RUNNING.
+ TabularResultData tableResultData =
+ ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+
+ List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS);
+ assertEquals(3, result_Status.size());
+ assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING));
+
+ } else {
+ fail("testGatewayReceiverStatus_OnGroups failed as did not get CommandResult");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
new file mode 100644
index 0000000..c21cf26
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.management;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.Map;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.management.internal.MBeanJMXAdapter;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * Tests for WAN artifacts such as gateway senders and receivers. The purpose of
+ * this test is not to check WAN functionality, but to verify that ManagementServices
+ * are running properly and reflect WAN behaviour and data correctly.
+ *
+ *
+ */
+@Category(DistributedTest.class)
+public class WANManagementDUnitTest extends ManagementTestBase {
+
+ private static final long serialVersionUID = 1L;
+
+
+ public static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer;
+
+ public WANManagementDUnitTest() throws Exception {
+ super(); // only delegates to the ManagementTestBase constructor; nothing else is initialised here
+ }
+
+ @Test
+ public void testMBeanCallback() throws Exception {
+ // Checks the sender and receiver MBeans for sender "pn", including a stop/start of the sender and the proxy view on the managing node.
+ VM nyLocator = getManagedNodeList().get(0);
+ VM nyReceiver = getManagedNodeList().get(1);
+ VM puneSender = getManagedNodeList().get(2);
+ VM managing = getManagingNode();
+ VM puneLocator = Host.getLocator();
+
+ int punePort = (Integer) puneLocator.invoke(() -> WANManagementDUnitTest.getLocatorPort());
+
+ Integer nyPort = (Integer)nyLocator.invoke(() -> WANTestBase.createFirstRemoteLocator( 12, punePort ));
+
+
+ // puneSender gets a regular cache and the managing node a management cache, both created with punePort.
+ puneSender.invoke(() -> WANTestBase.createCache( punePort ));
+ managing.invoke(() -> WANTestBase.createManagementCache( punePort ));
+ startManagingNode(managing);
+
+ // keep a larger batch to minimize number of exception occurrences in the
+ // log
+ puneSender.invoke(() -> WANTestBase.createSender( "pn",
+ 12, true, 100, 300, false, false, null, true ));
+ managing.invoke(() -> WANTestBase.createSender( "pn",
+ 12, true, 100, 300, false, false, null, true ));
+
+
+ puneSender.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "pn", 1, 100, false ));
+ managing.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "pn", 1, 100, false ));
+ // nyReceiver: cache created with nyPort, a region with a null sender id, and a receiver.
+ nyReceiver.invoke(() -> WANTestBase.createCache( nyPort ));
+ nyReceiver.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, false ));
+ nyReceiver.invoke(() -> WANTestBase.createReceiver());
+
+ WANTestBase.startSenderInVMs("pn", puneSender, managing);
+
+ // make sure all the senders are running before doing any puts
+ puneSender.invoke(() -> WANTestBase.waitForSenderRunningState( "pn" ));
+ managing.invoke(() -> WANTestBase.waitForSenderRunningState( "pn" ));
+
+
+
+ // MBean checks on both sender-side members and on the receiver.
+ checkSenderMBean(puneSender, getTestMethodName() + "_PR");
+ checkSenderMBean(managing, getTestMethodName() + "_PR");
+
+ checkReceiverMBean(nyReceiver);
+ // Stop and restart the sender on puneSender before the proxy and navigation checks run on the managing node.
+ stopGatewaySender(puneSender);
+ startGatewaySender(puneSender);
+
+ DistributedMember puneMember = (DistributedMember) puneSender.invoke(() -> WANManagementDUnitTest.getMember());
+
+ checkProxySender(managing, puneMember);
+ checkSenderNavigationAPIS(managing, puneMember);
+
+ }
+
+ @Test
+ public void testReceiverMBean() throws Exception {
+ // Checks the receiver MBean, and its proxy and navigation APIs, from a managing node whose management cache is created with nyPort.
+ VM nyLocator = getManagedNodeList().get(0);
+ VM nyReceiver = getManagedNodeList().get(1);
+ VM puneSender = getManagedNodeList().get(2);
+ VM managing = getManagingNode();
+ VM puneLocator = Host.getLocator();
+
+ int punePort = (Integer) puneLocator.invoke(() -> WANManagementDUnitTest.getLocatorPort());
+
+ Integer nyPort = (Integer) nyLocator.invoke(() -> WANTestBase.createFirstRemoteLocator( 12, punePort ));
+
+ puneSender.invoke(() -> WANTestBase.createCache( punePort ));
+ // nyReceiver: cache created with nyPort, a region with a null sender id, and a receiver.
+ nyReceiver.invoke(() -> WANTestBase.createCache( nyPort ));
+ nyReceiver.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, false ));
+ nyReceiver.invoke(() -> WANTestBase.createReceiver());
+
+ // keep a larger batch to minimize number of exception occurrences in the
+ // log
+ puneSender.invoke(() -> WANTestBase.createSender( "pn",
+ 12, true, 100, 300, false, false, null, true ));
+
+ puneSender.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "pn", 1, 100, false ));
+
+ puneSender.invoke(() -> WANTestBase.startSender( "pn" ));
+
+ // make sure all the senders are running before doing any puts
+ puneSender.invoke(() -> WANTestBase.waitForSenderRunningState( "pn" ));
+ // The management cache uses nyPort, the same port nyReceiver's cache was created with.
+ managing.invoke(() -> WANTestBase.createManagementCache( nyPort ));
+ startManagingNode(managing);
+
+ // Local MBean checks on the sender and receiver members.
+ checkSenderMBean(puneSender, getTestMethodName() + "_PR");
+ checkReceiverMBean(nyReceiver);
+
+ DistributedMember nyMember = (DistributedMember) nyReceiver.invoke(() -> WANManagementDUnitTest.getMember());
+ // Proxy and navigation checks run on the managing node against nyReceiver's member.
+ checkProxyReceiver(managing, nyMember);
+ checkReceiverNavigationAPIS(managing, nyMember);
+
+
+ }
+
+
+ @Test
+ public void testAsyncEventQueue() throws Exception {
+ // Checks the async event queue MBean on puneSender and the managing node, then its proxy on the managing node.
+ VM nyLocator = getManagedNodeList().get(0);
+ VM nyReceiver = getManagedNodeList().get(1);
+ VM puneSender = getManagedNodeList().get(2);
+ VM managing = getManagingNode();
+ VM puneLocator = Host.getLocator();
+
+ int punePort = (Integer) puneLocator.invoke(() -> WANManagementDUnitTest.getLocatorPort());
+
+ Integer nyPort = (Integer)nyLocator.invoke(() -> WANTestBase.createFirstRemoteLocator( 12, punePort ));
+
+
+
+ puneSender.invoke(() -> WANTestBase.createCache( punePort ));
+ managing.invoke(() -> WANTestBase.createManagementCache( punePort ));
+ startManagingNode(managing);
+
+ // Queue "pn" is created on both members with the same arguments except the String "puneSender" / "managing".
+ puneSender.invoke(() -> WANTestBase.createAsyncEventQueue(
+ "pn", false, 100, 100, false, false, "puneSender", false ));
+ managing.invoke(() -> WANTestBase.createAsyncEventQueue(
+ "pn", false, 100, 100, false, false, "managing", false ));
+
+ // Both members create the "_RR" region via createReplicatedRegionWithAsyncEventQueue, passing queue id "pn".
+ puneSender.invoke(() -> WANTestBase.createReplicatedRegionWithAsyncEventQueue(
+ getTestMethodName() + "_RR", "pn", false ));
+ managing.invoke(() -> WANTestBase.createReplicatedRegionWithAsyncEventQueue(
+ getTestMethodName() + "_RR", "pn", false ));
+ // nyReceiver also gets a cache (nyPort), a "_PR" region and a receiver.
+ WANTestBase.createCacheInVMs(nyPort, nyReceiver);
+ nyReceiver.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, false ));
+ nyReceiver.invoke(() -> WANTestBase.createReceiver());
+ // Queue MBean checks on both members; the proxy check below runs on the managing node for puneSender's member.
+ checkAsyncQueueMBean(puneSender);
+ checkAsyncQueueMBean(managing);
+
+
+ DistributedMember puneMember = (DistributedMember) puneSender.invoke(() -> WANManagementDUnitTest.getMember());
+
+ checkProxyAsyncQueue(managing, puneMember);
+
+ }
+
+
+ /**
+ * Runs a check inside {@code vm}: the DistributedSystemMXBean must resolve sender "pn" of
+ * {@code senderMember} to the ObjectName the ManagementService computes, and list the expected sender counts.
+ */
+ @SuppressWarnings("serial")
+ protected void checkSenderNavigationAPIS(final VM vm, final DistributedMember senderMember) {
+ SerializableRunnable checkNavigationAPIS = new SerializableRunnable("Check Sender Navigation APIs") {
+ @Override
+ public void run() {
+ Cache cache = GemFireCacheImpl.getInstance();
+ ManagementService service = ManagementService.getManagementService(cache);
+ DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
+ ObjectName expectedName = service.getGatewaySenderMBeanName(senderMember, "pn");
+ try {
+ ObjectName actualName = bean.fetchGatewaySenderObjectName(senderMember.getId(), "pn");
+ assertEquals(expectedName, actualName);
+ } catch (Exception e) {
+ // Keep the original exception as the cause so its stack trace is not lost in the failure report.
+ throw new AssertionError("Sender Navigation Failed " + e, e);
+ }
+
+ // Two sender ObjectNames are expected overall, exactly one of them for senderMember.
+ assertEquals(2, bean.listGatewaySenderObjectNames().length);
+ try {
+ assertEquals(1, bean.listGatewaySenderObjectNames(senderMember.getId()).length);
+ } catch (Exception e) {
+ throw new AssertionError("Sender Navigation Failed " + e, e);
+ }
+ }
+ };
+ vm.invoke(checkNavigationAPIS);
+ }
+
+ /**
+  * Verifies, inside the given VM, that the DistributedSystemMXBean can
+  * navigate to the gateway receiver hosted by {@code receiverMember}.
+  * Expects exactly one gateway receiver object name overall.
+  *
+  * @param vm
+  *          VM in which the check is executed
+  * @param receiverMember
+  *          member that hosts the gateway receiver
+  */
+ @SuppressWarnings("serial")
+ protected void checkReceiverNavigationAPIS(final VM vm,
+     final DistributedMember receiverMember) {
+   SerializableRunnable checkNavigationAPIS = new SerializableRunnable(
+       "Check Receiver Navigation APIs") {
+     @Override
+     public void run() {
+       Cache cache = GemFireCacheImpl.getInstance();
+       ManagementService service = ManagementService
+           .getManagementService(cache);
+       DistributedSystemMXBean bean = service.getDistributedSystemMXBean();
+       ObjectName expectedName = service
+           .getGatewayReceiverMBeanName(receiverMember);
+       try {
+         ObjectName actualName = bean
+             .fetchGatewayReceiverObjectName(receiverMember.getId());
+         assertEquals(expectedName, actualName);
+       } catch (Exception e) {
+         // Keep the original exception as the cause so its stack trace is not lost.
+         throw new AssertionError("Receiver Navigation Failed " + e, e);
+       }
+
+       assertEquals(1, bean.listGatewayReceiverObjectNames().length);
+     }
+   };
+   vm.invoke(checkNavigationAPIS);
+ }
+
+ private static int getLocatorPort(){
+ return Locator.getLocators().get(0).getPort();
+ }
+ private static DistributedMember getMember(){
+ return GemFireCacheImpl.getInstance().getMyId();
+ }
+
+ /**
+ * Checks Proxy GatewaySender
+ *
+ * @param vm
+ * reference to VM
+ */
+ @SuppressWarnings("serial")
+ protected void checkProxySender(final VM vm, final DistributedMember senderMember) {
+ SerializableRunnable checkProxySender = new SerializableRunnable("Check Proxy Sender") {
+ public void run() {
+ Cache cache = GemFireCacheImpl.getInstance();
+ ManagementService service = ManagementService
+ .getManagementService(cache);
+ GatewaySenderMXBean bean = null;
+ try {
+ bean = MBeanUtil.getGatewaySenderMbeanProxy(senderMember, "pn");
+ } catch (Exception e) {
+ fail("Could not obtain Sender Proxy in desired time " + e);
+ }
+ assertNotNull(bean);
+ final ObjectName senderMBeanName = service.getGatewaySenderMBeanName(
+ senderMember, "pn");
+ try {
+ MBeanUtil.printBeanDetails(senderMBeanName);
+ } catch (Exception e) {
+ fail("Error while Printing Bean Details " + e);
+ }
+
+ if(service.isManager()){
+ DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean();
+ Map<String, Boolean> dsMap = dsBean.viewRemoteClusterStatus();
+
+ LogWriterUtils.getLogWriter().info(
+ "<ExpectedString> Ds Map is: " + dsMap
+ + "</ExpectedString> ");
+
+ }
+
+ }
+ };
+ vm.invoke(checkProxySender);
+ }
+
+ /**
+ * Checks Proxy GatewayReceiver
+ *
+ * @param vm
+ * reference to VM
+ */
+ @SuppressWarnings("serial")
+ protected void checkProxyReceiver(final VM vm,
+ final DistributedMember senderMember) {
+ SerializableRunnable checkProxySender = new SerializableRunnable(
+ "Check Proxy Receiver") {
+ public void run() {
+ Cache cache = GemFireCacheImpl.getInstance();
+ ManagementService service = ManagementService
+ .getManagementService(cache);
+ GatewayReceiverMXBean bean = null;
+ try {
+ bean = MBeanUtil.getGatewayReceiverMbeanProxy(senderMember);
+ } catch (Exception e) {
+ fail("Could not obtain Sender Proxy in desired time " + e);
+ }
+ assertNotNull(bean);
+ final ObjectName receiverMBeanName = service
+ .getGatewayReceiverMBeanName(senderMember);
+ try {
+ MBeanUtil.printBeanDetails(receiverMBeanName);
+ } catch (Exception e) {
+ fail("Error while Printing Bean Details " + e);
+ }
+
+ }
+ };
+ vm.invoke(checkProxySender);
+ }
+
+
+ /**
+ * stops a gateway sender
+ *
+ * @param vm
+ * reference to VM
+ */
+ @SuppressWarnings("serial")
+ protected void stopGatewaySender(final VM vm) {
+ SerializableRunnable stopGatewaySender = new SerializableRunnable("Stop Gateway Sender") {
+ public void run() {
+ Cache cache = GemFireCacheImpl.getInstance();
+ ManagementService service = ManagementService
+ .getManagementService(cache);
+ GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
+ assertNotNull(bean);
+ bean.stop();
+ assertFalse(bean.isRunning());
+ }
+ };
+ vm.invoke(stopGatewaySender);
+ }
+
+ /**
+ * start a gateway sender
+ *
+ * @param vm
+ * reference to VM
+ */
+ @SuppressWarnings("serial")
+ protected void startGatewaySender(final VM vm) {
+ SerializableRunnable stopGatewaySender = new SerializableRunnable("Start Gateway Sender") {
+ public void run() {
+ Cache cache = GemFireCacheImpl.getInstance();
+ ManagementService service = ManagementService
+ .getManagementService(cache);
+ GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
+ assertNotNull(bean);
+ bean.start();
+ assertTrue(bean.isRunning());
+ }
+ };
+ vm.invoke(stopGatewaySender);
+ }
+
+
+
+ /**
+ * Checks whether a GatewayReceiverMBean is created or not
+ *
+ * @param vm
+ * reference to VM
+ */
+ @SuppressWarnings("serial")
+ protected void checkReceiverMBean(final VM vm) {
+ SerializableRunnable checkMBean = new SerializableRunnable("Check Receiver MBean") {
+ public void run() {
+ Cache cache = GemFireCacheImpl.getInstance();
+ ManagementService service = ManagementService
+ .getManagementService(cache);
+ GatewayReceiverMXBean bean = service.getLocalGatewayReceiverMXBean();
+ assertNotNull(bean);
+ }
+ };
+ vm.invoke(checkMBean);
+ }
+
+ /**
+  * Checks, inside the given VM, that the local GatewaySender MBean for sender
+  * "pn" exists and reports itself as connected, and that the region MBean of
+  * the given region reports the region as gateway-enabled.
+  *
+  * @param vm
+  *          reference to VM
+  * @param regionPath
+  *          region name without the leading "/" (it is prepended here)
+  */
+ @SuppressWarnings("serial")
+ protected void checkSenderMBean(final VM vm, final String regionPath) {
+   SerializableRunnable checkMBean = new SerializableRunnable("Check Sender MBean") {
+     @Override
+     public void run() {
+       Cache cache = GemFireCacheImpl.getInstance();
+       ManagementService service = ManagementService
+           .getManagementService(cache);
+
+       GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
+       assertNotNull(bean);
+       assertTrue(bean.isConnected());
+
+       ObjectName regionBeanName = service.getRegionMBeanName(cache
+           .getDistributedSystem().getDistributedMember(), "/" + regionPath);
+       RegionMXBean rBean = service.getMBeanInstance(regionBeanName,
+           RegionMXBean.class);
+       // Fail with an assertion rather than a NullPointerException when the
+       // region MBean is not available.
+       assertNotNull(rBean);
+       assertTrue(rBean.isGatewayEnabled());
+     }
+   };
+   vm.invoke(checkMBean);
+ }
+
+ /**
+ * Checks whether a Async Queue MBean is created or not
+ *
+ * @param vm
+ * reference to VM
+ */
+ @SuppressWarnings("serial")
+ protected void checkAsyncQueueMBean(final VM vm) {
+ SerializableRunnable checkAsyncQueueMBean = new SerializableRunnable(
+ "Check Async Queue MBean") {
+ public void run() {
+ Cache cache = GemFireCacheImpl.getInstance();
+ ManagementService service = ManagementService
+ .getManagementService(cache);
+ AsyncEventQueueMXBean bean = service.getLocalAsyncEventQueueMXBean("pn");
+ assertNotNull(bean);
+ // Already in started State
+ }
+ };
+ vm.invoke(checkAsyncQueueMBean);
+ }
+
+ /**
+ * Checks Proxy Async Queue
+ *
+ * @param vm
+ * reference to VM
+ */
+ @SuppressWarnings("serial")
+ protected void checkProxyAsyncQueue(final VM vm,
+ final DistributedMember senderMember) {
+ SerializableRunnable checkProxyAsyncQueue = new SerializableRunnable(
+ "Check Proxy Async Queue") {
+ public void run() {
+ Cache cache = GemFireCacheImpl.getInstance();
+ ManagementService service = ManagementService
+ .getManagementService(cache);
+ AsyncEventQueueMXBean bean = null;
+ try {
+ bean = MBeanUtil.getAsyncEventQueueMBeanProxy(senderMember, "pn");
+ } catch (Exception e) {
+ fail("Could not obtain Sender Proxy in desired time " + e);
+ }
+ assertNotNull(bean);
+ final ObjectName queueMBeanName = service.getAsyncEventQueueMBeanName(
+ senderMember, "pn");
+
+ try {
+ MBeanUtil.printBeanDetails(queueMBeanName);
+ } catch (Exception e) {
+ fail("Error while Printing Bean Details " + e);
+ }
+
+ }
+ };
+ vm.invoke(checkProxyAsyncQueue);
+ }
+}