You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@geode.apache.org by "xiaojian zhou (JIRA)" <ji...@apache.org> on 2018/02/13 18:01:00 UTC
[jira] [Created] (GEODE-4659) AbstractGatewaySenderEventProcessor
put loop of filter in wrong place
xiaojian zhou created GEODE-4659:
------------------------------------
Summary: AbstractGatewaySenderEventProcessor put loop of filter in wrong place
Key: GEODE-4659
URL: https://issues.apache.org/jira/browse/GEODE-4659
Project: Geode
Issue Type: New Feature
Components: wan
Reporter: xiaojian zhou
{noformat}
While fixing GEODE-3967, I found that the filter loop is in the wrong place.
The processing that ignores UPDATE_VERSION_STAMP events and events with CME should have nothing to do with filters. However, because that processing currently sits inside the loop over the filters, the code does not ignore UPDATE_VERSION_STAMP events and events with CME when no filter is defined.
However, once this problem is fixed, GEODE-3967 has more race conditions that need to be fixed (I have fixed several of them). It looks like this bug hid other race conditions and kept them from surfacing.
Given the time constraint, I will not fix the filter issue as part of GEODE-3967; I am logging this bug for future reference.
Here is the diff to fix this bug:
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
index 8739a8f72..a3a89fbd0 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
@@ -81,40 +81,8 @@ public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySe
* @param disp
* @return true if remote site Gemfire Version is >= 7.0.1
*/
- private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
- throws GatewaySenderException {
- try {
- GatewaySenderEventRemoteDispatcher remoteDispatcher =
- (GatewaySenderEventRemoteDispatcher) disp;
- // This will create a new connection if no batch has been sent till
- // now.
- Connection conn = remoteDispatcher.getConnection(false);
- if (conn != null) {
- short remoteSiteVersion = conn.getWanSiteVersion();
- if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
- return true;
- }
- }
- } catch (GatewaySenderException e) {
- Throwable cause = e.getCause();
- if (cause instanceof IOException || e instanceof GatewaySenderConfigurationException
- || cause instanceof ConnectionDestroyedException) {
- try {
- int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
- if (logger.isDebugEnabled()) {
- logger.debug("Sleeping for {} milliseconds", sleepInterval);
- }
- Thread.sleep(sleepInterval);
- } catch (InterruptedException ie) {
- // log the exception
- if (logger.isDebugEnabled()) {
- logger.debug(ie.getMessage(), ie);
- }
- }
- }
- throw e;
- }
- return false;
+ protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) {
+ return true;
}
}
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
index 69005e02b..da5d1baee 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
@@ -19,6 +19,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
import org.apache.geode.internal.logging.LogService;
@@ -44,4 +45,14 @@ public class RemoteSerialGatewaySenderEventProcessor extends SerialGatewaySender
}
}
+ /**
+ * Returns if corresponding receiver WAN site of this GatewaySender has GemfireVersion > 7.0.1
+ *
+ * @param disp
+ * @return true if remote site Gemfire Version is >= 7.0.1
+ */
+ protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp) {
+ return true;
+ }
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 7e67e9bfb..439394382 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -509,27 +509,38 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
}
// Filter the events
- for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
- Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
- while (itr.hasNext()) {
- GatewayQueueEvent event = itr.next();
-
- // This seems right place to prevent transmission of UPDATE_VERSION events if
- // receiver's
- // version is < 7.0.1, especially to prevent another loop over events.
- if (!sendUpdateVersionEvents
- && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
- if (isTraceEnabled) {
- logger.trace(
- "Update Event Version event: {} removed from Gateway Sender queue: {}", event,
- sender);
- }
+ Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
+ while (itr.hasNext()) {
+ GatewayQueueEvent event = itr.next();
+
+ // This seems right place to prevent transmission of UPDATE_VERSION events if
+ // receiver's
+ // version is < 7.0.1, especially to prevent another loop over events.
+ if (!sendUpdateVersionEvents
+ && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
+ if (isDebugEnabled) {
+ logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}",
+ event, sender);
+ }
- itr.remove();
- statistics.incEventsNotQueued();
- continue;
+ itr.remove();
+ statistics.incEventsNotQueued();
+ continue;
+ }
+
+ if (((GatewaySenderEventImpl) event).isConcurrencyConflict()) {
+ if (isDebugEnabled) {
+ logger.debug(
+ "Event with concurrent modification conflict: {} will be removed from Gateway Sender queue: {}",
+ event, sender);
}
+ itr.remove();
+ statistics.incEventsNotQueued();
+ continue;
+ }
+
+ for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
boolean transmit = filter.beforeTransmit(event);
if (!transmit) {
if (isDebugEnabled) {
@@ -538,6 +549,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
}
itr.remove();
statistics.incEventsFiltered();
+ break;
}
}
}
@@ -550,9 +562,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
// AsyncEventQueue since possibleDuplicate flag is not used in WAN.
if (this.getSender().isParallel()
&& (this.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
- Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
- while (itr.hasNext()) {
- GatewaySenderEventImpl event = (GatewaySenderEventImpl) itr.next();
+ Iterator<GatewaySenderEventImpl> eventItr = filteredList.iterator();
+ while (eventItr.hasNext()) {
+ GatewaySenderEventImpl event = (GatewaySenderEventImpl) eventItr.next();
PartitionedRegion qpr = null;
if (this.getQueue() instanceof ConcurrentParallelGatewaySenderQueue) {
qpr = ((ConcurrentParallelGatewaySenderQueue) this.getQueue())
@@ -726,7 +738,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
} // for
}
- private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) {
+ protected boolean shouldSendVersionEvents(GatewaySenderEventDispatcher dispatcher) {
// onyly in case of remote dispatcher we send versioned events
return false;
}{noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)