You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2018/11/16 14:06:21 UTC
[geode] branch develop updated: GEODE-5971: Refactor durable client
commands to extend GfshCommand (#2858)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 6371496 GEODE-5971: Refactor durable client commands to extend GfshCommand (#2858)
6371496 is described below
commit 63714964f354024591dba4c77623d4e8eeef2c8e
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Fri Nov 16 06:06:12 2018 -0800
GEODE-5971: Refactor durable client commands to extend GfshCommand (#2858)
---
.../cli/commands/CloseDurableCQsCommand.java | 50 +++----
.../cli/commands/CloseDurableClientCommand.java | 38 ++---
.../cli/commands/CountDurableCQEventsCommand.java | 54 ++++---
.../DurableClientCommandsResultBuilder.java | 164 ---------------------
.../cli/commands/ListDurableClientCQsCommand.java | 84 +++--------
.../internal/cli/domain/DurableCqNamesResult.java | 51 -------
.../internal/cli/domain/MemberResult.java | 110 --------------
.../cli/domain/SubscriptionQueueSizeResult.java | 48 ------
.../cli/functions/CloseDurableClientFunction.java | 49 +++---
.../cli/functions/CloseDurableCqFunction.java | 50 +++----
.../GetSubscriptionQueueSizeFunction.java | 101 +++++++------
.../cli/functions/ListDurableCqNamesFunction.java | 68 +++++----
.../management/internal/cli/i18n/CliStrings.java | 2 +-
.../sanctioned-geode-core-serializables.txt | 3 -
.../commands/DurableClientCommandsDUnitTest.java | 133 ++++++++++++-----
15 files changed, 318 insertions(+), 687 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CloseDurableCQsCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CloseDurableCQsCommand.java
index c4b5916..e1213e5 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CloseDurableCQsCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CloseDurableCQsCommand.java
@@ -25,25 +25,24 @@ import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.management.cli.CliMetaData;
import org.apache.geode.management.cli.ConverterHint;
-import org.apache.geode.management.cli.Result;
-import org.apache.geode.management.internal.cli.CliUtil;
-import org.apache.geode.management.internal.cli.domain.MemberResult;
+import org.apache.geode.management.cli.GfshCommand;
+import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
import org.apache.geode.management.internal.cli.functions.CloseDurableCqFunction;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
-import org.apache.geode.management.internal.cli.result.ResultBuilder;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
import org.apache.geode.management.internal.security.ResourceOperation;
import org.apache.geode.security.ResourcePermission;
-public class CloseDurableCQsCommand extends InternalGfshCommand {
- DurableClientCommandsResultBuilder builder = new DurableClientCommandsResultBuilder();
+public class CloseDurableCQsCommand extends GfshCommand {
@CliCommand(value = CliStrings.CLOSE_DURABLE_CQS, help = CliStrings.CLOSE_DURABLE_CQS__HELP)
@CliMetaData()
@ResourceOperation(resource = ResourcePermission.Resource.CLUSTER,
operation = ResourcePermission.Operation.MANAGE, target = ResourcePermission.Target.QUERY)
- public Result closeDurableCqs(@CliOption(key = CliStrings.CLOSE_DURABLE_CQS__DURABLE__CLIENT__ID,
- mandatory = true,
- help = CliStrings.CLOSE_DURABLE_CQS__DURABLE__CLIENT__ID__HELP) final String durableClientId,
+ public ResultModel closeDurableCqs(
+ @CliOption(key = CliStrings.CLOSE_DURABLE_CQS__DURABLE__CLIENT__ID,
+ mandatory = true,
+ help = CliStrings.CLOSE_DURABLE_CQS__DURABLE__CLIENT__ID__HELP) final String durableClientId,
@CliOption(key = CliStrings.CLOSE_DURABLE_CQS__NAME, mandatory = true,
help = CliStrings.CLOSE_DURABLE_CQS__NAME__HELP) final String cqName,
@@ -55,29 +54,20 @@ public class CloseDurableCQsCommand extends InternalGfshCommand {
@CliOption(key = {CliStrings.GROUP, CliStrings.GROUPS},
help = CliStrings.CLOSE_DURABLE_CQS__GROUP__HELP,
optionContext = ConverterHint.MEMBERGROUP) final String[] group) {
- Result result;
- try {
- Set<DistributedMember> targetMembers = findMembers(group, memberNameOrId);
- if (targetMembers.isEmpty()) {
- return ResultBuilder.createUserErrorResult(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
- }
+ Set<DistributedMember> targetMembers = findMembers(group, memberNameOrId);
- String[] params = new String[2];
- params[0] = durableClientId;
- params[1] = cqName;
-
- final ResultCollector<?, ?> rc =
- CliUtil.executeFunction(new CloseDurableCqFunction(), params, targetMembers);
- final List<MemberResult> results = (List<MemberResult>) rc.getResult();
- String failureHeader =
- CliStrings.format(CliStrings.CLOSE_DURABLE_CQS__FAILURE__HEADER, cqName, durableClientId);
- String successHeader =
- CliStrings.format(CliStrings.CLOSE_DURABLE_CQS__SUCCESS, cqName, durableClientId);
- result = builder.buildResult(results, successHeader, failureHeader);
- } catch (Exception e) {
- result = ResultBuilder.createGemFireErrorResult(e.getMessage());
+ if (targetMembers.isEmpty()) {
+ return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
}
- return result;
+
+ String[] params = new String[2];
+ params[0] = durableClientId;
+ params[1] = cqName;
+
+ final ResultCollector<?, ?> rc =
+ executeFunction(new CloseDurableCqFunction(), params, targetMembers);
+ final List<CliFunctionResult> results = (List<CliFunctionResult>) rc.getResult();
+ return ResultModel.createMemberStatusResult(results);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CloseDurableClientCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CloseDurableClientCommand.java
index 4174fa5..cf73062 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CloseDurableClientCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CloseDurableClientCommand.java
@@ -25,24 +25,22 @@ import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.management.cli.CliMetaData;
import org.apache.geode.management.cli.ConverterHint;
-import org.apache.geode.management.cli.Result;
-import org.apache.geode.management.internal.cli.CliUtil;
-import org.apache.geode.management.internal.cli.domain.MemberResult;
+import org.apache.geode.management.cli.GfshCommand;
+import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
import org.apache.geode.management.internal.cli.functions.CloseDurableClientFunction;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
-import org.apache.geode.management.internal.cli.result.ResultBuilder;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
import org.apache.geode.management.internal.security.ResourceOperation;
import org.apache.geode.security.ResourcePermission;
-public class CloseDurableClientCommand extends InternalGfshCommand {
- DurableClientCommandsResultBuilder builder = new DurableClientCommandsResultBuilder();
+public class CloseDurableClientCommand extends GfshCommand {
@CliCommand(value = CliStrings.CLOSE_DURABLE_CLIENTS,
help = CliStrings.CLOSE_DURABLE_CLIENTS__HELP)
@CliMetaData()
@ResourceOperation(resource = ResourcePermission.Resource.CLUSTER,
operation = ResourcePermission.Operation.MANAGE, target = ResourcePermission.Target.QUERY)
- public Result closeDurableClient(
+ public ResultModel closeDurableClient(
@CliOption(key = CliStrings.CLOSE_DURABLE_CLIENTS__CLIENT__ID, mandatory = true,
help = CliStrings.CLOSE_DURABLE_CLIENTS__CLIENT__ID__HELP) final String durableClientId,
@CliOption(key = {CliStrings.MEMBER, CliStrings.MEMBERS},
@@ -52,26 +50,16 @@ public class CloseDurableClientCommand extends InternalGfshCommand {
help = CliStrings.COUNT_DURABLE_CQ_EVENTS__GROUP__HELP,
optionContext = ConverterHint.MEMBERGROUP) final String[] group) {
- Result result;
- try {
+ Set<DistributedMember> targetMembers = findMembers(group, memberNameOrId);
- Set<DistributedMember> targetMembers = findMembers(group, memberNameOrId);
+ if (targetMembers.isEmpty()) {
+ return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
+ }
- if (targetMembers.isEmpty()) {
- return ResultBuilder.createUserErrorResult(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
- }
+ final ResultCollector<?, ?> rc =
+ executeFunction(new CloseDurableClientFunction(), durableClientId, targetMembers);
+ final List<CliFunctionResult> results = (List<CliFunctionResult>) rc.getResult();
- final ResultCollector<?, ?> rc =
- CliUtil.executeFunction(new CloseDurableClientFunction(), durableClientId, targetMembers);
- final List<MemberResult> results = (List<MemberResult>) rc.getResult();
- String failureHeader =
- CliStrings.format(CliStrings.CLOSE_DURABLE_CLIENTS__FAILURE__HEADER, durableClientId);
- String successHeader =
- CliStrings.format(CliStrings.CLOSE_DURABLE_CLIENTS__SUCCESS, durableClientId);
- result = builder.buildResult(results, successHeader, failureHeader);
- } catch (Exception e) {
- result = ResultBuilder.createGemFireErrorResult(e.getMessage());
- }
- return result;
+ return ResultModel.createMemberStatusResult(results);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CountDurableCQEventsCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CountDurableCQEventsCommand.java
index 420b256..ac55c3b 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CountDurableCQEventsCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CountDurableCQEventsCommand.java
@@ -25,24 +25,24 @@ import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.management.cli.CliMetaData;
import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.GfshCommand;
import org.apache.geode.management.cli.Result;
-import org.apache.geode.management.internal.cli.CliUtil;
-import org.apache.geode.management.internal.cli.domain.SubscriptionQueueSizeResult;
+import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
import org.apache.geode.management.internal.cli.functions.GetSubscriptionQueueSizeFunction;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
-import org.apache.geode.management.internal.cli.result.ResultBuilder;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import org.apache.geode.management.internal.cli.result.model.TabularResultModel;
import org.apache.geode.management.internal.security.ResourceOperation;
import org.apache.geode.security.ResourcePermission;
-public class CountDurableCQEventsCommand extends InternalGfshCommand {
- DurableClientCommandsResultBuilder builder = new DurableClientCommandsResultBuilder();
+public class CountDurableCQEventsCommand extends GfshCommand {
@CliCommand(value = CliStrings.COUNT_DURABLE_CQ_EVENTS,
help = CliStrings.COUNT_DURABLE_CQ_EVENTS__HELP)
@CliMetaData()
@ResourceOperation(resource = ResourcePermission.Resource.CLUSTER,
operation = ResourcePermission.Operation.READ)
- public Result countDurableCqEvents(
+ public ResultModel countDurableCqEvents(
@CliOption(key = CliStrings.COUNT_DURABLE_CQ_EVENTS__DURABLE__CLIENT__ID, mandatory = true,
help = CliStrings.COUNT_DURABLE_CQ_EVENTS__DURABLE__CLIENT__ID__HELP) final String durableClientId,
@CliOption(key = CliStrings.COUNT_DURABLE_CQ_EVENTS__DURABLE__CQ__NAME,
@@ -54,35 +54,31 @@ public class CountDurableCQEventsCommand extends InternalGfshCommand {
help = CliStrings.COUNT_DURABLE_CQ_EVENTS__GROUP__HELP,
optionContext = ConverterHint.MEMBERGROUP) final String[] group) {
- Result result;
- try {
- Set<DistributedMember> targetMembers = findMembers(group, memberNameOrId);
+ Set<DistributedMember> targetMembers = findMembers(group, memberNameOrId);
- if (targetMembers.isEmpty()) {
- return ResultBuilder.createUserErrorResult(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
- }
+ if (targetMembers.isEmpty()) {
+ return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
+ }
- String[] params = new String[2];
- params[0] = durableClientId;
- params[1] = cqName;
- final ResultCollector<?, ?> rc =
- CliUtil.executeFunction(new GetSubscriptionQueueSizeFunction(), params, targetMembers);
- final List<SubscriptionQueueSizeResult> funcResults =
- (List<SubscriptionQueueSizeResult>) rc.getResult();
+ String[] params = new String[2];
+ params[0] = durableClientId;
+ params[1] = cqName;
+ final ResultCollector<?, ?> rc =
+ executeFunction(new GetSubscriptionQueueSizeFunction(), params, targetMembers);
+ final List<CliFunctionResult> funcResults = (List<CliFunctionResult>) rc.getResult();
- String queueSizeColumnName;
+ ResultModel result = new ResultModel();
+ TabularResultModel table = result.addTable("subscription-queue-size");
+ for (CliFunctionResult oneResult : funcResults) {
+ table.accumulate("Member", oneResult.getMemberIdOrName());
+ table.accumulate("Status", oneResult.getStatus());
+ table.accumulate("Queue Size", oneResult.getStatusMessage());
- if (cqName != null && !cqName.isEmpty()) {
- queueSizeColumnName = CliStrings
- .format(CliStrings.COUNT_DURABLE_CQ_EVENTS__SUBSCRIPTION__QUEUE__SIZE__CLIENT, cqName);
- } else {
- queueSizeColumnName = CliStrings.format(
- CliStrings.COUNT_DURABLE_CQ_EVENTS__SUBSCRIPTION__QUEUE__SIZE__CLIENT, durableClientId);
+ if (!oneResult.isSuccessful()) {
+ result.setStatus(Result.Status.ERROR);
}
- result = builder.buildTableResultForQueueSize(funcResults, queueSizeColumnName);
- } catch (Exception e) {
- result = ResultBuilder.createGemFireErrorResult(e.getMessage());
}
return result;
}
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/DurableClientCommandsResultBuilder.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/DurableClientCommandsResultBuilder.java
deleted file mode 100644
index 743602a..0000000
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/DurableClientCommandsResultBuilder.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.management.internal.cli.commands;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.geode.management.cli.Result;
-import org.apache.geode.management.internal.cli.domain.MemberResult;
-import org.apache.geode.management.internal.cli.domain.SubscriptionQueueSizeResult;
-import org.apache.geode.management.internal.cli.i18n.CliStrings;
-import org.apache.geode.management.internal.cli.result.ErrorResultData;
-import org.apache.geode.management.internal.cli.result.InfoResultData;
-import org.apache.geode.management.internal.cli.result.ResultBuilder;
-import org.apache.geode.management.internal.cli.result.TabularResultData;
-
-public class DurableClientCommandsResultBuilder {
- public Result buildResult(List<MemberResult> results, String successHeader,
- String failureHeader) {
- Result result;
- boolean failure = true;
- boolean partialFailure = false;
- Map<String, List<String>> errorMap = new HashMap<>();
- Map<String, List<String>> successMap = new HashMap<>();
- Map<String, List<String>> exceptionMap = new HashMap<>();
-
- // Aggregate the results from the members
- for (MemberResult memberResult : results) {
- if (memberResult.isSuccessful()) {
- failure = false;
- groupByMessage(memberResult.getSuccessMessage(), memberResult.getMemberNameOrId(),
- successMap);
- } else {
- if (memberResult.isOpPossible()) {
- partialFailure = true;
- groupByMessage(memberResult.getExceptionMessage(), memberResult.getMemberNameOrId(),
- exceptionMap);
- } else {
- groupByMessage(memberResult.getErrorMessage(), memberResult.getMemberNameOrId(),
- errorMap);
- }
- }
- }
-
- if (!failure && !partialFailure) {
- result = ResultBuilder.buildResult(buildSuccessData(successMap));
- } else {
- result = ResultBuilder
- .buildResult(buildFailureData(successMap, exceptionMap, errorMap, failureHeader));
- }
- return result;
- }
-
- Result buildTableResultForQueueSize(List<SubscriptionQueueSizeResult> results,
- String queueSizeColumnName) {
- Result result;
- boolean failure = true;
-
- Map<String, List<String>> failureMap = new HashMap<>();
- Map<String, Long> memberQueueSizeTable = new TreeMap<>();
-
- // Aggregate the results from the members
- for (SubscriptionQueueSizeResult memberResult : results) {
- if (memberResult.isSuccessful()) {
- failure = false;
- memberQueueSizeTable.put(memberResult.getMemberNameOrId(),
- memberResult.getSubscriptionQueueSize());
- } else {
- groupByMessage(memberResult.getErrorMessage(), memberResult.getMemberNameOrId(),
- failureMap);
- }
- }
-
- if (!failure) {
- TabularResultData table = ResultBuilder.createTabularResultData();
- Set<String> members = memberQueueSizeTable.keySet();
-
- for (String member : members) {
- long queueSize = memberQueueSizeTable.get(member);
- table.accumulate(CliStrings.MEMBER, member);
- table.accumulate(queueSizeColumnName, queueSize);
- }
- result = ResultBuilder.buildResult(table);
- } else {
- ErrorResultData erd = ResultBuilder.createErrorResultData();
- buildErrorResult(erd, failureMap);
- result = ResultBuilder.buildResult(erd);
- }
- return result;
- }
-
- void groupByMessage(String message, String memberNameOrId, Map<String, List<String>> map) {
- List<String> members = map.get(message);
- if (members == null) {
- members = new LinkedList<>();
- }
- members.add(memberNameOrId);
- map.put(message, members);
- }
-
-
- private InfoResultData buildSuccessData(Map<String, List<String>> successMap) {
- InfoResultData ird = ResultBuilder.createInfoResultData();
- Set<String> successMessages = successMap.keySet();
-
- for (String successMessage : successMessages) {
- ird.addLine(CliStrings.format(CliStrings.ACTION_SUCCEEDED_ON_MEMBER, successMessage));
- List<String> successfulMembers = successMap.get(successMessage);
- int num = 0;
- for (String member : successfulMembers) {
- ird.addLine("" + ++num + "." + member);
- }
- ird.addLine("\n");
- }
- return ird;
- }
-
- ErrorResultData buildFailureData(Map<String, List<String>> successMap,
- Map<String, List<String>> exceptionMap, Map<String, List<String>> errorMap,
- String errorHeader) {
- ErrorResultData erd = ResultBuilder.createErrorResultData();
- buildErrorResult(erd, successMap);
- erd.addLine("\n");
- erd.addLine(errorHeader);
- buildErrorResult(erd, exceptionMap);
- buildErrorResult(erd, errorMap);
- return erd;
- }
-
- private void buildErrorResult(ErrorResultData erd, Map<String, List<String>> resultMap) {
- if (resultMap != null && !resultMap.isEmpty()) {
- Set<String> messages = resultMap.keySet();
-
- for (String message : messages) {
- erd.addLine("\n");
- erd.addLine(message);
- erd.addLine(CliStrings.OCCURRED_ON_MEMBERS);
- List<String> members = resultMap.get(message);
- int num = 0;
- for (String member : members) {
- ++num;
- erd.addLine("" + num + "." + member);
- }
- }
- }
- }
-}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ListDurableClientCQsCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ListDurableClientCQsCommand.java
index 66c748d..74ad720 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ListDurableClientCQsCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ListDurableClientCQsCommand.java
@@ -15,11 +15,8 @@
package org.apache.geode.management.internal.cli.commands;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
@@ -28,24 +25,23 @@ import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.management.cli.CliMetaData;
import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.GfshCommand;
import org.apache.geode.management.cli.Result;
-import org.apache.geode.management.internal.cli.CliUtil;
-import org.apache.geode.management.internal.cli.domain.DurableCqNamesResult;
+import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
import org.apache.geode.management.internal.cli.functions.ListDurableCqNamesFunction;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
-import org.apache.geode.management.internal.cli.result.ResultBuilder;
-import org.apache.geode.management.internal.cli.result.TabularResultData;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import org.apache.geode.management.internal.cli.result.model.TabularResultModel;
import org.apache.geode.management.internal.security.ResourceOperation;
import org.apache.geode.security.ResourcePermission;
-public class ListDurableClientCQsCommand extends InternalGfshCommand {
- DurableClientCommandsResultBuilder builder = new DurableClientCommandsResultBuilder();
+public class ListDurableClientCQsCommand extends GfshCommand {
@CliCommand(value = CliStrings.LIST_DURABLE_CQS, help = CliStrings.LIST_DURABLE_CQS__HELP)
@CliMetaData()
@ResourceOperation(resource = ResourcePermission.Resource.CLUSTER,
operation = ResourcePermission.Operation.READ)
- public Result listDurableClientCQs(
+ public ResultModel listDurableClientCQs(
@CliOption(key = CliStrings.LIST_DURABLE_CQS__DURABLECLIENTID, mandatory = true,
help = CliStrings.LIST_DURABLE_CQS__DURABLECLIENTID__HELP) final String durableClientId,
@@ -56,66 +52,32 @@ public class ListDurableClientCQsCommand extends InternalGfshCommand {
@CliOption(key = {CliStrings.GROUP, CliStrings.GROUPS},
help = CliStrings.LIST_DURABLE_CQS__GROUP__HELP,
optionContext = ConverterHint.MEMBERGROUP) final String[] group) {
- Result result;
- try {
- boolean noResults = true;
- Set<DistributedMember> targetMembers = findMembers(group, memberNameOrId);
+ Set<DistributedMember> targetMembers = findMembers(group, memberNameOrId);
- if (targetMembers.isEmpty()) {
- return ResultBuilder.createUserErrorResult(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
- }
+ if (targetMembers.isEmpty()) {
+ return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
+ }
- final ResultCollector<?, ?> rc =
- CliUtil.executeFunction(new ListDurableCqNamesFunction(), durableClientId, targetMembers);
- final List<DurableCqNamesResult> results = (List<DurableCqNamesResult>) rc.getResult();
- Map<String, List<String>> memberCqNamesMap = new TreeMap<>();
- Map<String, List<String>> errorMessageNodes = new HashMap<>();
- Map<String, List<String>> exceptionMessageNodes = new HashMap<>();
+ final ResultCollector<?, ?> rc =
+ executeFunction(new ListDurableCqNamesFunction(), durableClientId, targetMembers);
+ final List<List<CliFunctionResult>> results = (List<List<CliFunctionResult>>) rc.getResult();
- for (DurableCqNamesResult memberResult : results) {
- if (memberResult != null) {
- if (memberResult.isSuccessful()) {
- memberCqNamesMap.put(memberResult.getMemberNameOrId(), memberResult.getCqNamesList());
- } else {
- if (memberResult.isOpPossible()) {
- builder.groupByMessage(memberResult.getExceptionMessage(),
- memberResult.getMemberNameOrId(), exceptionMessageNodes);
- } else {
- builder.groupByMessage(memberResult.getErrorMessage(),
- memberResult.getMemberNameOrId(), errorMessageNodes);
- }
- }
- }
- }
+ ResultModel result = new ResultModel();
+ TabularResultModel table = result.addTable("list-durable-client-cqs");
- if (!memberCqNamesMap.isEmpty()) {
- TabularResultData table = ResultBuilder.createTabularResultData();
- Set<String> members = memberCqNamesMap.keySet();
+ for (List<CliFunctionResult> perMemberList : results) {
+ for (CliFunctionResult oneResult : perMemberList) {
+ table.accumulate("Member", oneResult.getMemberIdOrName());
+ table.accumulate("Status", oneResult.getStatus());
+ table.accumulate("CQ Name", oneResult.getStatusMessage());
- for (String member : members) {
- boolean isFirst = true;
- List<String> cqNames = memberCqNamesMap.get(member);
- for (String cqName : cqNames) {
- if (isFirst) {
- isFirst = false;
- table.accumulate(CliStrings.MEMBER, member);
- } else {
- table.accumulate(CliStrings.MEMBER, "");
- }
- table.accumulate(CliStrings.LIST_DURABLE_CQS__NAME, cqName);
- }
+ if (!oneResult.isSuccessful()) {
+ result.setStatus(Result.Status.ERROR);
}
- result = ResultBuilder.buildResult(table);
- } else {
- String errorHeader =
- CliStrings.format(CliStrings.LIST_DURABLE_CQS__FAILURE__HEADER, durableClientId);
- result = ResultBuilder.buildResult(
- builder.buildFailureData(null, exceptionMessageNodes, errorMessageNodes, errorHeader));
}
- } catch (Exception e) {
- result = ResultBuilder.createGemFireErrorResult(e.getMessage());
}
+
return result;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/DurableCqNamesResult.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/DurableCqNamesResult.java
deleted file mode 100644
index 7cfeed8..0000000
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/DurableCqNamesResult.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management.internal.cli.domain;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/****
- * Data class used for sending back names for the durable client cq for a client
- */
-public class DurableCqNamesResult extends MemberResult implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private List<String> cqNames = new ArrayList<String>();
-
- public DurableCqNamesResult(final String memberNameOrId) {
- super(memberNameOrId);
- }
-
- public List<String> getCqNamesList() {
- return cqNames;
- }
-
- public void setCqNamesList(List<String> cqNamesList) {
- this.cqNames.addAll(cqNamesList);
- this.isSuccessful = true;
- this.opPossible = true;
- }
-
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append(super.toString());
- for (String cqName : cqNames) {
- sb.append("\nCqName : " + cqName);
- }
- return sb.toString();
- }
-}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/MemberResult.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/MemberResult.java
deleted file mode 100644
index 0ab2706..0000000
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/MemberResult.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management.internal.cli.domain;
-
-import java.io.Serializable;
-
-/***
- * Data class used to return the result of a function on a member. Typically to return the status of
- * an action on a member. Not suitable if you wish to return specific data from member
- *
- */
-public class MemberResult implements Serializable {
-
- private static final long serialVersionUID = 1L;
- protected String memberNameOrId;
- protected boolean isSuccessful;
- protected boolean opPossible;
- protected String exceptionMessage;
- protected String successMessage;
- protected String errorMessage;
-
-
- public MemberResult(String memberNameOrId) {
- this.memberNameOrId = memberNameOrId;
- this.isSuccessful = true;
- this.opPossible = true;
- }
-
- public MemberResult(String memberNameOrId, String errorMessage) {
- this.memberNameOrId = memberNameOrId;
- this.errorMessage = errorMessage;
- this.opPossible = false;
- this.isSuccessful = false;
- }
-
- public String getMemberNameOrId() {
- return memberNameOrId;
- }
-
- public void setMemberNameOrId(String memberNameOrId) {
- this.memberNameOrId = memberNameOrId;
- }
-
- public boolean isSuccessful() {
- return isSuccessful;
- }
-
- public void setSuccessful(boolean isSuccessful) {
- this.isSuccessful = isSuccessful;
- }
-
- public String getExceptionMessage() {
- return exceptionMessage;
- }
-
- public void setExceptionMessage(String exceptionMessage) {
- this.exceptionMessage = exceptionMessage;
- this.opPossible = true;
- this.isSuccessful = false;
- }
-
- public boolean isOpPossible() {
- return this.opPossible;
- }
-
- public void setErrorMessage(String errorMessage) {
- this.errorMessage = errorMessage;
- this.opPossible = false;
- this.isSuccessful = false;
- }
-
- public String getErrorMessage() {
- return this.errorMessage;
- }
-
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("MemberNameOrId : ");
- sb.append(this.memberNameOrId);
- sb.append("\nSuccessfull : ");
- sb.append(this.isSuccessful);
- sb.append("\n isOpPossible");
- sb.append(this.isOpPossible());
- sb.append("Success Message : ");
- sb.append(this.successMessage);
- sb.append("\nException Message : ");
- sb.append(this.exceptionMessage);
- return sb.toString();
- }
-
- public String getSuccessMessage() {
- return successMessage;
- }
-
- public void setSuccessMessage(String successMessage) {
- this.successMessage = successMessage;
- }
-}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/SubscriptionQueueSizeResult.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/SubscriptionQueueSizeResult.java
deleted file mode 100644
index 5f2b8d9..0000000
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/domain/SubscriptionQueueSizeResult.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management.internal.cli.domain;
-
-/***
- * Data class used for sending back subscription-queue-size for a client or a cq
- *
- */
-public class SubscriptionQueueSizeResult extends MemberResult {
-
- private static final long serialVersionUID = 1L;
- private long subscriptionQueueSize;
-
- public SubscriptionQueueSizeResult(String memberNameOrId) {
- super(memberNameOrId);
- }
-
- public long getSubscriptionQueueSize() {
- return subscriptionQueueSize;
- }
-
- public void setSubscriptionQueueSize(long queueSize) {
- this.subscriptionQueueSize = queueSize;
- super.isSuccessful = true;
- super.opPossible = true;
- }
-
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append(super.toString());
- sb.append("\nsubscription-queue-size : ");
- sb.append(this.subscriptionQueueSize);
- return sb.toString();
- }
-
-}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CloseDurableClientFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CloseDurableClientFunction.java
index 358c0fd..5302442 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CloseDurableClientFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CloseDurableClientFunction.java
@@ -20,7 +20,6 @@ import org.apache.geode.internal.cache.execute.InternalFunction;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.management.internal.cli.CliUtil;
-import org.apache.geode.management.internal.cli.domain.MemberResult;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
/***
@@ -37,39 +36,37 @@ public class CloseDurableClientFunction implements InternalFunction {
final Cache cache = context.getCache();
final String memberNameOrId =
CliUtil.getMemberNameOrId(cache.getDistributedSystem().getDistributedMember());
- MemberResult memberResult = new MemberResult(memberNameOrId);
+ context.getResultSender().lastResult(createFunctionResult(memberNameOrId, durableClientId));
+ }
+
+ private CliFunctionResult createFunctionResult(String memberNameOrId, String durableClientId) {
try {
CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance();
- if (cacheClientNotifier != null) {
- CacheClientProxy ccp = cacheClientNotifier.getClientProxy(durableClientId);
- if (ccp != null) {
- boolean isClosed = cacheClientNotifier.closeDurableClientProxy(durableClientId);
- if (isClosed) {
- memberResult.setSuccessMessage(
- CliStrings.format(CliStrings.CLOSE_DURABLE_CLIENTS__SUCCESS, durableClientId));
- } else {
- memberResult.setErrorMessage(
- CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, durableClientId));
- }
- } else {
- memberResult.setErrorMessage(
- CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, durableClientId));
- }
+ if (cacheClientNotifier == null) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.NO_CLIENT_FOUND);
+ }
+
+ CacheClientProxy ccp = cacheClientNotifier.getClientProxy(durableClientId);
+ if (ccp == null) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, durableClientId));
+ }
+
+ boolean isClosed = cacheClientNotifier.closeDurableClientProxy(durableClientId);
+ if (isClosed) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.OK,
+ CliStrings.format(CliStrings.CLOSE_DURABLE_CLIENTS__SUCCESS, durableClientId));
} else {
- memberResult.setErrorMessage(CliStrings.NO_CLIENT_FOUND);
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, durableClientId));
}
} catch (Exception e) {
- memberResult.setExceptionMessage(e.getMessage());
- } finally {
- context.getResultSender().lastResult(memberResult);
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ e.getMessage());
}
}
- @Override
- public String getId() {
- return CloseDurableClientFunction.class.getName();
- }
-
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CloseDurableCqFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CloseDurableCqFunction.java
index b736ebd..85ed673 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CloseDurableCqFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CloseDurableCqFunction.java
@@ -20,7 +20,6 @@ import org.apache.geode.internal.cache.execute.InternalFunction;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.management.internal.cli.CliUtil;
-import org.apache.geode.management.internal.cli.domain.MemberResult;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
/***
@@ -37,41 +36,40 @@ public class CloseDurableCqFunction implements InternalFunction {
final Cache cache = context.getCache();
final String memberNameOrId =
CliUtil.getMemberNameOrId(cache.getDistributedSystem().getDistributedMember());
- CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance();
String[] args = (String[]) context.getArguments();
String durableClientId = args[0];
String cqName = args[1];
- MemberResult memberResult = new MemberResult(memberNameOrId);
+ context.getResultSender()
+ .lastResult(createFunctionResult(memberNameOrId, durableClientId, cqName));
+ }
+
+ private CliFunctionResult createFunctionResult(String memberNameOrId, String durableClientId,
+ String cqName) {
+ CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance();
try {
- if (cacheClientNotifier != null) {
- CacheClientProxy cacheClientProxy = cacheClientNotifier.getClientProxy(durableClientId);
- if (cacheClientProxy != null) {
- if (cacheClientNotifier.closeClientCq(durableClientId, cqName)) {
- memberResult.setSuccessMessage(
- CliStrings.format(CliStrings.CLOSE_DURABLE_CQS__SUCCESS, cqName, durableClientId));
- } else {
- memberResult.setErrorMessage(CliStrings.format(
- CliStrings.CLOSE_DURABLE_CQS__UNABLE__TO__CLOSE__CQ, cqName, durableClientId));
- }
+ if (cacheClientNotifier == null) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.NO_CLIENT_FOUND);
+ }
- } else {
- memberResult.setErrorMessage(
- CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, durableClientId));
- }
+ CacheClientProxy cacheClientProxy = cacheClientNotifier.getClientProxy(durableClientId);
+ if (cacheClientProxy == null) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, durableClientId));
+ }
+
+ if (cacheClientNotifier.closeClientCq(durableClientId, cqName)) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.OK,
+ CliStrings.format(CliStrings.CLOSE_DURABLE_CQS__SUCCESS, cqName, durableClientId));
} else {
- memberResult.setErrorMessage(CliStrings.NO_CLIENT_FOUND);
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(CliStrings.CLOSE_DURABLE_CQS__UNABLE__TO__CLOSE__CQ, cqName,
+ durableClientId));
}
} catch (Exception e) {
- memberResult.setExceptionMessage(e.getMessage());
- } finally {
- context.getResultSender().lastResult(memberResult);
+ return new CliFunctionResult(memberNameOrId, e);
}
}
- @Override
- public String getId() {
- return CloseDurableCqFunction.class.getName();
- }
-
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/GetSubscriptionQueueSizeFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/GetSubscriptionQueueSizeFunction.java
index b17a813..4c7db2e 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/GetSubscriptionQueueSizeFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/GetSubscriptionQueueSizeFunction.java
@@ -24,7 +24,6 @@ import org.apache.geode.internal.cache.execute.InternalFunction;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.management.internal.cli.CliUtil;
-import org.apache.geode.management.internal.cli.domain.SubscriptionQueueSizeResult;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
/***
@@ -41,64 +40,64 @@ public class GetSubscriptionQueueSizeFunction implements InternalFunction {
final String memberNameOrId =
CliUtil.getMemberNameOrId(cache.getDistributedSystem().getDistributedMember());
String args[] = (String[]) context.getArguments();
- String durableClientId = null, cqName = null;
- SubscriptionQueueSizeResult result = new SubscriptionQueueSizeResult(memberNameOrId);
+ String durableClientId = args[0];
+ String cqName = args[1];
- durableClientId = args[0];
- cqName = args[1];
+ context.getResultSender()
+ .lastResult(createFunctionResult(memberNameOrId, durableClientId, cqName));
+ }
+
+ private CliFunctionResult createFunctionResult(String memberNameOrId, String durableClientId,
+ String cqName) {
try {
CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance();
- if (cacheClientNotifier != null) {
- CacheClientProxy cacheClientProxy = cacheClientNotifier.getClientProxy(durableClientId);
- // Check if the client is present or not
- if (cacheClientProxy != null) {
- if (cqName != null && !cqName.isEmpty()) {
- CqService cqService = cacheClientProxy.getCache().getCqService();
- if (cqService != null) {
- CqQuery cqQuery =
- cqService.getClientCqFromServer(cacheClientProxy.getProxyID(), cqName);
- if (cqQuery != null) {
- CqQueryVsdStats cqVsdStats = ((InternalCqQuery) cqQuery).getVsdStats();
+ if (cacheClientNotifier == null) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.NO_CLIENT_FOUND);
+ }
+
+ CacheClientProxy cacheClientProxy = cacheClientNotifier.getClientProxy(durableClientId);
+ // Check if the client is present or not
+ if (cacheClientProxy == null) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, durableClientId));
+ }
+
+ if (cqName == null || cqName.isEmpty()) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.OK,
+ Integer.toString(cacheClientNotifier.getDurableClientHAQueueSize(durableClientId)));
+ }
+
+ CqService cqService = cacheClientProxy.getCache().getCqService();
+ if (cqService == null) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.COUNT_DURABLE_CQ_EVENTS__NO__CQS__REGISTERED);
+ }
- if (cqVsdStats != null) {
- long queueSize = cqVsdStats.getNumHAQueuedEvents();
- result.setSubscriptionQueueSize(queueSize);
- } else {
- result.setErrorMessage(CliStrings.format(
- CliStrings.COUNT_DURABLE_CQ_EVENTS__DURABLE_CQ_STATS_NOT_FOUND,
- durableClientId, cqName));
- }
- } else {
- result.setErrorMessage(
- CliStrings.format(CliStrings.COUNT_DURABLE_CQ_EVENTS__DURABLE_CQ_NOT_FOUND,
- durableClientId, cqName));
- }
- } else {
- result.setErrorMessage(CliStrings.COUNT_DURABLE_CQ_EVENTS__NO__CQS__REGISTERED);
- }
- } else {
- result.setSubscriptionQueueSize(
- cacheClientNotifier.getDurableClientHAQueueSize(durableClientId));
- }
- } else {
- result.setErrorMessage(
- CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, durableClientId));
- }
- } else {
- result.setErrorMessage(CliStrings.NO_CLIENT_FOUND);
+ CqQuery cqQuery = cqService.getClientCqFromServer(cacheClientProxy.getProxyID(), cqName);
+ if (cqQuery == null) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(CliStrings.COUNT_DURABLE_CQ_EVENTS__DURABLE_CQ_NOT_FOUND,
+ durableClientId, cqName));
}
+
+ CqQueryVsdStats cqVsdStats = ((InternalCqQuery) cqQuery).getVsdStats();
+
+ if (cqVsdStats == null) {
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+ CliStrings.COUNT_DURABLE_CQ_EVENTS__DURABLE_CQ_STATS_NOT_FOUND,
+ durableClientId, cqName));
+ }
+
+ long queueSize = cqVsdStats.getNumHAQueuedEvents();
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.OK,
+ Long.toString(queueSize));
} catch (Exception e) {
- result.setExceptionMessage(e.getMessage());
- } finally {
- context.getResultSender().lastResult(result);
+ return new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ e.getMessage());
}
}
-
- @Override
- public String getId() {
- return GetSubscriptionQueueSizeFunction.class.getName();
- }
-
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ListDurableCqNamesFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ListDurableCqNamesFunction.java
index 0d02ccd..8d37f7b 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ListDurableCqNamesFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ListDurableCqNamesFunction.java
@@ -26,7 +26,6 @@ import org.apache.geode.internal.cache.execute.InternalFunction;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.management.internal.cli.CliUtil;
-import org.apache.geode.management.internal.cli.domain.DurableCqNamesResult;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
/**
@@ -54,38 +53,55 @@ public class ListDurableCqNamesFunction implements InternalFunction {
final Cache cache = context.getCache();
final DistributedMember member = cache.getDistributedSystem().getDistributedMember();
String memberNameOrId = CliUtil.getMemberNameOrId(member);
- DurableCqNamesResult result = new DurableCqNamesResult(memberNameOrId);
String durableClientId = (String) context.getArguments();
+ context.getResultSender().lastResult(createFunctionResult(memberNameOrId, durableClientId));
+ }
+
+ private List<CliFunctionResult> createFunctionResult(String memberNameOrId,
+ String durableClientId) {
+ List<CliFunctionResult> results = new ArrayList<>();
+
try {
CacheClientNotifier ccn = CacheClientNotifier.getInstance();
- if (ccn != null) {
- CacheClientProxy ccp = ccn.getClientProxy(durableClientId);
- if (ccp != null) {
- CqService cqService = ccp.getCache().getCqService();
- if (cqService != null && cqService.isRunning()) {
- List<String> durableCqNames = cqService.getAllDurableClientCqs(ccp.getProxyID());
- if (durableCqNames != null && !durableCqNames.isEmpty()) {
- result.setCqNamesList(new ArrayList<String>(durableCqNames));
- } else {
- result.setErrorMessage(CliStrings
- .format(CliStrings.LIST_DURABLE_CQS__NO__CQS__FOR__CLIENT, durableClientId));
- }
- } else {
- result.setErrorMessage(CliStrings.LIST_DURABLE_CQS__NO__CQS__REGISTERED);
- }
- } else {
- result.setErrorMessage(
- CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, durableClientId));
- }
- } else {
- result.setErrorMessage(CliStrings.NO_CLIENT_FOUND);
+ if (ccn == null) {
+ results.add(new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.NO_CLIENT_FOUND));
+ return results;
+ }
+
+ CacheClientProxy ccp = ccn.getClientProxy(durableClientId);
+ if (ccp == null) {
+ results.add(new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, durableClientId)));
+ return results;
+ }
+
+ CqService cqService = ccp.getCache().getCqService();
+ if (cqService == null || !cqService.isRunning()) {
+ results.add(new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings.LIST_DURABLE_CQS__NO__CQS__REGISTERED));
+ return results;
+ }
+
+ List<String> durableCqNames = cqService.getAllDurableClientCqs(ccp.getProxyID());
+ if (durableCqNames == null || durableCqNames.isEmpty()) {
+ results.add(new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ CliStrings
+ .format(CliStrings.LIST_DURABLE_CQS__NO__CQS__FOR__CLIENT, durableClientId)));
+ return results;
+ }
+
+ for (String cqName : durableCqNames) {
+ results.add(new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.OK,
+ cqName));
}
+ return results;
} catch (Exception e) {
- result.setExceptionMessage(e.getMessage());
- } finally {
- context.getResultSender().lastResult(result);
+ results.add(new CliFunctionResult(memberNameOrId, CliFunctionResult.StatusState.ERROR,
+ e.getMessage()));
+ return results;
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
index 0ec8230..a7cbd16 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
@@ -2979,7 +2979,7 @@ public class CliStrings {
public static final String LIST_DURABLE_CQS__EXCEPTION__OCCURRED__ON =
"Members with exceptions while retrieving durable cqs.";
public static final String LIST_DURABLE_CQS__NO__CQS__REGISTERED =
- "No durable cq's registered on this member.";
+ "No durable cqs registered on this member.";
public static final String LIST_DURABLE_CQS__NAME = "durable-cq-name";
public static final String LIST_DURABLE_CQS__FAILURE__HEADER =
"Unable to list durable-cqs for durable-client-id : \"{0}\" due to following reasons.";
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 4cc865d..e73b202 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -508,21 +508,18 @@ org/apache/geode/management/internal/cli/domain/DiskStoreDetails$CacheServerDeta
org/apache/geode/management/internal/cli/domain/DiskStoreDetails$DiskDirDetails,false,absolutePath:java/lang/String,size:int
org/apache/geode/management/internal/cli/domain/DiskStoreDetails$GatewayDetails,false,id:java/lang/String,persistent:boolean
org/apache/geode/management/internal/cli/domain/DiskStoreDetails$RegionDetails,false,fullPath:java/lang/String,name:java/lang/String,overflowToDisk:boolean,persistent:boolean
-org/apache/geode/management/internal/cli/domain/DurableCqNamesResult,true,1,cqNames:java/util/List
org/apache/geode/management/internal/cli/domain/EvictionAttributesInfo,true,1,evictionAction:java/lang/String,evictionAlgorithm:java/lang/String,evictionMaxValue:int,nonDefaultAttributes:java/util/Map
org/apache/geode/management/internal/cli/domain/FixedPartitionAttributesInfo,true,1,isPrimary:boolean,numBuckets:int,partitionName:java/lang/String
org/apache/geode/management/internal/cli/domain/IndexDetails,true,-2198907141534201288,fromClause:java/lang/String,indexName:java/lang/String,indexStatisticsDetails:org/apache/geode/management/internal/cli/domain/IndexDetails$IndexStatisticsDetails,indexType:org/apache/geode/cache/query/IndexType,indexedExpression:java/lang/String,isValid:boolean,memberId:java/lang/String,memberName:java/lang/String,projectionAttributes:java/lang/String,regionName:java/lang/String,regionPath:java/lang/String
org/apache/geode/management/internal/cli/domain/IndexDetails$IndexStatisticsDetails,false,numberOfKeys:java/lang/Long,numberOfUpdates:java/lang/Long,numberOfValues:java/lang/Long,totalUpdateTime:java/lang/Long,totalUses:java/lang/Long
org/apache/geode/management/internal/cli/domain/MemberConfigurationInfo,false,cacheAttributes:java/util/Map,cacheServerAttributes:java/util/List,gfePropsRuntime:java/util/Map,gfePropsSetFromFile:java/util/Map,gfePropsSetUsingApi:java/util/Map,gfePropsSetWithDefaults:java/util/Map,jvmInputArguments:java/util/List,pdxAttributes:java/util/Map,systemProperties:java/util/Properties
org/apache/geode/management/internal/cli/domain/MemberInformation,true,1,cacheServerList:java/util/List,cacheXmlFilePath:java/lang/String,clientCount:int,cpuUsage:double,groups:java/lang/String,heapUsage:java/lang/String,host:java/lang/String,hostedRegions:java/util/Set,id:java/lang/String,initHeapSize:java/lang/String,isServer:boolean,locatorBindAddress:java/lang/String,locatorPort:int,locators:java/lang/String,logFilePath:java/lang/String,maxHeapSize:java/lang/String,name:java/lang/Str [...]
-org/apache/geode/management/internal/cli/domain/MemberResult,true,1,errorMessage:java/lang/String,exceptionMessage:java/lang/String,isSuccessful:boolean,memberNameOrId:java/lang/String,opPossible:boolean,successMessage:java/lang/String
org/apache/geode/management/internal/cli/domain/PartitionAttributesInfo,true,1,colocatedWith:java/lang/String,fpaInfoList:java/util/List,localMaxMemory:int,nonDefaultAttributes:java/util/Map,partitionResolverName:java/lang/String,recoveryDelay:long,redundantCopies:int,startupRecoveryDelay:long,totalNumBuckets:int
org/apache/geode/management/internal/cli/domain/RegionAttributesInfo,true,336184564012988487,asyncEventQueueIDs:java/util/Set,cacheListenerClassNames:java/util/List,cacheLoaderClassName:java/lang/String,cacheWriterClassName:java/lang/String,cloningEnabled:boolean,compressorClassName:java/lang/String,concurrencyChecksEnabled:boolean,concurrencyLevel:int,customExpiryIdleTimeoutClass:java/lang/String,customExpiryTTLClass:java/lang/String,dataPolicy:org/apache/geode/cache/DataPolicy,diskStor [...]
org/apache/geode/management/internal/cli/domain/RegionDescription,true,6461449275798378332,cndEvictionAttributes:java/util/Map,cndPartitionAttributes:java/util/Map,cndRegionAttributes:java/util/Map,dataPolicy:org/apache/geode/cache/DataPolicy,isAccessor:boolean,isLocal:boolean,isPartition:boolean,isPersistent:boolean,isReplicate:boolean,name:java/lang/String,regionDescPerMemberMap:java/util/Map,scope:org/apache/geode/cache/Scope
org/apache/geode/management/internal/cli/domain/RegionDescriptionPerMember,true,1,hostingMember:java/lang/String,isAccessor:boolean,name:java/lang/String,regionAttributesInfo:org/apache/geode/management/internal/cli/domain/RegionAttributesInfo,size:int
org/apache/geode/management/internal/cli/domain/RegionInformation,true,1,dataPolicy:org/apache/geode/cache/DataPolicy,isRoot:boolean,name:java/lang/String,parentRegion:java/lang/String,path:java/lang/String,scope:org/apache/geode/cache/Scope,subRegionInformationSet:java/util/Set
org/apache/geode/management/internal/cli/domain/StackTracesPerMember,true,1,memberNameOrId:java/lang/String,stackTraces:byte[]
-org/apache/geode/management/internal/cli/domain/SubscriptionQueueSizeResult,true,1,subscriptionQueueSize:long
org/apache/geode/management/internal/cli/exceptions/EntityExistsException,false
org/apache/geode/management/internal/cli/exceptions/EntityNotFoundException,false,statusOK:boolean
org/apache/geode/management/internal/cli/functions/AlterRuntimeConfigFunction,true,1
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DurableClientCommandsDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DurableClientCommandsDUnitTest.java
index 73664a8..a06c97a 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DurableClientCommandsDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/DurableClientCommandsDUnitTest.java
@@ -16,9 +16,9 @@ package org.apache.geode.management.internal.cli.commands;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
-import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
import java.util.Properties;
@@ -30,17 +30,20 @@ import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.data.Portfolio;
-import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.cli.Result;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
+import org.apache.geode.management.internal.cli.result.CommandResult;
import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.VM;
@@ -55,13 +58,15 @@ import org.apache.geode.test.junit.rules.GfshCommandRule;
@SuppressWarnings("serial")
public class DurableClientCommandsDUnitTest {
- private static final String REGION_NAME = "stocks";
+ private static final String STOCKS_REGION = "stocks";
+ private static final String BONDS_REGION = "bonds";
private static final String CQ1 = "cq1";
private static final String CQ2 = "cq2";
private static final String CQ3 = "cq3";
private static final String CLIENT_NAME = "dc1";
+ private static final String CQ_GROUP = "cq-group";
- private MemberVM locator, server;
+ private MemberVM locator;
private ClientVM client;
@@ -73,19 +78,19 @@ public class DurableClientCommandsDUnitTest {
@Before
public void setup() throws Exception {
- int jmxPort = AvailablePortHelper.getRandomAvailableTCPPort();
- Properties props = new Properties();
- props.setProperty(LOG_LEVEL, "fine");
- props.setProperty(ConfigurationProperties.JMX_MANAGER_HOSTNAME_FOR_CLIENTS, "localhost");
- props.setProperty(ConfigurationProperties.JMX_MANAGER_PORT, "" + jmxPort);
- locator = lsRule.startLocatorVM(0, props);
+ locator = lsRule.startLocatorVM(0);
int locatorPort = locator.getPort();
- server = lsRule.startServerVM(1,
- thisServer -> thisServer.withRegion(RegionShortcut.REPLICATE, REGION_NAME)
+ lsRule.startServerVM(1,
+ thisServer -> thisServer.withRegion(RegionShortcut.REPLICATE, STOCKS_REGION)
+ .withProperty("groups", CQ_GROUP)
.withConnectionToLocator(locatorPort));
- client = lsRule.startClientVM(2, getClientProps(CLIENT_NAME, "300"), (ccf) -> {
+ lsRule.startServerVM(2,
+ thisServer -> thisServer.withRegion(RegionShortcut.REPLICATE, BONDS_REGION)
+ .withConnectionToLocator(locatorPort));
+
+ client = lsRule.startClientVM(3, getClientProps(CLIENT_NAME, "300"), (ccf) -> {
ccf.setPoolSubscriptionEnabled(true);
ccf.addPoolLocator("localhost", locatorPort);
});
@@ -95,15 +100,18 @@ public class DurableClientCommandsDUnitTest {
@Test
- public void testListDurableClientCqs() {
+ public void testListDurableClientCqsForOneGroup() {
setupCqs();
CommandStringBuilder csb = new CommandStringBuilder(CliStrings.LIST_DURABLE_CQS);
csb.addOption(CliStrings.LIST_DURABLE_CQS__DURABLECLIENTID, CLIENT_NAME);
+ csb.addOption(CliStrings.GROUP, CQ_GROUP);
String commandString = csb.toString();
- gfsh.executeAndAssertThat(commandString).statusIsSuccess().containsOutput(CQ1)
- .containsOutput(CQ2).containsOutput(CQ3);
+ gfsh.executeAndAssertThat(commandString).statusIsSuccess()
+ .hasTableSection()
+ .hasColumn("CQ Name")
+ .containsExactlyInAnyOrder("cq1", "cq2", "cq3");
closeCq(CQ1);
closeCq(CQ2);
@@ -112,9 +120,45 @@ public class DurableClientCommandsDUnitTest {
csb = new CommandStringBuilder(CliStrings.LIST_DURABLE_CQS);
csb.addOption(CliStrings.LIST_DURABLE_CQS__DURABLECLIENTID, CLIENT_NAME);
commandString = csb.toString();
- String errorMessage =
- CliStrings.format(CliStrings.LIST_DURABLE_CQS__NO__CQS__FOR__CLIENT, CLIENT_NAME);
- gfsh.executeAndAssertThat(commandString).statusIsError().containsOutput(errorMessage);
+
+ gfsh.executeAndAssertThat(commandString).statusIsError()
+ .hasTableSection()
+ .hasColumn("CQ Name")
+ .containsExactlyInAnyOrder(
+ CliStrings.format(CliStrings.LIST_DURABLE_CQS__NO__CQS__FOR__CLIENT, CLIENT_NAME),
+ CliStrings.format(CliStrings.LIST_DURABLE_CQS__NO__CQS__REGISTERED));
+ }
+
+ @Test
+ public void testListDurableClientCqsWhenNoneExist() {
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.LIST_DURABLE_CQS);
+ csb.addOption(CliStrings.LIST_DURABLE_CQS__DURABLECLIENTID, CLIENT_NAME);
+ String commandString = csb.toString();
+
+ gfsh.executeAndAssertThat(commandString).statusIsError()
+ .hasTableSection()
+ .hasColumn("CQ Name")
+ .containsExactlyInAnyOrder(
+ CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, CLIENT_NAME),
+ CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, CLIENT_NAME));
+ }
+
+ @Test
+ public void testListDurableClientCqsWithMixedResults() {
+ setupCqs();
+
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.LIST_DURABLE_CQS);
+ csb.addOption(CliStrings.LIST_DURABLE_CQS__DURABLECLIENTID, CLIENT_NAME);
+ String commandString = csb.toString();
+
+ gfsh.executeAndAssertThat(commandString).statusIsError()
+ .hasTableSection()
+ .hasColumn("CQ Name")
+ .containsExactlyInAnyOrder("cq1", "cq2", "cq3", "No client found with client-id : dc1");
+
+ closeCq(CQ1);
+ closeCq(CQ2);
+ closeCq(CQ3);
}
@Test
@@ -124,10 +168,14 @@ public class DurableClientCommandsDUnitTest {
CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CLOSE_DURABLE_CLIENTS);
csb.addOption(CliStrings.CLOSE_DURABLE_CLIENTS__CLIENT__ID, CLIENT_NAME);
+ csb.addOption(CliStrings.GROUP, CQ_GROUP);
String commandString = csb.toString();
await().untilAsserted(() -> {
- gfsh.executeAndAssertThat(commandString).statusIsSuccess();
+ gfsh.executeAndAssertThat(commandString).statusIsSuccess()
+ .hasTableSection()
+ .hasColumn("Message")
+ .containsExactlyInAnyOrder("Closed the durable client : \"dc1\".");
});
String errorMessage = CliStrings.format(CliStrings.NO_CLIENT_FOUND_WITH_CLIENT_ID, CLIENT_NAME);
@@ -144,35 +192,42 @@ public class DurableClientCommandsDUnitTest {
csb.addOption(CliStrings.CLOSE_DURABLE_CQS__NAME, CQ1);
String commandString = csb.toString();
- gfsh.executeAndAssertThat(commandString).statusIsSuccess();
-
- csb = new CommandStringBuilder(CliStrings.CLOSE_DURABLE_CQS);
- csb.addOption(CliStrings.CLOSE_DURABLE_CQS__DURABLE__CLIENT__ID, CLIENT_NAME);
- csb.addOption(CliStrings.CLOSE_DURABLE_CQS__NAME, CQ1);
- commandString = csb.toString();
+ CommandResult result = gfsh.executeCommand(commandString);
+ assertThat(result.getStatus()).isEqualTo(Result.Status.OK);
+ assertThat(result.getTableColumnValues("Message")).containsExactlyInAnyOrder(
+ "Closed the durable cq : \"cq1\" for the durable client : \"dc1\".",
+ "No client found with client-id : dc1");
- gfsh.executeAndAssertThat(commandString).statusIsError();
+ result = gfsh.executeCommand(commandString);
+ assertThat(result.getStatus()).isEqualTo(Result.Status.ERROR);
}
@Test
public void testCountSubscriptionQueueSize() {
setupCqs();
- doPuts(REGION_NAME, Host.getHost(0).getVM(1));
+ doPuts(STOCKS_REGION, Host.getHost(0).getVM(1));
CommandStringBuilder csb = new CommandStringBuilder(CliStrings.COUNT_DURABLE_CQ_EVENTS);
csb.addOption(CliStrings.COUNT_DURABLE_CQ_EVENTS__DURABLE__CLIENT__ID, CLIENT_NAME);
+ csb.addOption(CliStrings.GROUP, CQ_GROUP);
String commandString = csb.toString();
- gfsh.executeAndAssertThat(commandString).statusIsSuccess().containsOutput("4");
-
+ gfsh.executeAndAssertThat(commandString).statusIsSuccess()
+ .hasTableSection()
+ .hasColumn("Queue Size")
+ .containsExactlyInAnyOrder("4");
csb = new CommandStringBuilder(CliStrings.COUNT_DURABLE_CQ_EVENTS);
csb.addOption(CliStrings.COUNT_DURABLE_CQ_EVENTS__DURABLE__CLIENT__ID, CLIENT_NAME);
csb.addOption(CliStrings.COUNT_DURABLE_CQ_EVENTS__DURABLE__CQ__NAME, CQ3);
+ csb.addOption(CliStrings.GROUP, CQ_GROUP);
commandString = csb.toString();
- gfsh.executeAndAssertThat(commandString).statusIsSuccess();
+ gfsh.executeAndAssertThat(commandString).statusIsSuccess()
+ .hasTableSection()
+ .hasColumn("Queue Size")
+ .containsExactlyInAnyOrder("0");
// CLOSE all the cqs
closeCq(CQ1);
@@ -235,15 +290,21 @@ public class DurableClientCommandsDUnitTest {
}
private void setupCqs() {
+ int locatorPort = locator.getPort();
client.invoke(() -> {
- QueryService qs = ClusterStartupRule.getClientCache().getQueryService();
+ PoolFactory poolFactory = PoolManager.createFactory().setServerGroup(CQ_GROUP);
+ poolFactory.addLocator("localhost", locatorPort);
+ poolFactory.setSubscriptionEnabled(true);
+ Pool pool = poolFactory.create("DEFAULT");
+
+ QueryService qs = pool.getQueryService();
CqAttributesFactory cqAf = new CqAttributesFactory();
try {
- qs.newCq(CQ1, "select * from /" + REGION_NAME, cqAf.create(), true).execute();
- qs.newCq(CQ2, "select * from /" + REGION_NAME + " where id = 1", cqAf.create(), true)
+ qs.newCq(CQ1, "select * from /" + STOCKS_REGION, cqAf.create(), true).execute();
+ qs.newCq(CQ2, "select * from /" + STOCKS_REGION + " where id = 1", cqAf.create(), true)
.execute();
- qs.newCq(CQ3, "select * from /" + REGION_NAME + " where id > 2", cqAf.create(), true)
+ qs.newCq(CQ3, "select * from /" + STOCKS_REGION + " where id > 2", cqAf.create(), true)
.execute();
} catch (CqException | CqExistsException | RegionNotFoundException e) {
throw new RuntimeException(e);