You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@geode.apache.org by "xiaojian zhou (JIRA)" <ji...@apache.org> on 2018/02/14 05:40:00 UTC
[jira] [Assigned] (GEODE-4659) AbstractGatewaySenderEventProcessor
put loop of filter in wrong place
[ https://issues.apache.org/jira/browse/GEODE-4659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
xiaojian zhou reassigned GEODE-4659:
------------------------------------
Assignee: xiaojian zhou
> AbstractGatewaySenderEventProcessor put loop of filter in wrong place
> ---------------------------------------------------------------------
>
> Key: GEODE-4659
> URL: https://issues.apache.org/jira/browse/GEODE-4659
> Project: Geode
> Issue Type: Bug
> Components: wan
> Reporter: xiaojian zhou
> Assignee: xiaojian zhou
> Priority: Major
>
> {noformat}
> While fixing GEODE-3967, I found that the filter loop is in the wrong place.
>
> The processing that ignores UPDATE_VERSION_STAMP events and events with CME should have nothing to do with filters. However, because that processing is placed inside the loop over the filters, the code does not ignore UPDATE_VERSION_STAMP events or events with CME when no filter is defined.
>
> However, once this problem is fixed, GEODE-3967 has more race conditions that need to be fixed (I have fixed several of them). It looks like this bug hid other race conditions and kept them from surfacing.
>
> Given the time constraint, I will not fix the filter issue as part of GEODE-3967; I am logging this bug for future reference.
>
> Here is the diff to fix this bug:
> diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
> index 8739a8f72..a3a89fbd0 100644
> --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
> +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
> @@ -81,40 +81,8 @@ public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySe
> * @param disp
> * @return true if remote site Gemfire Version is >= 7.0.1
> */
> - private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
> - throws GatewaySenderException {
> - try {
> - GatewaySenderEventRemoteDispatcher remoteDispatcher =
> - (GatewaySenderEventRemoteDispatcher) disp;
> - // This will create a new connection if no batch has been sent till
> - // now.
> - Connection conn = remoteDispatcher.getConnection(false);
> - if (conn != null) {
> - short remoteSiteVersion = conn.getWanSiteVersion();
> - if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
> - return true;
> - }
> - }
> - } catch (GatewaySenderException e) {
> - Throwable cause = e.getCause();
> - if (cause instanceof IOException || e instanceof GatewaySenderConfigurationException
> - || cause instanceof ConnectionDestroyedException) {
> - try {
> - int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
> - if (logger.isDebugEnabled()) {
> - logger.debug("Sleeping for {} milliseconds", sleepInterval);
> - }
> - Thread.sleep(sleepInterval);
> - } catch (InterruptedException ie) {
> - // log the exception
> - if (logger.isDebugEnabled()) {
> - logger.debug(ie.getMessage(), ie);
> - }
> - }
> - }
> - throw e;
> - }
> - return false;
> + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) {
> + return true;
> }
> }
> diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
> index 69005e02b..da5d1baee 100644
> --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
> +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
> @@ -19,6 +19,7 @@ import org.apache.logging.log4j.Logger;
> import org.apache.geode.cache.wan.GatewaySender;
> import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
> import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
> +import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
> import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
> import org.apache.geode.internal.logging.LogService;
> @@ -44,4 +45,14 @@ public class RemoteSerialGatewaySenderEventProcessor extends SerialGatewaySender
> }
> }
> + /**
> + * Returns if corresponding receiver WAN site of this GatewaySender has GemfireVersion > 7.0.1
> + *
> + * @param disp
> + * @return true if remote site Gemfire Version is >= 7.0.1
> + */
> + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) {
> + return true;
> + }
> +
> }
> diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
> index 7e67e9bfb..439394382 100644
> --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
> +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
> @@ -509,27 +509,38 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
> }
> // Filter the events
> - for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
> - Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
> - while (itr.hasNext()) {
> - GatewayQueueEvent event = itr.next();
> -
> - // This seems right place to prevent transmission of UPDATE_VERSION events if
> - // receiver's
> - // version is < 7.0.1, especially to prevent another loop over events.
> - if (!sendUpdateVersionEvents
> - && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
> - if (isTraceEnabled) {
> - logger.trace(
> - "Update Event Version event: {} removed from Gateway Sender queue: {}", event,
> - sender);
> - }
> + Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
> + while (itr.hasNext()) {
> + GatewayQueueEvent event = itr.next();
> +
> + // This seems right place to prevent transmission of UPDATE_VERSION events if
> + // receiver's
> + // version is < 7.0.1, especially to prevent another loop over events.
> + if (!sendUpdateVersionEvents
> + && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
> + if (isDebugEnabled) {
> + logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}",
> + event, sender);
> + }
> - itr.remove();
> - statistics.incEventsNotQueued();
> - continue;
> + itr.remove();
> + statistics.incEventsNotQueued();
> + continue;
> + }
> +
> + if (((GatewaySenderEventImpl) event).isConcurrencyConflict()) {
> + if (isDebugEnabled) {
> + logger.debug(
> + "Event with concurrent modification conflict: {} will be removed from Gateway Sender queue: {}",
> + event, sender);
> }
> + itr.remove();
> + statistics.incEventsNotQueued();
> + continue;
> + }
> +
> + for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
> boolean transmit = filter.beforeTransmit(event);
> if (!transmit) {
> if (isDebugEnabled) {
> @@ -538,6 +549,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
> }
> itr.remove();
> statistics.incEventsFiltered();
> + break;
> }
> }
> }
> @@ -550,9 +562,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
> // AsyncEventQueue since possibleDuplicate flag is not used in WAN.
> if (this.getSender().isParallel()
> && (this.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
> - Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
> - while (itr.hasNext()) {
> - GatewaySenderEventImpl event = (GatewaySenderEventImpl) itr.next();
> + Iterator<GatewaySenderEventImpl> eventItr = filteredList.iterator();
> + while (eventItr.hasNext()) {
> + GatewaySenderEventImpl event = (GatewaySenderEventImpl) eventItr.next();
> PartitionedRegion qpr = null;
> if (this.getQueue() instanceof ConcurrentParallelGatewaySenderQueue) {
> qpr = ((ConcurrentParallelGatewaySenderQueue) this.getQueue())
> @@ -726,7 +738,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
> } // for
> }
> - private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) {
> + protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) {
> // onyly in case of remote dispatcher we send versioned events
> return false;
> }{noformat}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)