You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2021/08/17 18:17:35 UTC
[geode] 03/18: GEODE-6588: Cleanup GatewayReceiverCommand
This is an automated email from the ASF dual-hosted git repository.
jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit b92a0f6bc4d606193407c3c3ad9b085e28ce3685
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Thu May 20 14:53:09 2021 -0700
GEODE-6588: Cleanup GatewayReceiverCommand
---
.../sockets/command/GatewayReceiverCommand.java | 95 ++++++----------
.../internal/cache/wan/GatewayReceiverStats.java | 124 ++++++++-------------
2 files changed, 85 insertions(+), 134 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 861e71e..1a15cd3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.internal.cache.tier.sockets.command;
+import static java.lang.String.format;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -70,7 +72,7 @@ public class GatewayReceiverCommand extends BaseCommand {
if (cache != null && cache.isCacheAtShutdownAll()) {
throw cache.getCacheClosedException("Shutdown occurred during message processing");
}
- String reason = String.format("Region %s was not found during batch create request %s",
+ String reason = format("Region %s was not found during batch create request %s",
regionName, batchId);
throw new RegionDestroyedException(reason, regionName);
}
@@ -171,7 +173,7 @@ public class GatewayReceiverCommand extends BaseCommand {
try {
possibleDuplicatePartBytes = (byte[]) possibleDuplicatePart.getObject();
} catch (Exception e) {
- logger.warn(String.format(
+ logger.warn(format(
"%s: Caught exception processing batch request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
@@ -179,14 +181,9 @@ public class GatewayReceiverCommand extends BaseCommand {
}
boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01;
- // Make sure instance variables are null before each iteration
- String regionName = null;
- Object key = null;
- Object callbackArg = null;
-
// Retrieve the region name from the message parts
Part regionNamePart = clientMessage.getPart(partNumber + 2);
- regionName = regionNamePart.getCachedString();
+ String regionName = regionNamePart.getCachedString();
if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
indexWithoutPDXEvent--;
isPdxEvent = true;
@@ -204,7 +201,7 @@ public class GatewayReceiverCommand extends BaseCommand {
try {
eventId = (EventID) eventIdPart.getObject();
} catch (Exception e) {
- logger.warn(String.format(
+ logger.warn(format(
"%s: Caught exception processing batch request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
@@ -213,10 +210,11 @@ public class GatewayReceiverCommand extends BaseCommand {
// Retrieve the key from the message parts
Part keyPart = clientMessage.getPart(partNumber + 4);
+ Object key;
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- logger.warn(String.format(
+ logger.warn(format(
"%s: Caught exception processing batch request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
@@ -228,24 +226,12 @@ public class GatewayReceiverCommand extends BaseCommand {
long versionTimeStamp;
Part callbackArgExistsPart;
LocalRegion region;
+ Object callbackArg = null;
switch (actionType) {
case 0: // Create
try {
-
- /*
- * CLIENT EXCEPTION HANDLING TESTING CODE String keySt = (String) key;
- * System.out.println("Processing new key: " + key); if
- * (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings
- * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER
- * )); }
- */
-
// Retrieve the value from the message parts (do not deserialize it)
valuePart = clientMessage.getPart(partNumber + 5);
- // try {
- // logger.warn(getName() + ": Creating key " + key + " value " +
- // valuePart.getObject());
- // } catch (Exception e) {}
// Retrieve the callbackArg from the message parts if necessary
index = partNumber + 6;
@@ -260,7 +246,7 @@ public class GatewayReceiverCommand extends BaseCommand {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
logger
- .warn(String.format(
+ .warn(format(
"%s: Caught exception processing batch create request %s for %s events",
serverConnection.getName(), batchId, numberOfEvents),
e);
@@ -283,7 +269,7 @@ public class GatewayReceiverCommand extends BaseCommand {
if (regionName == null) {
message = "%s: The input region name for the batch create request %s is null";
}
- String s = String.format(message, serverConnection.getName(), batchId);
+ String s = format(message, serverConnection.getName(), batchId);
logger.warn(s);
throw new Exception(s);
}
@@ -303,8 +289,7 @@ public class GatewayReceiverCommand extends BaseCommand {
handleMessageRetry(region, clientEvent);
byte[] value = valuePart.getSerializedForm();
boolean isObject = valuePart.isObject();
- // [sumedh] This should be done on client while sending
- // since that is the WAN gateway
+ // This should be done on client while sending since that is the WAN gateway
AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
PutOperationContext putContext =
@@ -313,7 +298,7 @@ public class GatewayReceiverCommand extends BaseCommand {
isObject = putContext.isObject();
}
// Attempt to create the entry
- boolean result = false;
+ boolean result;
if (isPdxEvent) {
result = addPdxType(crHelper, key, value);
} else {
@@ -334,13 +319,13 @@ public class GatewayReceiverCommand extends BaseCommand {
} else {
// This exception will be logged in the catch block below
throw new Exception(
- String.format(
+ format(
"%s: Failed to create or update entry for region %s key %s value %s callbackArg %s",
serverConnection.getName(), regionName, key, valuePart, callbackArg));
}
}
} catch (Exception e) {
- logger.warn(String.format(
+ logger.warn(format(
"%s: Caught exception processing batch create request %s for %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
@@ -351,10 +336,6 @@ public class GatewayReceiverCommand extends BaseCommand {
try {
// Retrieve the value from the message parts (do not deserialize it)
valuePart = clientMessage.getPart(partNumber + 5);
- // try {
- // logger.warn(getName() + ": Updating key " + key + " value " +
- // valuePart.getObject());
- // } catch (Exception e) {}
// Retrieve the callbackArg from the message parts if necessary
index = partNumber + 6;
@@ -370,7 +351,7 @@ public class GatewayReceiverCommand extends BaseCommand {
} catch (Exception e) {
logger
.warn(
- String.format(
+ format(
"%s: Caught exception processing batch update request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents),
e);
@@ -393,7 +374,7 @@ public class GatewayReceiverCommand extends BaseCommand {
if (regionName == null) {
message = "%s: The input region name for the batch update request %s is null";
}
- String s = String.format(message, serverConnection.getName(), batchId);
+ String s = format(message, serverConnection.getName(), batchId);
logger.warn(s);
throw new Exception(s);
}
@@ -420,7 +401,7 @@ public class GatewayReceiverCommand extends BaseCommand {
value = putContext.getSerializedValue();
isObject = putContext.isObject();
}
- boolean result = false;
+ final boolean result;
if (isPdxEvent) {
result = addPdxType(crHelper, key, value);
} else {
@@ -434,7 +415,7 @@ public class GatewayReceiverCommand extends BaseCommand {
} else {
final String message =
"%s: Failed to update entry for region %s, key %s, value %s, and callbackArg %s";
- String s = String.format(message, serverConnection.getName(), regionName,
+ String s = format(message, serverConnection.getName(), regionName,
key, valuePart, callbackArg);
logger.info(s);
throw new Exception(s);
@@ -442,7 +423,7 @@ public class GatewayReceiverCommand extends BaseCommand {
}
} catch (Exception e) {
// Preserve the connection under all circumstances
- logger.warn(String.format(
+ logger.warn(format(
"%s: Caught exception processing batch update request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
@@ -465,7 +446,7 @@ public class GatewayReceiverCommand extends BaseCommand {
} catch (Exception e) {
logger
.warn(
- String.format(
+ format(
"%s: Caught exception processing batch destroy request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents),
e);
@@ -491,7 +472,7 @@ public class GatewayReceiverCommand extends BaseCommand {
message =
"%s: The input region name for the batch destroy request %s is null";
}
- String s = String.format(message, serverConnection.getName(), batchId);
+ String s = format(message, serverConnection.getName(), batchId);
logger.warn(s);
throw new Exception(s);
}
@@ -527,7 +508,7 @@ public class GatewayReceiverCommand extends BaseCommand {
retry = false;
}
} catch (Exception e) {
- logger.warn(String.format(
+ logger.warn(format(
"%s: Caught exception processing batch destroy request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents),
e);
@@ -573,7 +554,7 @@ public class GatewayReceiverCommand extends BaseCommand {
String message =
"%s: Caught exception processing batch update version request request %s containing %s events";
- String s = String.format(message, serverConnection.getName(),
+ String s = format(message, serverConnection.getName(),
batchId, numberOfEvents);
logger.warn(s);
throw new Exception(s);
@@ -608,7 +589,7 @@ public class GatewayReceiverCommand extends BaseCommand {
}
}
} catch (Exception e) {
- logger.warn(String.format(
+ logger.warn(format(
"%s: Caught exception processing batch update version request request %s containing %s events",
serverConnection.getName(), batchId, numberOfEvents), e);
handleException(removeOnException, stats, e);
@@ -638,7 +619,7 @@ public class GatewayReceiverCommand extends BaseCommand {
// If we have an issue with the PDX registry, stop processing more data
if (e.getCause() instanceof PdxRegistryMismatchException) {
fatalException = e.getCause();
- logger.fatal(String.format(
+ logger.fatal(format(
"This gateway receiver has received a PDX type from %s that does match the existing PDX type. This gateway receiver will not process any more events, in order to prevent receiving objects which may not be deserializable.",
serverConnection.getMembershipID()), e.getCause());
break;
@@ -647,7 +628,7 @@ public class GatewayReceiverCommand extends BaseCommand {
// Increment the batch id unless the received batch id is -1 (a
// failover batch)
DistributedSystem ds = crHelper.getCacheForGatewayCommand().getDistributedSystem();
- String exceptionMessage = String.format(
+ String exceptionMessage = format(
"Exception occurred while processing a batch on the receiver running on DistributedSystem with Id: %s, DistributedMember on which the receiver is running: %s",
((InternalDistributedSystem) ds).getDistributionManager().getDistributedSystemId(),
ds.getDistributedMember());
@@ -685,11 +666,11 @@ public class GatewayReceiverCommand extends BaseCommand {
}
if (fatalException != null) {
serverConnection.incrementLatestBatchIdReplied(batchId);
- writeFatalException(clientMessage, fatalException, serverConnection, batchId);
+ writeFatalException(clientMessage, fatalException, serverConnection);
serverConnection.setAsTrue(RESPONDED);
} else if (!exceptions.isEmpty()) {
serverConnection.incrementLatestBatchIdReplied(batchId);
- writeBatchException(clientMessage, exceptions, serverConnection, batchId);
+ writeBatchException(clientMessage, exceptions, serverConnection);
serverConnection.setAsTrue(RESPONDED);
} else {
// Increment the batch id unless the received batch id is -1 (a failover
@@ -725,7 +706,7 @@ public class GatewayReceiverCommand extends BaseCommand {
private void handleException(boolean removeOnException, GatewayReceiverStats stats, Exception e)
throws Exception {
- if (shouldThrowException(removeOnException, e)) {
+ if (shouldThrowException(removeOnException)) {
throw e;
} else {
stats.incEventsRetried();
@@ -733,9 +714,9 @@ public class GatewayReceiverCommand extends BaseCommand {
}
}
- private boolean shouldThrowException(boolean removeOnException, Exception e) {
+ private boolean shouldThrowException(boolean removeOnException) {
// Split out in case specific exceptions would short-circuit retry logic.
- // Currently it just considers the boolean.
+ // Currently, it just considers the boolean.
return removeOnException;
}
@@ -770,18 +751,15 @@ public class GatewayReceiverCommand extends BaseCommand {
}
private static void writeBatchException(Message origMsg, List<BatchException70> exceptions,
- ServerConnection servConn, int batchId) throws IOException {
+ ServerConnection servConn) throws IOException {
Message errorMsg = servConn.getErrorResponseMessage();
errorMsg.setMessageType(MessageType.EXCEPTION);
errorMsg.setNumberOfParts(2);
errorMsg.setTransactionId(origMsg.getTransactionId());
-
errorMsg.addObjPart(exceptions);
- // errorMsg.addStringPart(be.toString());
errorMsg.send(servConn);
- for (Exception e : exceptions) {
- ((GatewayReceiverStats) servConn.getCacheServerStats()).incExceptionsOccurred();
- }
+ ((GatewayReceiverStats) servConn.getCacheServerStats())
+ .incExceptionsOccurred(exceptions.size());
for (Exception be : exceptions) {
if (logger.isWarnEnabled()) {
logger.warn(servConn.getName() + ": Wrote batch exception: ",
@@ -791,13 +769,12 @@ public class GatewayReceiverCommand extends BaseCommand {
}
private static void writeFatalException(Message origMsg, Throwable exception,
- ServerConnection servConn, int batchId) throws IOException {
+ ServerConnection servConn) throws IOException {
Message errorMsg = servConn.getErrorResponseMessage();
errorMsg.setMessageType(MessageType.EXCEPTION);
errorMsg.setNumberOfParts(2);
errorMsg.setTransactionId(origMsg.getTransactionId());
errorMsg.addObjPart(exception);
- // errorMsg.addStringPart(be.toString());
errorMsg.send(servConn);
logger.warn(servConn.getName() + ": Wrote batch exception: ",
exception);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
index 4184b16..73af7b0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java
@@ -27,11 +27,6 @@ public class GatewayReceiverStats extends CacheServerStats {
private static final String typeName = "GatewayReceiverStatistics";
- // ////////////////// Statistic "Id" Fields ////////////////////
-
- // /** Name of the events queued statistic */
- // private static final String FAILOVER_BATCHES_RECEIVED = "failoverBatchesReceived";
-
/**
* Name of the events not queued because conflated statistic
*/
@@ -89,22 +84,18 @@ public class GatewayReceiverStats extends CacheServerStats {
/**
* Id of the events not queued because conflated statistic
*/
- private int duplicateBatchesReceivedId;
+ private final int duplicateBatchesReceivedId;
/**
* Id of the event queue time statistic
*/
- private int outoforderBatchesReceivedId;
+ private final int outoforderBatchesReceivedId;
/**
* Id of the event queue size statistic
*/
- private int earlyAcksId;
+ private final int earlyAcksId;
- /**
- * Id of the events distributed statistic
- */
- private int eventsReceivedId;
private final Counter eventsReceivedCounter;
private static final String EVENTS_RECEIVED_COUNTER_NAME =
"geode.gateway.receiver.events";
@@ -115,59 +106,59 @@ public class GatewayReceiverStats extends CacheServerStats {
/**
* Id of the events exceeding alert threshold statistic
*/
- private int createRequestId;
+ private final int createRequestId;
/**
* Id of the batch distribution time statistic
*/
- private int updateRequestId;
+ private final int updateRequestId;
/**
* Id of the batches distributed statistic
*/
- private int destroyRequestId;
+ private final int destroyRequestId;
/**
* Id of the batches redistributed statistic
*/
- private int unknowsOperationsReceivedId;
+ private final int unknowsOperationsReceivedId;
/**
* Id of the unprocessed events added by primary statistic
*/
- private int exceptionsOccurredId;
+ private final int exceptionsOccurredId;
/**
* Id of the events retried statistic
*/
- private int eventsRetriedId;
+ private final int eventsRetriedId;
// ///////////////////// Constructors ///////////////////////
public static GatewayReceiverStats createGatewayReceiverStats(StatisticsFactory f,
String ownerName, MeterRegistry meterRegistry) {
StatisticDescriptor[] descriptors = new StatisticDescriptor[] {
- f.createIntCounter(DUPLICATE_BATCHES_RECEIVED,
+ f.createLongCounter(DUPLICATE_BATCHES_RECEIVED,
"number of batches which have already been seen by this GatewayReceiver",
"nanoseconds"),
- f.createIntCounter(OUT_OF_ORDER_BATCHES_RECEIVED,
+ f.createLongCounter(OUT_OF_ORDER_BATCHES_RECEIVED,
"number of batches which are out of order on this GatewayReceiver", "operations"),
- f.createIntCounter(EARLY_ACKS, "number of early acknowledgements sent to gatewaySenders",
+ f.createLongCounter(EARLY_ACKS, "number of early acknowledgements sent to gatewaySenders",
"operations"),
f.createLongCounter(EVENTS_RECEIVED,
EVENTS_RECEIVED_COUNTER_DESCRIPTION,
EVENTS_RECEIVED_COUNTER_UNITS),
- f.createIntCounter(CREAT_REQUESTS,
+ f.createLongCounter(CREAT_REQUESTS,
"total number of create operations received by this GatewayReceiver", "operations"),
- f.createIntCounter(UPDATE_REQUESTS,
+ f.createLongCounter(UPDATE_REQUESTS,
"total number of update operations received by this GatewayReceiver", "operations"),
- f.createIntCounter(DESTROY_REQUESTS,
+ f.createLongCounter(DESTROY_REQUESTS,
"total number of destroy operations received by this GatewayReceiver", "operations"),
- f.createIntCounter(UNKNOWN_OPERATIONS_RECEIVED,
+ f.createLongCounter(UNKNOWN_OPERATIONS_RECEIVED,
"total number of unknown operations received by this GatewayReceiver", "operations"),
- f.createIntCounter(EXCEPTIONS_OCCURRED,
+ f.createLongCounter(EXCEPTIONS_OCCURRED,
"number of exceptions occurred while porcessing the batches", "operations"),
- f.createIntCounter(EVENTS_RETRIED,
+ f.createLongCounter(EVENTS_RETRIED,
"total number events retried by this GatewayReceiver due to exceptions", "operations")};
return new GatewayReceiverStats(f, ownerName, typeName, descriptors, meterRegistry);
@@ -177,11 +168,10 @@ public class GatewayReceiverStats extends CacheServerStats {
StatisticDescriptor[] descriptiors, MeterRegistry meterRegistry) {
super(f, ownerName, typeName, descriptiors);
// Initialize id fields
- // failoverBatchesReceivedId = statType.nameToId(FAILOVER_BATCHES_RECEIVED);
duplicateBatchesReceivedId = statType.nameToId(DUPLICATE_BATCHES_RECEIVED);
outoforderBatchesReceivedId = statType.nameToId(OUT_OF_ORDER_BATCHES_RECEIVED);
earlyAcksId = statType.nameToId(EARLY_ACKS);
- eventsReceivedId = statType.nameToId(EVENTS_RECEIVED);
+ final int eventsReceivedId = statType.nameToId(EVENTS_RECEIVED);
createRequestId = statType.nameToId(CREAT_REQUESTS);
updateRequestId = statType.nameToId(UPDATE_REQUESTS);
destroyRequestId = statType.nameToId(DESTROY_REQUESTS);
@@ -197,50 +187,37 @@ public class GatewayReceiverStats extends CacheServerStats {
.register(meterRegistry);
}
- // /////////////////// Instance Methods /////////////////////
-
- // /**
- // * Increments the number of failover batches received by 1.
- // */
- // public void incFailoverBatchesReceived() {
- // this.stats.incInt(failoverBatchesReceivedId, 1);
- // }
- //
- // public int getFailoverBatchesReceived() {
- // return this.stats.getInt(failoverBatchesReceivedId);
- // }
-
/**
* Increments the number of duplicate batches received by 1.
*/
public void incDuplicateBatchesReceived() {
- this.stats.incInt(duplicateBatchesReceivedId, 1);
+ stats.incLong(duplicateBatchesReceivedId, 1);
}
- public int getDuplicateBatchesReceived() {
- return this.stats.getInt(duplicateBatchesReceivedId);
+ public long getDuplicateBatchesReceived() {
+ return stats.getLong(duplicateBatchesReceivedId);
}
/**
* Increments the number of out of order batches received by 1.
*/
public void incOutoforderBatchesReceived() {
- this.stats.incInt(outoforderBatchesReceivedId, 1);
+ stats.incLong(outoforderBatchesReceivedId, 1);
}
- public int getOutoforderBatchesReceived() {
- return this.stats.getInt(outoforderBatchesReceivedId);
+ public long getOutoforderBatchesReceived() {
+ return stats.getLong(outoforderBatchesReceivedId);
}
/**
* Increments the number of early acks by 1.
*/
public void incEarlyAcks() {
- this.stats.incInt(earlyAcksId, 1);
+ stats.incLong(earlyAcksId, 1);
}
- public int getEarlyAcks() {
- return this.stats.getInt(earlyAcksId);
+ public long getEarlyAcks() {
+ return stats.getLong(earlyAcksId);
}
/**
@@ -250,74 +227,71 @@ public class GatewayReceiverStats extends CacheServerStats {
eventsReceivedCounter.increment(delta);
}
- public int getEventsReceived() {
- return (int) eventsReceivedCounter.count();
+ public long getEventsReceived() {
+ return (long) eventsReceivedCounter.count();
}
/**
* Increments the number of create requests by 1.
*/
public void incCreateRequest() {
- this.stats.incInt(createRequestId, 1);
+ stats.incLong(createRequestId, 1);
}
- public int getCreateRequest() {
- return this.stats.getInt(createRequestId);
+ public long getCreateRequest() {
+ return stats.getLong(createRequestId);
}
/**
* Increments the number of update requests by 1.
*/
public void incUpdateRequest() {
- this.stats.incInt(updateRequestId, 1);
+ stats.incLong(updateRequestId, 1);
}
- public int getUpdateRequest() {
- return this.stats.getInt(updateRequestId);
+ public long getUpdateRequest() {
+ return stats.getLong(updateRequestId);
}
/**
* Increments the number of destroy request received by 1.
*/
public void incDestroyRequest() {
- this.stats.incInt(destroyRequestId, 1);
+ stats.incLong(destroyRequestId, 1);
}
- public int getDestroyRequest() {
- return this.stats.getInt(destroyRequestId);
+ public long getDestroyRequest() {
+ return stats.getLong(destroyRequestId);
}
/**
* Increments the number of unknown operations received by 1.
*/
public void incUnknowsOperationsReceived() {
- this.stats.incInt(unknowsOperationsReceivedId, 1);
+ stats.incLong(unknowsOperationsReceivedId, 1);
}
- public int getUnknowsOperationsReceived() {
- return this.stats.getInt(unknowsOperationsReceivedId);
+ public long getUnknowsOperationsReceived() {
+ return stats.getLong(unknowsOperationsReceivedId);
}
- /**
- * Increments the number of exceptions occurred by 1.
- */
- public void incExceptionsOccurred() {
- this.stats.incInt(exceptionsOccurredId, 1);
+ public void incExceptionsOccurred(int delta) {
+ stats.incLong(exceptionsOccurredId, delta);
}
- public int getExceptionsOccurred() {
- return this.stats.getInt(exceptionsOccurredId);
+ public long getExceptionsOccurred() {
+ return stats.getLong(exceptionsOccurredId);
}
/**
* Increments the number of events received by 1.
*/
public void incEventsRetried() {
- this.stats.incInt(eventsRetriedId, 1);
+ stats.incLong(eventsRetriedId, 1);
}
- public int getEventsRetried() {
- return this.stats.getInt(eventsRetriedId);
+ public long getEventsRetried() {
+ return stats.getLong(eventsRetriedId);
}
/**