You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/11 21:06:54 UTC
[15/52] [abbrv] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
index 9c01e05..9b8302e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
@@ -12,18 +12,14 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
import org.apache.logging.log4j.Logger;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Command;
-import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
@@ -31,7 +27,6 @@ import org.apache.geode.internal.logging.LogService;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.TypeRegistry;
-
public class AddPdxType extends BaseCommand {
private static final Logger logger = LogService.getLogger();
@@ -61,7 +56,7 @@ public class AddPdxType extends BaseCommand {
// client side.
type.setTypeId(typeId);
try {
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
TypeRegistry registry = cache.getPdxRegistry();
registry.addRemoteType(typeId, type);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
index 84e5bd0..e63ac22 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java
@@ -26,9 +26,8 @@ import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.MemoryThresholds;
@@ -155,7 +154,7 @@ public class ExecuteFunction extends BaseCommand {
logger.debug("Executing Function on Server: " + servConn.toString() + "with context :"
+ context.toString());
}
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
HeapMemoryMonitor hmm =
((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical()
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
index 102e8e8..8fafd10 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java
@@ -26,9 +26,8 @@ import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.MemoryThresholds;
@@ -183,7 +182,7 @@ public class ExecuteFunction65 extends BaseCommand {
if (logger.isDebugEnabled()) {
logger.debug("Executing Function on Server: {} with context: {}", servConn, context);
}
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
HeapMemoryMonitor hmm =
((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical()
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
index f67dd81..d007777 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.LowMemoryException;
+import org.apache.geode.cache.client.internal.ConnectionImpl;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionException;
@@ -32,10 +33,10 @@ import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
@@ -96,7 +97,7 @@ public class ExecuteFunction66 extends BaseCommand {
boolean isReexecute = false;
boolean allMembers = false;
boolean ignoreFailedMembers = false;
- int functionTimeout = GemFireCacheImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
+ int functionTimeout = ConnectionImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
try {
byte[] bytes = msg.getPart(0).getSerializedForm();
functionState = bytes[0];
@@ -220,7 +221,7 @@ public class ExecuteFunction66 extends BaseCommand {
if (logger.isDebugEnabled()) {
logger.debug("Executing Function on Server: {} with context: {}", servConn, context);
}
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
HeapMemoryMonitor hmm =
((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical()
@@ -329,14 +330,14 @@ public class ExecuteFunction66 extends BaseCommand {
.toString(fn.getId()));
}
} else {
- /**
+ /*
* if dm is null it mean cache is also null. Transactional function without cache cannot be
* executed.
*/
final TXStateProxy txState = TXManagerImpl.getCurrentTXState();
Runnable functionExecution = new Runnable() {
public void run() {
- GemFireCacheImpl cache = null;
+ InternalCache cache = null;
try {
if (txState != null) {
cache = GemFireCacheImpl.getExisting("executing function");
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
index ade0aca..0ed7235 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunction66.java
@@ -19,6 +19,7 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.internal.ConnectionImpl;
import org.apache.geode.cache.client.internal.ExecuteFunctionHelper;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
@@ -28,7 +29,6 @@ import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
import org.apache.geode.cache.query.QueryInvocationTargetException;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.DistributedRegion;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.execute.AbstractExecution;
import org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutor;
@@ -78,7 +78,7 @@ public class ExecuteRegionFunction66 extends BaseCommand {
int filterSize = 0, partNumber = 0;
CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
byte functionState = 0;
- int functionTimeout = GemFireCacheImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
+ int functionTimeout = ConnectionImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
try {
byte[] bytes = msg.getPart(0).getSerializedForm();
functionState = bytes[0];
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java
index 792b1ff..8b2cf75 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteRegionFunctionSingleHop.java
@@ -20,13 +20,13 @@ import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.internal.ConnectionImpl;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionInvocationTargetException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.operations.ExecuteFunctionOperationContext;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.execute.AbstractExecution;
import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
@@ -77,7 +77,7 @@ public class ExecuteRegionFunctionSingleHop extends BaseCommand {
Set<Object> removedNodesSet = null;
int filterSize = 0, bucketIdsSize = 0, partNumber = 0;
CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- int functionTimeout = GemFireCacheImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
+ int functionTimeout = ConnectionImpl.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
try {
byte[] bytes = msg.getPart(0).getSerializedForm();
functionState = bytes[0];
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 8885a99..d44a4ad 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -12,9 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
@@ -31,12 +28,13 @@ import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.i18n.LogWriterI18n;
+import org.apache.geode.i18n.StringId;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.EventIDHolder;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
@@ -58,7 +56,6 @@ import org.apache.geode.pdx.internal.EnumId;
import org.apache.geode.pdx.internal.EnumInfo;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
-import org.apache.geode.i18n.StringId;
public class GatewayReceiverCommand extends BaseCommand {
@@ -71,8 +68,8 @@ public class GatewayReceiverCommand extends BaseCommand {
private GatewayReceiverCommand() {}
private void handleRegionNull(ServerConnection servConn, String regionName, int batchId) {
- GemFireCacheImpl gfc = (GemFireCacheImpl) servConn.getCachedRegionHelper().getCache();
- if (gfc != null && gfc.isCacheAtShutdownAll()) {
+ InternalCache cache = servConn.getCachedRegionHelper().getCache();
+ if (cache != null && cache.isCacheAtShutdownAll()) {
throw new CacheClosedException("Shutdown occurred during message processing");
} else {
String reason = LocalizedStrings.ProcessBatch_WAS_NOT_FOUND_DURING_BATCH_CREATE_REQUEST_0
@@ -808,12 +805,10 @@ public class GatewayReceiverCommand extends BaseCommand {
if (key instanceof EnumId) {
EnumId enumId = (EnumId) key;
value = BlobHelper.deserializeBlob((byte[]) value);
- ((GemFireCacheImpl) crHelper.getCache()).getPdxRegistry().addRemoteEnum(enumId.intValue(),
- (EnumInfo) value);
+ crHelper.getCache().getPdxRegistry().addRemoteEnum(enumId.intValue(), (EnumInfo) value);
} else {
value = BlobHelper.deserializeBlob((byte[]) value);
- ((GemFireCacheImpl) crHelper.getCache()).getPdxRegistry().addRemoteType((int) key,
- (PdxType) value);
+ crHelper.getCache().getPdxRegistry().addRemoteType((int) key, (PdxType) value);
}
return true;
}
@@ -867,7 +862,6 @@ public class GatewayReceiverCommand extends BaseCommand {
servConn.getName()), be);
}
}
-
}
private static void writeFatalException(Message origMsg, Throwable exception,
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java
index 72e375c..54a21ed 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java
@@ -12,14 +12,11 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -28,7 +25,6 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.pdx.internal.EnumInfo;
import org.apache.geode.pdx.internal.TypeRegistry;
-
public class GetPDXEnumById extends BaseCommand {
private final static GetPDXEnumById singleton = new GetPDXEnumById();
@@ -51,7 +47,7 @@ public class GetPDXEnumById extends BaseCommand {
EnumInfo result;
try {
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
TypeRegistry registry = cache.getPdxRegistry();
result = registry.getEnumInfoById(enumId);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java
index 25bfe3d..1b21383 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java
@@ -12,14 +12,11 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -28,7 +25,6 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.pdx.internal.EnumInfo;
import org.apache.geode.pdx.internal.TypeRegistry;
-
public class GetPDXIdForEnum extends BaseCommand {
private final static GetPDXIdForEnum singleton = new GetPDXIdForEnum();
@@ -52,7 +48,7 @@ public class GetPDXIdForEnum extends BaseCommand {
int enumId;
try {
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
TypeRegistry registry = cache.getPdxRegistry();
enumId = registry.defineEnum(enumInfo);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java
index 3c80c76..2054196 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java
@@ -12,14 +12,11 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -28,7 +25,6 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.TypeRegistry;
-
public class GetPDXIdForType extends BaseCommand {
private final static GetPDXIdForType singleton = new GetPDXIdForType();
@@ -53,7 +49,7 @@ public class GetPDXIdForType extends BaseCommand {
int pdxId;
try {
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
TypeRegistry registry = cache.getPdxRegistry();
pdxId = registry.defineType(type);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java
index 603d3d0..2470893 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java
@@ -12,14 +12,11 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -28,7 +25,6 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.TypeRegistry;
-
public class GetPDXTypeById extends BaseCommand {
private final static GetPDXTypeById singleton = new GetPDXTypeById();
@@ -51,7 +47,7 @@ public class GetPDXTypeById extends BaseCommand {
PdxType type;
try {
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
TypeRegistry registry = cache.getPdxRegistry();
type = registry.getType(pdxId);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java
index ca3d559..19551c4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java
@@ -17,7 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
import java.util.Map;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -46,7 +46,7 @@ public class GetPdxEnums70 extends BaseCommand {
Map<Integer, EnumInfo> enums;
try {
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
enums = cache.getPdxRegistry().enumMap();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java
index 8b73ed7..cc96b8e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java
@@ -17,7 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
import java.util.Map;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -46,7 +46,7 @@ public class GetPdxTypes70 extends BaseCommand {
Map<Integer, PdxType> types;
try {
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
types = cache.getPdxRegistry().typeMap();
} catch (Exception e) {
writeException(msg, e, false, servConn);
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java
index 54235c1..d3c0393 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java
@@ -12,9 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
@@ -29,7 +26,6 @@ import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.types.CollectionTypeImpl;
import org.apache.geode.cache.query.types.CollectionType;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.sockets.BaseCommandQuery;
import org.apache.geode.internal.cache.tier.sockets.Message;
@@ -65,7 +61,6 @@ public class Query extends BaseCommandQuery {
servConn.setRequestSpecificTimeout(timeout);
}
-
if (logger.isDebugEnabled()) {
logger.debug("{}: Received query request from {} queryString: {}", servConn.getName(),
servConn.getSocketString(), queryString);
@@ -73,7 +68,7 @@ public class Query extends BaseCommandQuery {
try {
// Create query
QueryService queryService =
- ((GemFireCacheImpl) servConn.getCachedRegionHelper().getCache()).getLocalQueryService();
+ servConn.getCachedRegionHelper().getCache().getLocalQueryService();
org.apache.geode.cache.query.Query query = queryService.newQuery(queryString);
Set regionNames = ((DefaultQuery) query).getRegionsInQuery(null);
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java
index 4e30039..5849431 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java
@@ -12,26 +12,26 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache.tier.sockets.command;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.geode.cache.operations.QueryOperationContext;
+import org.apache.geode.cache.query.QueryInvalidException;
+import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.types.CollectionTypeImpl;
import org.apache.geode.cache.query.types.CollectionType;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
-import org.apache.geode.internal.cache.tier.sockets.*;
+import org.apache.geode.internal.cache.tier.sockets.BaseCommandQuery;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.internal.security.AuthorizeRequest;
-import org.apache.geode.cache.operations.QueryOperationContext;
-import org.apache.geode.cache.query.QueryService;
-import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.cache.query.QueryInvalidException;
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
public class Query651 extends BaseCommandQuery {
@@ -92,7 +92,7 @@ public class Query651 extends BaseCommandQuery {
try {
// Create query
QueryService queryService =
- ((GemFireCacheImpl) servConn.getCachedRegionHelper().getCache()).getLocalQueryService();
+ servConn.getCachedRegionHelper().getCache().getLocalQueryService();
org.apache.geode.cache.query.Query query = null;
if (queryParams != null) {
@@ -138,6 +138,4 @@ public class Query651 extends BaseCommandQuery {
protected CollectionType getCollectionType(SelectResults selectResults) {
return new CollectionTypeImpl(List.class, selectResults.getCollectionType().getElementType());
}
-
-
}
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java
index a085353..3fd84d6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java
@@ -16,10 +16,8 @@ package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
-import org.apache.geode.LogWriter;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.ha.HAContainerWrapper;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
index e324a7f..72eab50 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
@@ -14,21 +14,26 @@
*/
package org.apache.geode.internal.cache.tier.sockets.command;
+import java.io.IOException;
+
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.WaitForViewInstallation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.FindRemoteTXMessage;
import org.apache.geode.internal.cache.FindRemoteTXMessage.FindRemoteTXMessageReplyProcessor;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PeerTXStateStub;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
-import java.io.IOException;
-
/**
* Used for bootstrapping txState/PeerTXStateStub on the server. This command is send when in client
* in a transaction is about to failover to this server
@@ -91,7 +96,7 @@ public class TXFailoverCommand extends BaseCommand {
// bug #42228 and bug #43504 - this cannot return until the current view
// has been installed by all members, so that dlocks are released and
// the same keys can be used in a new transaction by the same client thread
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
try {
WaitForViewInstallation.send((DistributionManager) cache.getDistributionManager());
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
index 8a1f8b1..ded789e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
@@ -14,6 +14,14 @@
*/
package org.apache.geode.internal.cache.tx;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.transaction.Status;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
import org.apache.geode.cache.TransactionException;
@@ -26,30 +34,22 @@ import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXLockRequest;
+import org.apache.geode.internal.cache.TXRegionLockRequestImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
+import org.apache.geode.internal.cache.TXStateStub;
import org.apache.geode.internal.cache.locks.TXRegionLockRequest;
import org.apache.geode.internal.cache.tx.TransactionalOperation.ServerRegionOperation;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
-import org.apache.logging.log4j.Logger;
-
-import javax.transaction.Status;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
public class ClientTXStateStub extends TXStateStub {
private static final Logger logger = LogService.getLogger();
- // /** a flag to turn off automatic replay of transactions. Maybe this should be a pool property?
- // */
- // private static final boolean ENABLE_REPLAY =
- // Boolean.getBoolean("gemfire.enable-transaction-replay");
- //
- // /** time to pause between transaction replays, in millis */
- // private static final int TRANSACTION_REPLAY_PAUSE =
- // Integer.getInteger("gemfire.transaction-replay-pause", 500).intValue();
-
/** test hook - used to find out what operations were performed in the last tx */
private static ThreadLocal<List<TransactionalOperation>> recordedTransactionalOperations = null;
@@ -91,8 +91,6 @@ public class ClientTXStateStub extends TXStateStub {
recordedTransactionalOperations = t;
}
-
-
public ClientTXStateStub(TXStateProxy stateProxy, DistributedMember target,
LocalRegion firstRegion) {
super(stateProxy, target);
@@ -124,7 +122,7 @@ public class ClientTXStateStub extends TXStateStub {
*/
private void obtainLocalLocks() {
lockReq = new TXLockRequest();
- GemFireCacheImpl cache = GemFireCacheImpl.getExisting("");
+ InternalCache cache = GemFireCacheImpl.getExisting("");
for (TransactionalOperation txOp : this.recordedOperations) {
if (ServerRegionOperation.lockKeyForTx(txOp.getOperation())) {
TXRegionLockRequest rlr = lockReq.getRegionLockRequest(txOp.getRegionName());
@@ -160,7 +158,7 @@ public class ClientTXStateStub extends TXStateStub {
this.internalAfterSendCommit.run();
}
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache == null) {
// fixes bug 42933
return;
@@ -177,7 +175,6 @@ public class ClientTXStateStub extends TXStateStub {
txcm.basicProcess();
}
-
@Override
protected TXRegionStub generateRegionStub(LocalRegion region) {
return new ClientTXRegionStub(region);
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java
index 1aed187..6df6eb9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java
@@ -30,16 +30,13 @@ import org.apache.geode.internal.cache.DistributedRemoveAllOperation.RemoveAllEn
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.DistributedPutAllOperation.EntryVersionsList;
import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllEntryData;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.offheap.annotations.Retained;
-/**
- *
- *
- */
public class DistTxEntryEvent extends EntryEventImpl {
protected static final byte HAS_PUTALL_OP = 0x1;
@@ -100,7 +97,7 @@ public class DistTxEntryEvent extends EntryEventImpl {
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
this.eventID = (EventID) DataSerializer.readObject(in);
String regionName = DataSerializer.readString(in);
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
this.region = (LocalRegion) cache.getRegion(regionName);
this.op = Operation.fromOrdinal(in.readByte());
Object key = DataSerializer.readObject(in);
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index ab3b3cf..fd128c3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -24,17 +24,12 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.geode.InternalGemFireError;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.execute.BucketMovedException;
-import org.apache.geode.internal.cache.ha.ThreadIdentifier;
-import org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
+import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
@@ -61,13 +56,16 @@ import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.CachePerfStats;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.HasCachePerfStats;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.execute.BucketMovedException;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator;
import org.apache.geode.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -292,7 +290,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
this.getSenderAdvisor().setIsPrimary(isPrimary);
}
- public Cache getCache() {
+ public InternalCache getCache() {
return this.cache;
}
@@ -520,7 +518,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
this.getLifeCycleLock().writeLock().lock();
// first, check if this sender is attached to any region. If so, throw
// GatewaySenderException
- Set<LocalRegion> regions = ((GemFireCacheImpl) this.cache).getApplicationRegions();
+ Set<LocalRegion> regions = this.cache.getApplicationRegions();
Iterator regionItr = regions.iterator();
while (regionItr.hasNext()) {
LocalRegion region = (LocalRegion) regionItr.next();
@@ -541,7 +539,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
}
// remove the sender from the cache
- ((GemFireCacheImpl) this.cache).removeGatewaySender(this);
+ this.cache.removeGatewaySender(this);
// destroy the region underneath the sender's queue
if (initiator) {
@@ -816,7 +814,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
return true;
}
-
public void distribute(EnumListenerEvent operation, EntryEventImpl event,
List<Integer> allRemoteDSIds) {
@@ -981,7 +978,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
}
}
-
/**
* During sender is getting started, if there are any cache operation on queue then that event
* will be stored in temp queue. Once sender is started, these event from tmp queue will be added
@@ -1100,8 +1096,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
boolean gotLock = false;
try {
// Obtain the distributed lock
- gotLock = ((GemFireCacheImpl) getCache()).getGatewaySenderLockService()
- .lock(META_DATA_REGION_NAME, -1, -1);
+ gotLock = getCache().getGatewaySenderLockService().lock(META_DATA_REGION_NAME, -1, -1);
if (!gotLock) {
throw new IllegalStateException(
LocalizedStrings.AbstractGatewaySender_FAILED_TO_LOCK_META_REGION_0
@@ -1143,7 +1138,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
} finally {
// Unlock the lock if necessary
if (gotLock) {
- ((GemFireCacheImpl) getCache()).getGatewaySenderLockService().unlock(META_DATA_REGION_NAME);
+ getCache().getGatewaySenderLockService().unlock(META_DATA_REGION_NAME);
if (isDebugEnabled) {
logger.debug("{}: Unlocked the metadata region", this);
}
@@ -1161,7 +1156,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
@SuppressWarnings({"rawtypes", "unchecked", "deprecation"})
private static synchronized Region<String, Integer> initializeEventIdIndexMetaDataRegion(
AbstractGatewaySender sender) {
- final Cache cache = sender.getCache();
+ final InternalCache cache = sender.getCache();
Region<String, Integer> region = cache.getRegion(META_DATA_REGION_NAME);
if (region == null) {
// Create region attributes (must be done this way to use InternalRegionArguments)
@@ -1183,7 +1178,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
// Create the region
try {
- region = ((GemFireCacheImpl) cache).createVMRegion(META_DATA_REGION_NAME, ra, ira);
+ region = cache.createVMRegion(META_DATA_REGION_NAME, ra, ira);
} catch (RegionExistsException e) {
region = cache.getRegion(META_DATA_REGION_NAME);
} catch (Exception e) {
@@ -1216,7 +1211,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
}
}
-
public int getTmpQueuedEventSize() {
if (tmpQueuedEvents != null) {
return tmpQueuedEvents.size();
@@ -1300,8 +1294,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
* allows us to defer creation of the GatewaySenderEventImpl until we are ready to actually
* enqueue it. The caller is responsible for giving us an EntryEventImpl that we own and that we
* will release. This is done by making a copy/clone of the original event. This fixes bug 52029.
- *
- *
*/
public static class TmpQueueEvent implements Releasable {
private final EnumListenerEvent operation;
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 263c446..1946945 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -14,15 +14,42 @@
*/
package org.apache.geode.internal.cache.wan;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.CancelException;
import org.apache.geode.GemFireException;
import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.*;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.Conflatable;
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue;
@@ -31,13 +58,6 @@ import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
/**
* EventProcessor responsible for peeking from queue and handling over the events to the dispatcher.
@@ -46,7 +66,6 @@ import java.util.concurrent.ConcurrentHashMap;
* GatewaySenderEventRemoteDispatcher or GatewaySenderEventCallbackDispatcher.
*
* @since GemFire 7.0
- *
*/
public abstract class AbstractGatewaySenderEventProcessor extends Thread {
@@ -391,7 +410,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
// list of filteredList + pdxEventsToBeDispatched events
List<GatewaySenderEventImpl> eventsToBeDispatched = new ArrayList<GatewaySenderEventImpl>();
-
for (;;) {
if (stopped()) {
break;
@@ -426,7 +444,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
// filtering VERSION_ACTION events from being sent.
boolean sendUpdateVersionEvents = shouldSendVersionEvents(this.dispatcher);
-
// sleep a little bit, look for events
boolean interrupted = Thread.interrupted();
try {
@@ -435,7 +452,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
resetLastPeekedEvents = false;
}
-
{
// Below code was added to consider the case of queue region is
// destroyed due to userPRs localdestroy or destroy operation.
@@ -761,7 +777,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
List<GatewaySenderEventImpl> pdxEventsToBeDispatched = new ArrayList<GatewaySenderEventImpl>();
// getPDXRegion
- GemFireCacheImpl cache = (GemFireCacheImpl) this.sender.getCache();
+ InternalCache cache = this.sender.getCache();
Region<Object, Object> pdxRegion = cache.getRegion(PeerTypeRegistration.REGION_NAME);
if (rebuildPdxList) {
@@ -782,7 +798,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
EntryEventImpl event = EntryEventImpl.create((LocalRegion) pdxRegion, Operation.UPDATE,
typeEntry.getKey(), typeEntry.getValue(), null, false, cache.getMyId());
event.disallowOffHeapValues();
- event.setEventId(new EventID(cache.getSystem()));
+ event.setEventId(new EventID(cache.getInternalDistributedSystem()));
List<Integer> allRemoteDSIds = new ArrayList<Integer>();
for (GatewaySender sender : cache.getGatewaySenders()) {
allRemoteDSIds.add(sender.getRemoteDSId());
@@ -805,7 +821,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
}
}
-
Iterator<GatewaySenderEventImpl> iterator = pdxSenderEventsList.iterator();
while (iterator.hasNext()) {
GatewaySenderEventImpl pdxEvent = iterator.next();
@@ -838,7 +853,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
* @param remotePdxSize
*/
public void checkIfPdxNeedsResend(int remotePdxSize) {
- GemFireCacheImpl cache = (GemFireCacheImpl) this.sender.getCache();
+ InternalCache cache = this.sender.getCache();
Region<Object, Object> pdxRegion = cache.getRegion(PeerTypeRegistration.REGION_NAME);
// The peer has not seen all of our PDX types. This may be because
@@ -976,7 +991,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
}
eventQueueRemove(events.size());
}
-
}
public void handleUnSuccessBatchAck(int bId) {
@@ -1014,7 +1028,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
}
}
-
public abstract void initializeEventDispatcher();
public GatewaySenderEventDispatcher getDispatcher() {
@@ -1248,11 +1261,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
((ParallelGatewaySenderQueue) this.queue).clear(pr, bucketId);
}
- /*
- * public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException { return
- * ((ParallelGatewaySenderQueue)this.queue).size(pr, bucketId); }
- */
-
public void notifyEventProcessorIfRequired(int bucketId) {
((ParallelGatewaySenderQueue) this.queue).notifyEventProcessorIfRequired();
}
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
index c831b26..9472792 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
@@ -28,8 +28,8 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.util.Gateway;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.DistributedLockService;
@@ -44,6 +44,7 @@ import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
@@ -148,12 +149,6 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
.toString(new Object[] {sp.Id, sp.manualStart, sender.isManualStart()}));
}
}
- /*
- * if(sp.dispatcherThreads != sender.getDispatcherThreads()) { throw new IllegalStateException(
- * LocalizedStrings.
- * GatewaySenderAdvisor_CANNOT_CREATE_GATEWAYSENDER_0_WITH_DISPATCHER_THREAD_1_BECAUSE_ANOTHER_CACHE_HAS_THE_SAME_SENDER_WITH_DISPATCHER_THREAD_2
- * .toString(new Object[] { sp.Id, sp.dispatcherThreads, sender.getDispatcherThreads() })); }
- */
if (!sp.isParallel) {
if (sp.orderPolicy != sender.getOrderPolicy()) {
@@ -232,9 +227,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
* if there are no other primary senders then this sender should volunteer for primary. 2. If this
* sender is primary and its policy is secondary then this sender should release the lock so that
* other primary sender which s waiting on lock will get the lock.
- *
*/
-
@Override
public void profileUpdated(Profile profile) {
if (profile instanceof GatewaySenderProfile) {
@@ -299,8 +292,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
}
public void initDLockService() {
- InternalDistributedSystem ds =
- ((GemFireCacheImpl) this.sender.getCache()).getInternalDistributedSystem();
+ InternalDistributedSystem ds = this.sender.getCache().getInternalDistributedSystem();
String dlsName = getDLockServiceName();
this.lockService = DistributedLockService.getServiceNamed(dlsName);
if (this.lockService == null) {
@@ -560,8 +552,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
this.isDiskSynchronous = in.readBoolean();
this.dispatcherThreads = in.readInt();
if (InternalDataSerializer.getVersionForDataStream(in).compareTo(Version.CURRENT) < 0) {
- org.apache.geode.cache.util.Gateway.OrderPolicy oldOrderPolicy =
- DataSerializer.readObject(in);
+ Gateway.OrderPolicy oldOrderPolicy = DataSerializer.readObject(in);
if (oldOrderPolicy != null) {
if (oldOrderPolicy.name().equals(OrderPolicy.KEY.name())) {
this.orderPolicy = OrderPolicy.KEY;
@@ -604,14 +595,12 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
if (InternalDataSerializer.getVersionForDataStream(out).compareTo(Version.CURRENT) < 0
&& this.orderPolicy != null) {
String orderPolicyName = this.orderPolicy.name();
- if (orderPolicyName.equals(org.apache.geode.cache.util.Gateway.OrderPolicy.KEY.name())) {
- DataSerializer.writeObject(org.apache.geode.cache.util.Gateway.OrderPolicy.KEY, out);
- } else if (orderPolicyName
- .equals(org.apache.geode.cache.util.Gateway.OrderPolicy.THREAD.name())) {
- DataSerializer.writeObject(org.apache.geode.cache.util.Gateway.OrderPolicy.THREAD, out);
+ if (orderPolicyName.equals(Gateway.OrderPolicy.KEY.name())) {
+ DataSerializer.writeObject(Gateway.OrderPolicy.KEY, out);
+ } else if (orderPolicyName.equals(Gateway.OrderPolicy.THREAD.name())) {
+ DataSerializer.writeObject(Gateway.OrderPolicy.THREAD, out);
} else {
- DataSerializer.writeObject(org.apache.geode.cache.util.Gateway.OrderPolicy.PARTITION,
- out);
+ DataSerializer.writeObject(Gateway.OrderPolicy.PARTITION, out);
}
} else {
DataSerializer.writeObject(orderPolicy, out);
@@ -699,10 +688,9 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
@Override
public void processIncoming(DistributionManager dm, String adviseePath, boolean removeProfile,
boolean exchangeProfiles, final List<Profile> replyProfiles) {
- Cache cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
- AbstractGatewaySender sender =
- (AbstractGatewaySender) ((GemFireCacheImpl) cache).getGatewaySender(adviseePath);
+ AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(adviseePath);
handleDistributionAdvisee(sender, removeProfile, exchangeProfiles, replyProfiles);
}
}
@@ -714,7 +702,6 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
sb.append("; remoteDSName=" + this.remoteDSId);
sb.append("; isRunning=" + this.isRunning);
sb.append("; isPrimary=" + this.isPrimary);
-
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index ffe7ae0..ed6df0b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -34,15 +34,12 @@ import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
-import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
-import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -58,8 +55,6 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
*
* The {@link ParallelGatewaySenderQueue} should be shared among all the
* {@link ParallelGatewaySenderEventProcessor}s.
- *
- *
*/
public class ConcurrentParallelGatewaySenderEventProcessor
extends AbstractGatewaySenderEventProcessor {
@@ -67,8 +62,9 @@ public class ConcurrentParallelGatewaySenderEventProcessor
protected static final Logger logger = LogService.getLogger();
protected ParallelGatewaySenderEventProcessor processors[];
- // private final List<ConcurrentParallelGatewaySenderQueue> concurrentParallelQueues;
+
private GemFireException ex = null;
+
final int nDispatcher;
public ConcurrentParallelGatewaySenderEventProcessor(AbstractGatewaySender sender) {
@@ -94,8 +90,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor
// gets the remaining
// bucket
Set<Region> targetRs = new HashSet<Region>();
- for (LocalRegion pr : ((GemFireCacheImpl) ((AbstractGatewaySender) sender).getCache())
- .getApplicationRegions()) {
+ for (LocalRegion pr : sender.getCache().getApplicationRegions()) {
if (pr.getAllGatewaySenderIds().contains(sender.getId())) {
targetRs.add(pr);
}
@@ -124,17 +119,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor
@Override
protected void initializeMessageQueue(String id) {
- /*
- * Set<Region> targetRs = new HashSet<Region>(); for (LocalRegion pr :
- * ((GemFireCacheImpl)((ParallelGatewaySenderImpl)sender) .getCache()).getApplicationRegions())
- * { if (pr.getAllGatewaySenderIds().contains(id)) { targetRs.add(pr); } }
- */
- // this.parallelQueue = new ParallelGatewaySenderQueue(this.sender, targetRs);
- /*
- * if (sender.getIsHDFSQueue()) this.parallelQueue = new
- * HDFSParallelGatewaySenderQueue(this.sender, targetRs); else this.parallelQueue = new
- * ParallelGatewaySenderQueue(this.sender, targetRs);
- */
+ // nothing
}
@Override
@@ -148,14 +133,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor
}
int pId = bucketId % this.nDispatcher;
this.processors[pId].enqueueEvent(operation, event, substituteValue);
-
- /*
- * if (getSender().beforeEnqueue(gatewayQueueEvent)) { long start =
- * getSender().getStatistics().startTime(); try { this.parallelQueue.put(gatewayQueueEvent); }
- * catch (InterruptedException e) { e.printStackTrace(); } finally { if (gatewayQueueEvent !=
- * null) { gatewayQueueEvent.release(); } getSender().getStatistics().endPut(start); } else {
- * getSender().getStatistics().incEventsFiltered(); }
- */
}
@Override
@@ -196,7 +173,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor
}
}
-
private void waitForRunningStatus() {
for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
synchronized (parallelProcessor.runningStateLock) {
@@ -218,7 +194,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor
}
}
-
@Override
public void stopProcessing() {
if (!this.isAlive()) {
@@ -299,7 +274,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor
for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
parallelProcessor.waitForDispatcherToPause();
}
- // super.waitForDispatcherToPause();
}
@Override
@@ -330,24 +304,12 @@ public class ConcurrentParallelGatewaySenderEventProcessor
}
return l;
}
- /*
- * public List<ConcurrentParallelGatewaySenderQueue> getConcurrentParallelQueues() { return
- * concurrentParallelQueues; }
- */
@Override
public RegionQueue getQueue() {
return this.queue;
}
- /*
- * public Set<PartitionedRegion> getRegions() { return
- * ((ParallelGatewaySenderQueue)(processors[0].getQueue())).getRegions(); }
- *
- * public int localSize() { return
- * ((ParallelGatewaySenderQueue)(processors[0].getQueue())).localSize(); }
- */
-
@Override
public GatewaySenderEventDispatcher getDispatcher() {
return this.processors[0].getDispatcher();// Suranjan is that fine??
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index faf7836..e74270f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -12,47 +12,32 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache.wan.parallel;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
-import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.ForceReattemptException;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.size.SingleObjectSizer;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-
-
-/**
- *
- */
public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor {
private static final Logger logger = LogService.getLogger();
@@ -80,7 +65,6 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
"Event Processor for GatewaySender_" + sender.getId() + "_" + id, sender);
this.index = id;
this.nDispatcher = nDispatcher;
- // this.queue = new ParallelGatewaySenderQueue(sender, userRegions, id, nDispatcher);
initializeMessageQueue(sender.getId());
setDaemon(true);
}
@@ -88,8 +72,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
@Override
protected void initializeMessageQueue(String id) {
Set<Region> targetRs = new HashSet<Region>();
- for (LocalRegion region : ((GemFireCacheImpl) ((AbstractGatewaySender) sender).getCache())
- .getApplicationRegions()) {
+ for (LocalRegion region : sender.getCache().getApplicationRegions()) {
if (region.getAllGatewaySenderIds().contains(id)) {
targetRs.add(region);
}
@@ -128,12 +111,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
return;
}
- // TODO : Kishor : Looks like for PDX region bucket id is set to -1.
- // int bucketId = -1;
- // if (!(region instanceof DistributedRegion && ((DistributedRegion)region)
- // .isPdxTypesRegion())) {
- // bucketId = PartitionedRegionHelper.getHashKey(event);
- // }
+ // TODO: Looks like for PDX region bucket id is set to -1.
boolean queuedEvent = false;
try {
EventID eventID = ((EntryEventImpl) event).getEventId();
@@ -143,7 +121,6 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
gatewayQueueEvent = new GatewaySenderEventImpl(operation, event, substituteValue, true,
eventID.getBucketID());
-
if (getSender().beforeEnqueue(gatewayQueueEvent)) {
long start = getSender().getStatistics().startTime();
try {
@@ -170,11 +147,6 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
((ParallelGatewaySenderQueue) this.queue).clear(pr, bucketId);
}
- /*
- * public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException { return
- * ((ParallelGatewaySenderQueue)this.queue).size(pr, bucketId); }
- */
-
public void notifyEventProcessorIfRequired(int bucketId) {
((ParallelGatewaySenderQueue) this.queue).notifyEventProcessorIfRequired();
}
@@ -196,19 +168,16 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
}
public void addShadowPartitionedRegionForUserPR(PartitionedRegion pr) {
- // TODO Auto-generated method stub
((ParallelGatewaySenderQueue) this.queue).addShadowPartitionedRegionForUserPR(pr);
}
public void addShadowPartitionedRegionForUserRR(DistributedRegion userRegion) {
- // TODO Auto-generated method stub
((ParallelGatewaySenderQueue) this.queue).addShadowPartitionedRegionForUserRR(userRegion);
}
@Override
protected void rebalance() {
// No operation for AsyncEventQueuerProcessor
-
}
@Override