You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/08 23:15:54 UTC
[16/46] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java
index d2672df..f464e0d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java
@@ -14,21 +14,6 @@
*/
package org.apache.geode.internal.cache.persistence;
-import org.apache.commons.io.FileUtils;
-import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.persistence.PersistentID;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.distributed.internal.DM;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.MembershipListener;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.ClassPathLoader;
-import org.apache.geode.internal.DeployedJar;
-import org.apache.geode.internal.JarDeployer;
-import org.apache.geode.internal.cache.DiskStoreImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.i18n.LocalizedStrings;
-
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
@@ -44,10 +29,25 @@ import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
+
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.persistence.PersistentID;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ClassPathLoader;
+import org.apache.geode.internal.DeployedJar;
+import org.apache.geode.internal.JarDeployer;
+import org.apache.geode.internal.cache.DiskStoreImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.i18n.LocalizedStrings;
+
/**
* This class manages the state and logic to back up a single cache.
- *
- *
*/
public class BackupManager implements MembershipListener {
@@ -58,11 +58,11 @@ public class BackupManager implements MembershipListener {
public static final String USER_FILES = "user";
public static final String CONFIG = "config";
private InternalDistributedMember sender;
- private GemFireCacheImpl cache;
+ private InternalCache cache;
private CountDownLatch allowDestroys = new CountDownLatch(1);
private volatile boolean isCancelled = false;
- public BackupManager(InternalDistributedMember sender, GemFireCacheImpl gemFireCache) {
+ public BackupManager(InternalDistributedMember sender, InternalCache gemFireCache) {
this.sender = sender;
this.cache = gemFireCache;
}
@@ -81,9 +81,9 @@ public class BackupManager implements MembershipListener {
private void cleanup() {
isCancelled = true;
allowDestroys.countDown();
- Collection<DiskStoreImpl> diskStores = cache.listDiskStoresIncludingRegionOwned();
- for (DiskStoreImpl store : diskStores) {
- store.releaseBackupLock();
+ Collection<DiskStore> diskStores = cache.listDiskStoresIncludingRegionOwned();
+ for (DiskStore store : diskStores) {
+ ((DiskStoreImpl) store).releaseBackupLock();
}
final DM distributionManager = cache.getInternalDistributedSystem().getDistributionManager();
distributionManager.removeAllMembershipListener(this);
@@ -92,12 +92,13 @@ public class BackupManager implements MembershipListener {
public HashSet<PersistentID> prepareBackup() {
HashSet<PersistentID> persistentIds = new HashSet<PersistentID>();
- Collection<DiskStoreImpl> diskStores = cache.listDiskStoresIncludingRegionOwned();
- for (DiskStoreImpl store : diskStores) {
- store.lockStoreBeforeBackup();
- if (store.hasPersistedData()) {
- persistentIds.add(store.getPersistentID());
- store.getStats().startBackup();
+ Collection<DiskStore> diskStores = cache.listDiskStoresIncludingRegionOwned();
+ for (DiskStore store : diskStores) {
+ DiskStoreImpl storeImpl = (DiskStoreImpl) store;
+ storeImpl.lockStoreBeforeBackup();
+ if (storeImpl.hasPersistedData()) {
+ persistentIds.add(storeImpl.getPersistentID());
+ storeImpl.getStats().startBackup();
}
}
return persistentIds;
@@ -116,9 +117,10 @@ public class BackupManager implements MembershipListener {
/*
* Find the first matching DiskStoreId directory for this member.
*/
- for (DiskStoreImpl diskStore : cache.listDiskStoresIncludingRegionOwned()) {
+ for (DiskStore diskStore : cache.listDiskStoresIncludingRegionOwned()) {
File[] matchingFiles = baselineParentDir.listFiles(new FilenameFilter() {
- Pattern pattern = Pattern.compile(".*" + diskStore.getBackupDirName() + "$");
+ Pattern pattern =
+ Pattern.compile(".*" + ((DiskStoreImpl) diskStore).getBackupDirName() + "$");
public boolean accept(File dir, String name) {
Matcher m = pattern.matcher(name);
@@ -142,7 +144,6 @@ public class BackupManager implements MembershipListener {
* option. May be null if the user specified a full backup.
* @return null if the backup is to be a full backup otherwise return the data store directory in
* the previous backup for this member (if incremental).
- * @throws IOException
*/
private File checkBaseline(File baselineParentDir) throws IOException {
File baselineDir = null;
@@ -188,12 +189,12 @@ public class BackupManager implements MembershipListener {
File storesDir = new File(backupDir, DATA_STORES);
RestoreScript restoreScript = new RestoreScript();
HashSet<PersistentID> persistentIds = new HashSet<PersistentID>();
- Collection<DiskStoreImpl> diskStores =
- new ArrayList<DiskStoreImpl>(cache.listDiskStoresIncludingRegionOwned());
+ Collection<DiskStore> diskStores =
+ new ArrayList<DiskStore>(cache.listDiskStoresIncludingRegionOwned());
boolean foundPersistentData = false;
- for (Iterator<DiskStoreImpl> itr = diskStores.iterator(); itr.hasNext();) {
- DiskStoreImpl store = itr.next();
+ for (Iterator<DiskStore> itr = diskStores.iterator(); itr.hasNext();) {
+ DiskStoreImpl store = (DiskStoreImpl) itr.next();
if (store.hasPersistedData()) {
if (!foundPersistentData) {
createBackupDir(backupDir);
@@ -210,10 +211,11 @@ public class BackupManager implements MembershipListener {
allowDestroys.countDown();
- for (DiskStoreImpl store : diskStores) {
- store.finishBackup(this);
- store.getStats().endBackup();
- persistentIds.add(store.getPersistentID());
+ for (DiskStore store : diskStores) {
+ DiskStoreImpl storeImpl = (DiskStoreImpl) store;
+ storeImpl.finishBackup(this);
+ storeImpl.getStats().endBackup();
+ persistentIds.add(storeImpl.getPersistentID());
}
if (foundPersistentData) {
@@ -227,7 +229,6 @@ public class BackupManager implements MembershipListener {
}
}
-
return persistentIds;
} finally {
@@ -256,7 +257,7 @@ public class BackupManager implements MembershipListener {
FileUtils.copyFile(new File(DistributedSystem.getPropertiesFile()), propertyBackup);
}
- // TODO sbawaska: should the gfsecurity.properties file be backed up?
+ // TODO: should the gfsecurity.properties file be backed up?
}
private void backupUserFiles(RestoreScript restoreScript, File backupDir) throws IOException {
@@ -330,10 +331,7 @@ public class BackupManager implements MembershipListener {
cache.getInternalDistributedSystem().getDistributedMember();
String vmId = memberId.toString();
vmId = cleanSpecialCharacters(vmId);
- File backupDir = new File(targetDir, vmId);
-
-
- return backupDir;
+ return new File(targetDir, vmId);
}
private void createBackupDir(File backupDir) throws IOException {
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
index 5a3c002..c9aeaed 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
@@ -26,20 +26,19 @@ import org.apache.geode.cache.snapshot.CacheSnapshotService;
import org.apache.geode.cache.snapshot.RegionSnapshotService;
import org.apache.geode.cache.snapshot.SnapshotOptions;
import org.apache.geode.cache.snapshot.SnapshotOptions.SnapshotFormat;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.snapshot.GFSnapshot.GFSnapshotImporter;
import org.apache.geode.internal.i18n.LocalizedStrings;
/**
* Provides an implementation for cache snapshots. Most of the implementation delegates to
* {@link RegionSnapshotService}.
- *
*/
public class CacheSnapshotServiceImpl implements CacheSnapshotService {
/** the cache */
- private final GemFireCacheImpl cache;
+ private final InternalCache cache;
- public CacheSnapshotServiceImpl(GemFireCacheImpl cache) {
+ public CacheSnapshotServiceImpl(InternalCache cache) {
this.cache = cache;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/GFSnapshot.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/GFSnapshot.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/GFSnapshot.java
index 34ddd63..5ba8800 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/GFSnapshot.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/GFSnapshot.java
@@ -24,28 +24,24 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
-import java.util.TreeMap;
import org.apache.geode.DataSerializer;
-import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.snapshot.SnapshotIterator;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.snapshot.SnapshotPacket.SnapshotRecord;
import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.pdx.PdxSerializationException;
import org.apache.geode.pdx.internal.EnumInfo;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.TypeRegistry;
/**
* Provides support for reading and writing snapshot files.
- *
*/
public class GFSnapshot {
/**
@@ -260,7 +256,7 @@ public class GFSnapshot {
// write pdx types
try {
- GemFireCacheImpl cache = GemFireCacheImpl
+ InternalCache cache = GemFireCacheImpl
.getForPdx("PDX registry is unavailable because the Cache has been closed.");
new ExportedRegistry(cache.getPdxRegistry()).toData(dos);
} catch (CacheClosedException e) {
@@ -400,7 +396,7 @@ public class GFSnapshot {
}
private TypeRegistry getRegistry() {
- GemFireCacheImpl gfc = GemFireCacheImpl.getInstance();
+ InternalCache gfc = GemFireCacheImpl.getInstance();
if (gfc != null) {
return gfc.getPdxRegistry();
}
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java
index 0a182e6..a82a804 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java
@@ -16,7 +16,6 @@ package org.apache.geode.internal.cache.tier;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.InternalCache;
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
index aeabc86..7b291da 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.logging.log4j.Logger;
@@ -41,7 +42,6 @@ import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.CacheServerImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -291,7 +291,7 @@ public final class InternalClientMembership {
public static Map getClientQueueSizes() {
Map clientQueueSizes = new HashMap();
- GemFireCacheImpl c = (GemFireCacheImpl) CacheFactory.getAnyInstance();
+ InternalCache c = (InternalCache) CacheFactory.getAnyInstance();
if (c == null) // Add a NULL Check
return clientQueueSizes;
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index ed29472..9114367 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -67,12 +67,15 @@ import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.wan.GatewayTransportFilter;
-import org.apache.geode.distributed.internal.*;
-import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.LonerDistributionManager;
+import org.apache.geode.distributed.internal.PooledExecutorWithDMStats;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.cache.BucketAdvisor;
import org.apache.geode.internal.cache.BucketAdvisor.BucketProfile;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.partitioned.AllBucketProfilesUpdateMessage;
@@ -83,10 +86,11 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.IntegratedSecurityService;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.tcp.ConnectionTable;
import org.apache.geode.internal.util.ArrayUtils;
@@ -375,9 +379,9 @@ public class AcceptorImpl extends Acceptor implements Runnable {
.getSocketCreatorForComponent(SecurableCommunicationChannel.GATEWAY);
}
- final GemFireCacheImpl gc;
+ final InternalCache gc;
if (getCachedRegionHelper() != null) {
- gc = (GemFireCacheImpl) getCachedRegionHelper().getCache();
+ gc = getCachedRegionHelper().getCache();
} else {
gc = null;
}
@@ -655,8 +659,7 @@ public class AcceptorImpl extends Acceptor implements Runnable {
+ this.localPort + " local port: " + this.serverSock.getLocalPort());
this.selectorThread.start();
}
- GemFireCacheImpl myCache = (GemFireCacheImpl) cache;
- Set<PartitionedRegion> prs = myCache.getPartitionedRegions();
+ Set<PartitionedRegion> prs = this.cache.getPartitionedRegions();
for (PartitionedRegion pr : prs) {
Map<Integer, BucketAdvisor.BucketProfile> profiles =
new HashMap<Integer, BucketAdvisor.BucketProfile>();
@@ -957,7 +960,7 @@ public class AcceptorImpl extends Acceptor implements Runnable {
{
SystemFailure.checkFailure();
// this.cache.getDistributedSystem().getCancelCriterion().checkCancelInProgress(null);
- if (((GemFireCacheImpl) this.cache).isClosed()) { // bug 38834
+ if (this.cache.isClosed()) { // bug 38834
break; // TODO should just ask cache's CancelCriterion
}
if (this.cache.getCancelCriterion().isCancelInProgress()) {
@@ -1559,9 +1562,8 @@ public class AcceptorImpl extends Acceptor implements Runnable {
}
private void notifyCacheMembersOfClose() {
- GemFireCacheImpl myCache = (GemFireCacheImpl) cache;
- if (!myCache.forcedDisconnect()) {
- for (PartitionedRegion pr : myCache.getPartitionedRegions()) {
+ if (!this.cache.forcedDisconnect()) {
+ for (PartitionedRegion pr : this.cache.getPartitionedRegions()) {
Map<Integer, BucketAdvisor.BucketProfile> profiles = new HashMap<>();
// get all local real bucket advisors
Map<Integer, BucketAdvisor> advisors = pr.getRegionAdvisor().getAllBucketAdvisors();
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index d217672..58ba4b3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -12,13 +12,39 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache.tier.sockets;
-import org.apache.geode.*;
-import org.apache.geode.cache.*;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.regex.Pattern;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.CopyException;
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.SerializationException;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.query.types.CollectionType;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
@@ -27,8 +53,21 @@ import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.CachedDeserializable;
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.FindVersionTagOperation;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.LocalRegion.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.cache.VersionTagHolder;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.InterestType;
@@ -44,13 +83,6 @@ import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.sequencelog.EntryLogger;
import org.apache.geode.security.GemFireSecurityException;
-import org.apache.logging.log4j.Logger;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.Semaphore;
-import java.util.regex.Pattern;
-
public abstract class BaseCommand implements Command {
protected static final Logger logger = LogService.getLogger();
@@ -125,7 +157,7 @@ public abstract class BaseCommand implements Command {
boolean shouldMasquerade = shouldMasqueradeForTx(msg, servConn);
try {
if (shouldMasquerade) {
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
InternalDistributedMember member =
(InternalDistributedMember) servConn.getProxyID().getDistributedMember();
TXManagerImpl txMgr = cache.getTxManager();
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 28d6ae2..e79bfbd 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.distributed.ConfigurationProperties.*;
@@ -51,7 +50,6 @@ import org.apache.geode.DataSerializer;
import org.apache.geode.Instantiator;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.StatisticsFactory;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.InterestRegistrationEvent;
@@ -79,6 +77,7 @@ import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.internal.ClassLoadUtil;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.statistics.DummyStatisticsFactory;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.InternalInstantiator;
@@ -116,7 +115,6 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.geode.internal.net.SocketCloser;
import org.apache.geode.security.AccessControl;
import org.apache.geode.security.AuthenticationFailedException;
import org.apache.geode.security.AuthenticationRequiredException;
@@ -125,7 +123,6 @@ import org.apache.geode.security.AuthenticationRequiredException;
* Class <code>CacheClientNotifier</code> works on the server and manages client socket connections
* to clients requesting notification of updates and notifies them when updates occur.
*
- *
* @since GemFire 3.2
*/
@SuppressWarnings({"synthetic-access", "deprecation"})
@@ -137,15 +134,10 @@ public class CacheClientNotifier {
/**
* Factory method to construct a <code>CacheClientNotifier</code> instance.
*
- * @param cache The GemFire <code>Cache</code>
- * @param acceptorStats
- * @param maximumMessageCount
- * @param messageTimeToLive
- * @param listener
- * @param overflowAttributesList
+ * @param cache The GemFire <code>InternalCache</code>
* @return A <code>CacheClientNotifier</code> instance
*/
- public static synchronized CacheClientNotifier getInstance(Cache cache,
+ public static synchronized CacheClientNotifier getInstance(InternalCache cache,
CacheServerStats acceptorStats, int maximumMessageCount, int messageTimeToLive,
ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) {
if (ccnSingleton == null) {
@@ -158,13 +150,6 @@ public class CacheClientNotifier {
// In this case, the HaContainer should be lazily created here
ccnSingleton.initHaContainer(overflowAttributesList);
}
- // else {
- // ccnSingleton.acceptorStats = acceptorStats;
- // ccnSingleton.maximumMessageCount = maximumMessageCount;
- // ccnSingleton.messageTimeToLive = messageTimeToLive;
- // ccnSingleton._connectionListener = listener;
- // ccnSingleton.setCache((GemFireCache)cache);
- // }
return ccnSingleton;
}
@@ -178,8 +163,6 @@ public class CacheClientNotifier {
* @param dos the <code>DataOutputStream</code> to use for writing the message
* @param type a byte representing the message type
* @param p_msg the message to be written; can be null
- * @param clientVersion
- *
*/
private void writeMessage(DataOutputStream dos, byte type, String p_msg, Version clientVersion)
throws IOException {
@@ -248,35 +231,12 @@ public class CacheClientNotifier {
* @param dos the <code>DataOutputStream</code> to use for writing the message
* @param type a byte representing the exception type
* @param ex the exception to be written; should not be null
- * @param clientVersion
- *
*/
private void writeException(DataOutputStream dos, byte type, Exception ex, Version clientVersion)
throws IOException {
-
writeMessage(dos, type, ex.toString(), clientVersion);
}
- // /**
- // * Factory method to return the singleton <code>CacheClientNotifier</code>
- // * instance.
- // * @return the singleton <code>CacheClientNotifier</code> instance
- // */
- // public static CacheClientNotifier getInstance()
- // {
- // return _instance;
- // }
-
- // /**
- // * Shuts down the singleton <code>CacheClientNotifier</code> instance.
- // */
- // public static void shutdownInstance()
- // {
- // if (_instance == null) return;
- // _instance.shutdown();
- // _instance = null;
- // }
-
/**
* Registers a new client updater that wants to receive updates with this server.
*
@@ -355,12 +315,10 @@ public class CacheClientNotifier {
DistributedSystem system = this.getCache().getDistributedSystem();
Properties sysProps = system.getProperties();
String authenticator = sysProps.getProperty(SECURITY_CLIENT_AUTHENTICATOR);
- // TODO;hitesh for conflation
+
if (clientVersion.compareTo(Version.GFE_603) >= 0) {
byte[] overrides = HandShake.extractOverrides(new byte[] {(byte) dis.read()});
-
clientConflation = overrides[0];
-
} else {
clientConflation = (byte) dis.read();
}
@@ -379,7 +337,6 @@ public class CacheClientNotifier {
proxy = registerClient(socket, proxyID, proxy, isPrimary, clientConflation, clientVersion,
acceptorId, notifyBySubscription);
- // TODO:hitesh
Properties credentials = HandShake.readCredentials(dis, dos, system);
if (credentials != null && proxy != null) {
if (securityLogWriter.fineEnabled()) {
@@ -445,11 +402,9 @@ public class CacheClientNotifier {
return;
}
-
this._statistics.endClientRegistration(startTime);
}
-
/**
* Registers a new client that wants to receive updates with this server.
*
@@ -678,7 +633,6 @@ public class CacheClientNotifier {
/**
* Makes Primary to this CacheClientProxy and start the dispatcher of the CacheClientProxy
*
- * @param proxyId
* @param isClientReady Whether the marker has already been processed. This value helps determine
* whether to start the dispatcher.
*/
@@ -695,9 +649,6 @@ public class CacheClientNotifier {
* Then, start or resume the dispatcher. Otherwise, let the clientReady message start the
* dispatcher. See CacheClientProxy.startOrResumeMessageDispatcher if
* (!proxy._messageDispatcher.isAlive()) {
- *
- * proxy._messageDispatcher._messageQueue.setPrimary(true); proxy._messageDispatcher.start();
- * }
*/
if (isClientReady || !proxy.isDurable()) {
if (logger.isDebugEnabled()) {
@@ -713,8 +664,6 @@ public class CacheClientNotifier {
/**
* Adds or updates entry in the dispatched message map when client sends an ack.
*
- * @param proxyId
- * @param eid
* @return success
*/
public boolean processDispatchedMessage(ClientProxyMembershipID proxyId, EventID eid) {
@@ -754,8 +703,6 @@ public class CacheClientNotifier {
* Unregisters an existing client from this server.
*
* @param memberId Uniquely identifies the client
- *
- *
*/
public void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
if (logger.isDebugEnabled()) {
@@ -781,13 +728,11 @@ public class CacheClientNotifier {
/**
* The client represented by the proxyId is ready to receive updates.
- *
- * @param proxyId
*/
public void readyForEvents(ClientProxyMembershipID proxyId) {
CacheClientProxy proxy = getClientProxy(proxyId);
if (proxy == null) {
- // @todo log a message
+ // TODO: log a message
} else {
// False signifies that a marker message has not already been processed.
// Generate and send one.
@@ -817,7 +762,6 @@ public class CacheClientNotifier {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
instance.singletonNotifyClients(event, null);
-
}
}
@@ -829,7 +773,6 @@ public class CacheClientNotifier {
CacheClientNotifier instance = ccnSingleton;
if (instance != null) {
instance.singletonNotifyClients(event, cmsg);
-
}
}
@@ -839,10 +782,6 @@ public class CacheClientNotifier {
FilterInfo filterInfo = event.getLocalFilterInfo();
- // if (_logger.fineEnabled()) {
- // _logger.fine("Client dispatcher processing event " + event);
- // }
-
FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile();
if (filterInfo != null) {
// if the routing was made using an old profile we need to recompute it
@@ -905,7 +844,7 @@ public class CacheClientNotifier {
}
if (!ids.isEmpty()) {
if (isTraceEnabled) {
- logger.trace("adding invalidation routing to message for {}" + ids);
+ logger.trace("adding invalidation routing to message for {}", ids);
}
clientMessage.addClientInterestList(ids, false);
filterClients.addAll(ids);
@@ -964,10 +903,8 @@ public class CacheClientNotifier {
if (filterInfo.filterProcessedLocally) {
removeDestroyTokensFromCqResultKeys(event, filterInfo);
}
-
}
-
private void removeDestroyTokensFromCqResultKeys(InternalCacheEvent event,
FilterInfo filterInfo) {
FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile();
@@ -986,12 +923,9 @@ public class CacheClientNotifier {
}
}
-
/**
* delivers the given message to all proxies for routing. The message should already have client
* interest established, or override the isClientInterested method to implement its own routing
- *
- * @param clientMessage
*/
public static void routeClientMessage(Conflatable clientMessage) {
CacheClientNotifier instance = ccnSingleton;
@@ -1014,7 +948,7 @@ public class CacheClientNotifier {
}
}
- /*
+ /**
* this is for server side registration of client queue
*/
public static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
@@ -1054,7 +988,6 @@ public class CacheClientNotifier {
if (deadProxies != null) {
closeDeadProxies(deadProxies, false);
}
-
}
/**
@@ -1146,7 +1079,6 @@ public class CacheClientNotifier {
* @param operation The operation that occurred (e.g. AFTER_CREATE)
* @param event The event containing the data to be updated
* @return a <code>ClientUpdateMessage</code>
- * @throws Exception
*/
private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, CacheEvent event)
throws Exception {
@@ -1219,87 +1151,6 @@ public class CacheClientNotifier {
|| operation == EnumListenerEvent.AFTER_REGION_INVALIDATE;
}
- // /**
- // * Queues the <code>ClientUpdateMessage</code> to be distributed
- // * to interested clients. This method is not being used currently.
- // * @param clientMessage The <code>ClientUpdateMessage</code> to be queued
- // */
- // protected void notifyClients(final ClientUpdateMessage clientMessage)
- // {
- // if (USE_SYNCHRONOUS_NOTIFICATION)
- // {
- // // Execute the method in the same thread as the caller
- // deliver(clientMessage);
- // }
- // else {
- // // Obtain an Executor and use it to execute the method in its own thread
- // try
- // {
- // getExecutor().execute(new Runnable()
- // {
- // public void run()
- // {
- // deliver(clientMessage);
- // }
- // }
- // );
- // } catch (InterruptedException e)
- // {
- // _logger.warning("CacheClientNotifier: notifyClients interrupted", e);
- // Thread.currentThread().interrupt();
- // }
- // }
- // }
-
- // /**
- // * Updates the information this <code>CacheClientNotifier</code> maintains
- // * for a given edge client. It is invoked when a edge client re-connects to
- // * the server.
- // *
- // * @param clientHost
- // * The host on which the client runs (i.e. the host the
- // * CacheClientNotifier uses to communicate with the
- // * CacheClientUpdater) This is used with the clientPort to uniquely
- // * identify the client
- // * @param clientPort
- // * The port through which the server communicates with the client
- // * (i.e. the port the CacheClientNotifier uses to communicate with
- // * the CacheClientUpdater) This is used with the clientHost to
- // * uniquely identify the client
- // * @param remotePort
- // * The port through which the client communicates with the server
- // * (i.e. the new port the ConnectionImpl uses to communicate with the
- // * ServerConnection)
- // * @param membershipID
- // * Uniquely idenifies the client
- // */
- // public void registerClientPort(String clientHost, int clientPort,
- // int remotePort, ClientProxyMembershipID membershipID)
- // {
- // if (_logger.fineEnabled())
- // _logger.fine("CacheClientNotifier: Registering client port: "
- // + clientHost + ":" + clientPort + " with remote port " + remotePort
- // + " and ID " + membershipID);
- // for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
- // CacheClientProxy proxy = (CacheClientProxy)i.next();
- // if (_logger.finerEnabled())
- // _logger.finer("CacheClientNotifier: Potential client: " + proxy);
- // //if (proxy.representsCacheClientUpdater(clientHost, clientPort))
- // if (proxy.isMember(membershipID)) {
- // if (_logger.finerEnabled())
- // _logger
- // .finer("CacheClientNotifier: Updating remotePorts since host and port are a match");
- // proxy.addPort(remotePort);
- // }
- // else {
- // if (_logger.finerEnabled())
- // _logger.finer("CacheClientNotifier: Host and port "
- // + proxy.getRemoteHostAddress() + ":" + proxy.getRemotePort()
- // + " do not match " + clientHost + ":" + clientPort);
- // }
- // }
- // }
-
/**
* Registers client interest in the input region and key.
*
@@ -1350,23 +1201,9 @@ public class CacheClientNotifier {
}
}
- /*
- * protected void addFilterRegisteredClients(String regionName, ClientProxyMembershipID
- * membershipID) throws RegionNotFoundException { // Update Regions book keeping. LocalRegion
- * region = (LocalRegion)this._cache.getRegion(regionName); if (region == null) { //throw new
- * AssertionError("Could not find region named '" + regionName + "'"); // @todo: see bug 36805 //
- * fix for bug 37979 if (_logger.fineEnabled()) { _logger .fine("CacheClientNotifier: Client " +
- * membershipID + " :Throwing RegionDestroyedException as region: " + regionName +
- * " is not present."); } throw new RegionDestroyedException("registerInterest failed",
- * regionName); } else { region.getFilterProfile().addFilterRegisteredClients(this, membershipID);
- * } }
- */
-
/**
* Store region and delta relation
*
- * @param regionsWithEmptyDataPolicy
- * @param regionName
* @param regionDataPolicy (0==empty)
* @since GemFire 6.1
*/
@@ -1457,13 +1294,11 @@ public class CacheClientNotifier {
}
}
-
/**
* If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present
* in the haContainer, set the reference to the clientUpdateMessage to null and putInProgress flag
* to false. Also, if the ref count is zero, then remove the entry from the haContainer.
*
- * @param conflatable
* @since GemFire 5.7
*/
private void checkAndRemoveFromClientMsgsRegion(Conflatable conflatable) {
@@ -1484,9 +1319,6 @@ public class CacheClientNotifier {
}
}
}
- // else {
- // This is a replay-of-event case.
- // }
} else {
// This wrapper resides in haContainer.
wrapper.setClientUpdateMessage(null);
@@ -1525,7 +1357,6 @@ public class CacheClientNotifier {
return proxy;
}
-
/**
* Returns the <code>CacheClientProxy</code> associated to the durableClientId
*
@@ -1595,10 +1426,6 @@ public class CacheClientNotifier {
membershipID);
logger.debug("{}::getClientProxySameDS(), Number of proxies in the Cache Clinet Notifier: {}",
this, getClientProxies().size());
- /*
- * _logger.fine(this + "::getClientProxySameDS(), Proxies in the Cache Clinet Notifier: " +
- * getClientProxies());
- */
}
CacheClientProxy proxy = null;
for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
@@ -1618,7 +1445,6 @@ public class CacheClientNotifier {
return proxy;
}
-
/**
* It will remove the clients connected to the passed acceptorId. If its the only server, shuts
* down this instance.
@@ -1704,7 +1530,6 @@ public class CacheClientNotifier {
}
}
this.timedOutDurableClientProxies.remove(proxy.getProxyID());
-
}
protected void addClientInitProxy(CacheClientProxy proxy) throws IOException {
@@ -1719,7 +1544,6 @@ public class CacheClientNotifier {
return this._initClientProxies.containsKey(proxy.getProxyID());
}
-
/**
* Returns (possibly stale) set of memberIds for all clients being actively notified by this
* server.
@@ -1781,7 +1605,6 @@ public class CacheClientNotifier {
* @since GemFire 5.6
*/
public boolean hasPrimaryForDurableClient(String durableId) {
-
for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
CacheClientProxy proxy = (CacheClientProxy) iter.next();
ClientProxyMembershipID proxyID = proxy.getProxyID();
@@ -1828,7 +1651,6 @@ public class CacheClientNotifier {
return false;
}
-
/**
* Removes an existing <code>CacheClientProxy</code> from the list of known client proxies
*
@@ -1840,14 +1662,13 @@ public class CacheClientNotifier {
ClientProxyMembershipID client = proxy.getProxyID();
this._clientProxies.remove(client);
this._connectionListener.queueRemoved();
- ((GemFireCacheImpl) this.getCache()).cleanupForClient(this, client);
+ this.getCache().cleanupForClient(this, client);
if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
if (chm != null) {
chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal());
}
}
-
}
void durableClientTimedOut(ClientProxyMembershipID client) {
@@ -1868,17 +1689,6 @@ public class CacheClientNotifier {
return Collections.unmodifiableCollection(this._clientProxies.values());
}
- // /**
- // * Returns the <code>Executor</code> that delivers messages to the
- // * <code>CacheClientProxy</code> instances.
- // * @return the <code>Executor</code> that delivers messages to the
- // * <code>CacheClientProxy</code> instances
- // */
- // protected Executor getExecutor()
- // {
- // return _executor;
- // }
-
private void closeAllClientCqs(CacheClientProxy proxy) {
CqService cqService = proxy.getCache().getCqService();
if (cqService != null) {
@@ -1901,7 +1711,6 @@ public class CacheClientNotifier {
/**
* Shuts down durable client proxy
- *
*/
public boolean closeDurableClientProxy(String durableClientId) throws CacheException {
CacheClientProxy ccp = getClientProxy(durableClientId);
@@ -1917,6 +1726,7 @@ public class CacheClientNotifier {
if (logger.isDebugEnabled()) {
logger.debug("Cannot close running durable client: {}", durableClientId);
}
+ // TODO: never throw an anonymous inner class
throw new CacheException("Cannot close a running durable client : " + durableClientId) {};
}
}
@@ -1960,7 +1770,6 @@ public class CacheClientNotifier {
} // for
}
-
/**
* Registers a new <code>InterestRegistrationListener</code> with the set of
* <code>InterestRegistrationListener</code>s.
@@ -2031,13 +1840,13 @@ public class CacheClientNotifier {
}
/**
- * Returns this <code>CacheClientNotifier</code>'s <code>Cache</code>.
+ * Returns this <code>CacheClientNotifier</code>'s <code>InternalCache</code>.
*
- * @return this <code>CacheClientNotifier</code>'s <code>Cache</code>
+ * @return this <code>CacheClientNotifier</code>'s <code>InternalCache</code>
*/
- protected Cache getCache() { // TODO:SYNC: looks wrong
+ protected InternalCache getCache() { // TODO:SYNC: looks wrong
if (this._cache != null && this._cache.isClosed()) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
this._cache = cache;
this.logWriter = cache.getInternalLogWriter();
@@ -2068,25 +1877,18 @@ public class CacheClientNotifier {
protected void handleInterestEvent(InterestRegistrationEvent event) {
LocalRegion region = (LocalRegion) event.getRegion();
region.handleInterestEvent(event);
-
}
/**
- * Constructor.
- *
- * @param cache The GemFire <code>Cache</code>
- * @param acceptorStats
- * @param maximumMessageCount
- * @param messageTimeToLive
+ * @param cache The GemFire <code>InternalCache</code>
* @param listener a listener which should receive notifications abouts queues being added or
* removed.
- * @param overflowAttributesList
*/
- private CacheClientNotifier(Cache cache, CacheServerStats acceptorStats, int maximumMessageCount,
- int messageTimeToLive, ConnectionListener listener, List overflowAttributesList,
- boolean isGatewayReceiver) {
+ private CacheClientNotifier(InternalCache cache, CacheServerStats acceptorStats,
+ int maximumMessageCount, int messageTimeToLive, ConnectionListener listener,
+ List overflowAttributesList, boolean isGatewayReceiver) {
// Set the Cache
- this.setCache((GemFireCacheImpl) cache);
+ setCache(cache);
this.acceptorStats = acceptorStats;
this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms
// for close
@@ -2111,9 +1913,6 @@ public class CacheClientNotifier {
}
this._statistics = new CacheClientNotifierStats(factory);
- // Initialize the executors
- // initializeExecutors(this._logger);
-
try {
this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
if (this.logFrequency <= 0) {
@@ -2167,13 +1966,6 @@ public class CacheClientNotifier {
}
}
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.distributed.internal.DistributionMessage#process(org.apache.geode.
- * distributed.internal.DistributionManager)
- */
@Override
protected void process(DistributionManager dm) {
// Get the proxy for the proxy id
@@ -2199,11 +1991,6 @@ public class CacheClientNotifier {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.DataSerializableFixedID#getDSFID()
- */
public int getDSFID() {
return SERVER_INTEREST_REGISTRATION_MESSAGE;
}
@@ -2225,107 +2012,8 @@ public class CacheClientNotifier {
this.clientMessage = new ClientInterestMessageImpl();
InternalDataSerializer.invokeFromData(this.clientMessage, in);
}
-
}
-
- // * Initializes the <code>QueuedExecutor</code> and
- // <code>PooledExecutor</code>
- // * used to deliver messages to <code>CacheClientProxy</code> instances.
- // * @param logger The GemFire <code>LogWriterI18n</code>
- // */
- // private void initializeExecutors(LogWriterI18n logger)
- // {
- // // Create the thread groups
- // final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup("Cache
- // Client Notifier Logger Group", logger);
- // final ThreadGroup notifierGroup =
- // new ThreadGroup("Cache Client Notifier Group")
- // {
- // public void uncaughtException(Thread t, Throwable e)
- // {
- // Thread.dumpStack();
- // loggerGroup.uncaughtException(t, e);
- // //CacheClientNotifier.exceptionInThreads = true;
- // }
- // };
- //
- // // Originally set ThreadGroup to be a daemon, but it was causing the
- // following
- // // exception after five minutes of non-activity (the keep alive time of the
- // // threads in the PooledExecutor.
- //
- // // java.lang.IllegalThreadStateException
- // // at java.lang.ThreadGroup.add(Unknown Source)
- // // at java.lang.Thread.init(Unknown Source)
- // // at java.lang.Thread.<init>(Unknown Source)
- // // at
- // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier$4.newThread(CacheClientNotifier.java:321)
- // // at
- // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.addThread(PooledExecutor.java:512)
- // // at
- // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.execute(PooledExecutor.java:888)
- // // at
- // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.notifyClients(CacheClientNotifier.java:95)
- // // at
- // org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:271)
- //
- // //notifierGroup.setDaemon(true);
- //
- // if (USE_QUEUED_EXECUTOR)
- // createQueuedExecutor(notifierGroup);
- // else
- // createPooledExecutor(notifierGroup);
- // }
-
- // /**
- // * Creates the <code>QueuedExecutor</code> used to deliver messages
- // * to <code>CacheClientProxy</code> instances
- // * @param notifierGroup The <code>ThreadGroup</code> to which the
- // * <code>QueuedExecutor</code>'s <code>Threads</code> belong
- // */
- // protected void createQueuedExecutor(final ThreadGroup notifierGroup)
- // {
- // QueuedExecutor queuedExecutor = new QueuedExecutor(new LinkedQueue());
- // queuedExecutor.setThreadFactory(new ThreadFactory()
- // {
- // public Thread newThread(Runnable command)
- // {
- // Thread thread = new Thread(notifierGroup, command, "Queued Cache Client
- // Notifier");
- // thread.setDaemon(true);
- // return thread;
- // }
- // });
- // _executor = queuedExecutor;
- // }
-
- // /**
- // * Creates the <code>PooledExecutor</code> used to deliver messages
- // * to <code>CacheClientProxy</code> instances
- // * @param notifierGroup The <code>ThreadGroup</code> to which the
- // * <code>PooledExecutor</code>'s <code>Threads</code> belong
- // */
- // protected void createPooledExecutor(final ThreadGroup notifierGroup)
- // {
- // PooledExecutor pooledExecutor = new PooledExecutor(new
- // BoundedLinkedQueue(4096), 50);
- // pooledExecutor.setMinimumPoolSize(10);
- // pooledExecutor.setKeepAliveTime(1000 * 60 * 5);
- // pooledExecutor.setThreadFactory(new ThreadFactory()
- // {
- // public Thread newThread(Runnable command)
- // {
- // Thread thread = new Thread(notifierGroup, command, "Pooled Cache Client
- // Notifier");
- // thread.setDaemon(true);
- // return thread;
- // }
- // });
- // pooledExecutor.createThreads(5);
- // _executor = pooledExecutor;
- // }
-
protected void deliverInterestChange(ClientProxyMembershipID proxyID,
ClientInterestMessageImpl message) {
DM dm = ((InternalDistributedSystem) this.getCache().getDistributedSystem())
@@ -2471,23 +2159,6 @@ public class CacheClientNotifier {
*/
protected static final int ALL_PORTS = -1;
- // /**
- // * Whether to synchonously deliver messages to proxies.
- // * This is currently hard-coded to true to ensure ordering.
- // */
- // protected static final boolean USE_SYNCHRONOUS_NOTIFICATION =
- // true;
- // Boolean.getBoolean("CacheClientNotifier.USE_SYNCHRONOUS_NOTIFICATION");
-
- // /**
- // * Whether to use the <code>QueuedExecutor</code> (or the
- // * <code>PooledExecutor</code>) to deliver messages to proxies.
- // * Currently, delivery is synchronous. No <code>Executor</code> is
- // * used.
- // */
- // protected static final boolean USE_QUEUED_EXECUTOR =
- // Boolean.getBoolean("CacheClientNotifier.USE_QUEUED_EXECUTOR");
-
/**
* The map of known <code>CacheClientProxy</code> instances. Maps ClientProxyMembershipID to
* CacheClientProxy. Note that the keys in this map are not updated when a durable client
@@ -2508,11 +2179,11 @@ public class CacheClientNotifier {
new HashSet<ClientProxyMembershipID>();
/**
- * The GemFire <code>Cache</code>. Note that since this is a singleton class you should not use a
- * direct reference to _cache in CacheClientNotifier code. Instead, you should always use
- * <code>getCache()</code>
+ * The GemFire <code>InternalCache</code>. Note that since this is a singleton class you should
+ * not use a direct reference to _cache in CacheClientNotifier code. Instead, you should always
+ * use <code>getCache()</code>
*/
- private GemFireCacheImpl _cache;
+ private InternalCache _cache;
private InternalLogWriter logWriter;
@@ -2543,10 +2214,6 @@ public class CacheClientNotifier {
*/
private volatile HAContainerWrapper haContainer;
- // /**
- // * The singleton <code>CacheClientNotifier</code> instance
- // */
- // protected static CacheClientNotifier _instance;
/**
* The size of the server-to-client communication socket buffers. This can be modified using the
* BridgeServer.SOCKET_BUFFER_SIZE system property.
@@ -2631,9 +2298,8 @@ public class CacheClientNotifier {
// lazily initialize haContainer in case this CCN instance was created by a gateway receiver
if (overflowAttributesList != null
&& !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList.get(0))) {
- haContainer = new HAContainerRegion(_cache.getRegion(
- Region.SEPARATOR + CacheServerImpl.clientMessagesRegion((GemFireCacheImpl) _cache,
- (String) overflowAttributesList.get(0),
+ haContainer = new HAContainerRegion(_cache.getRegion(Region.SEPARATOR
+ + CacheServerImpl.clientMessagesRegion(_cache, (String) overflowAttributesList.get(0),
((Integer) overflowAttributesList.get(1)).intValue(),
((Integer) overflowAttributesList.get(2)).intValue(),
(String) overflowAttributesList.get(3), (Boolean) overflowAttributesList.get(4))));
@@ -2664,11 +2330,10 @@ public class CacheClientNotifier {
/**
* @param _cache the _cache to set
*/
- private void setCache(GemFireCacheImpl _cache) {
+ private void setCache(InternalCache _cache) {
this._cache = _cache;
}
-
private class ExpireBlackListTask extends PoolTask {
private ClientProxyMembershipID proxyID;
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 19a7a32..75c89ab 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.tier.sockets;
import java.io.ByteArrayInputStream;
@@ -39,11 +38,14 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
+import org.apache.logging.log4j.Logger;
+import org.apache.shiro.subject.Subject;
+import org.apache.shiro.util.ThreadState;
+
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.ClientSession;
import org.apache.geode.cache.DynamicRegionFactory;
@@ -80,8 +82,8 @@ import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.FilterProfile;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InterestRegistrationEventImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.StateFlushOperation;
@@ -102,16 +104,12 @@ import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.security.AuthorizeRequestPP;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.security.AccessControl;
-import org.apache.logging.log4j.Logger;
-import org.apache.shiro.subject.Subject;
-import org.apache.shiro.util.ThreadState;
/**
* Class <code>CacheClientProxy</code> represents the server side of the {@link CacheClientUpdater}.
* It queues messages to be sent from the server to the client. It then reads those messages from
* the queue and sends them to the client.
*
- *
* @since GemFire 4.2
*/
@SuppressWarnings("synthetic-access")
@@ -153,7 +151,7 @@ public class CacheClientProxy implements ClientSession {
/**
* The GemFire cache
*/
- protected final GemFireCacheImpl _cache;
+ protected final InternalCache _cache;
/**
* The list of keys that the client represented by this proxy is interested in (stored by region)
@@ -345,7 +343,7 @@ public class CacheClientProxy implements ClientSession {
Version clientVersion, long acceptorId, boolean notifyBySubscription) throws CacheException {
initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion);
this._cacheClientNotifier = ccn;
- this._cache = (GemFireCacheImpl) ccn.getCache();
+ this._cache = ccn.getCache();
this._maximumMessageCount = ccn.getMaximumMessageCount();
this._messageTimeToLive = ccn.getMessageTimeToLive();
this._acceptorId = acceptorId;
@@ -620,7 +618,7 @@ public class CacheClientProxy implements ClientSession {
*
* @return the GemFire cache
*/
- public GemFireCacheImpl getCache() {
+ public InternalCache getCache() {
return this._cache;
}
@@ -2344,7 +2342,7 @@ public class CacheClientProxy implements ClientSession {
return this._proxy;
}
- private GemFireCacheImpl getCache() {
+ private InternalCache getCache() {
return getProxy().getCache();
}
@@ -2410,10 +2408,6 @@ public class CacheClientProxy implements ClientSession {
Thread.sleep(500);
} catch (InterruptedException e) {
interrupted = true;
- /*
- * GemFireCache c = (GemFireCache)_cache;
- * c.getDistributedSystem().getCancelCriterion().checkCancelInProgress(e);
- */
} catch (CancelException e) {
break;
} catch (CacheException e) {
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index f85ecb4..728abf7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -12,24 +12,68 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.tier.sockets;
-import org.apache.geode.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.SSLException;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.InvalidDeltaException;
+import org.apache.geode.StatisticDescriptor;
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsType;
+import org.apache.geode.StatisticsTypeFactory;
+import org.apache.geode.SystemFailure;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.client.ServerRefusedConnectionException;
-import org.apache.geode.cache.client.internal.*;
+import org.apache.geode.cache.client.internal.ClientUpdater;
+import org.apache.geode.cache.client.internal.Endpoint;
+import org.apache.geode.cache.client.internal.EndpointManager;
+import org.apache.geode.cache.client.internal.GetEventValueOp;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.QueueManager;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.distributed.internal.*;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionStats;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalDistributedSystem.DisconnectListener;
+import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.MemberAttributes;
-import org.apache.geode.internal.*;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.InternalInstantiator;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.ClientServerObserver;
+import org.apache.geode.internal.cache.ClientServerObserverHolder;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
@@ -48,19 +92,6 @@ import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
import org.apache.geode.security.AuthenticationFailedException;
import org.apache.geode.security.AuthenticationRequiredException;
import org.apache.geode.security.GemFireSecurityException;
-import org.apache.logging.log4j.Logger;
-
-import javax.net.ssl.SSLException;
-import java.io.*;
-import java.net.ConnectException;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* <code>CacheClientUpdater</code> is a thread that processes update messages from a cache server
@@ -107,6 +138,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
* The buffer upon which we receive messages
*/
private final ByteBuffer commBuffer;
+
private boolean commBufferReleased;
private final CCUStats stats;
@@ -114,9 +146,9 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
/**
* Cache for which we provide service
*/
- private /* final */ GemFireCacheImpl cache;
- private /* final */ CachedRegionHelper cacheHelper;
+ private /* final */ InternalCache cache;
+ private /* final */ CachedRegionHelper cacheHelper;
/**
* Principle flag to signal thread's run loop to terminate
@@ -144,7 +176,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
private boolean isOpCompleted;
public final static String CLIENT_UPDATER_THREAD_NAME = "Cache Client Updater Thread ";
- /*
+
+ /**
* to enable test flag
*/
public static boolean isUsedByTest;
@@ -155,20 +188,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
*/
public static boolean fullValueRequested = false;
- // /**
- // * True if this thread been initialized. Indicates that the run thread is
- // * initialized and ready to process messages
- // * <p>
- // * TODO is this still needed?
- // * <p>
- // * Accesses synchronized via <code>this</code>
- // *
- // * @see #notifyInitializationComplete()
- // * @see #waitForInitialization()
- // */
- // private boolean initialized = false;
-
-
private final ServerLocation location;
// TODO - remove these fields
@@ -185,7 +204,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
* @return true if cache appears
*/
private boolean waitForCache() {
- GemFireCacheImpl c;
+ InternalCache cache;
long tilt = System.currentTimeMillis() + MAX_CACHE_WAIT * 1000;
for (;;) {
if (quitting()) {
@@ -205,8 +224,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
new Object[] {this, MAX_CACHE_WAIT}));
return false;
}
- c = GemFireCacheImpl.getInstance();
- if (c != null && !c.isClosed()) {
+ cache = GemFireCacheImpl.getInstance();
+ if (cache != null && !cache.isClosed()) {
break;
}
boolean interrupted = Thread.interrupted();
@@ -220,8 +239,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
}
} // for
- this.cache = c;
- this.cacheHelper = new CachedRegionHelper(c);
+ this.cache = cache;
+ this.cacheHelper = new CachedRegionHelper(cache);
return true;
}
@@ -270,7 +289,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
OutputStream tmpOut = null;
InputStream tmpIn = null;
try {
- /** Size of the server-to-client communication socket buffers */
+ // Size of the server-to-client communication socket buffers
int socketBufferSize =
Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue();
@@ -323,7 +342,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// create a "server" memberId we currently don't know much about the
// server.
// Would be nice for it to send us its member id
- // @todo - change the serverId to use the endpoint's getMemberId() which
+ // TODO: change the serverId to use the endpoint's getMemberId() which
// returns a
// DistributedMember (once gfecq branch is merged to trunk).
MemberAttributes ma =
@@ -464,52 +483,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
}
- // /**
- // * Waits for this thread to be initialized
- // *
- // * @return true if initialized; false if stopped before init
- // */
- // public boolean waitForInitialization() {
- // boolean result = false;
- // // Yogesh : waiting on this thread object is a bad idea
- // // as when thread exits it notifies to the waiting threads.
- // synchronized (this) {
- // for (;;) {
- // if (quitting()) {
- // break;
- // }
- // boolean interrupted = Thread.interrupted();
- // try {
- // this.wait(100); // spurious wakeup ok // timed wait, should fix lost notification problem
- // rahul.
- // }
- // catch (InterruptedException e) {
- // interrupted = true;
- // }
- // finally {
- // if (interrupted) {
- // Thread.currentThread().interrupt();
- // }
- // }
- // } // while
- // // Even if we succeed, there is a risk that we were shut down
- // // Can't check for cache; it isn't set yet :-(
- // this.system.getCancelCriterion().checkCancelInProgress(null);
- // result = this.continueProcessing;
- // } // synchronized
- // return result;
- // }
-
- // /**
- // * @see #waitForInitialization()
- // */
- // private void notifyInitializationComplete() {
- // synchronized (this) {
- // this.initialized = true;
- // this.notifyAll();
- // }
- // }
-
/**
* Notifies this thread to stop processing
*/
@@ -1188,21 +1161,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// message
if (region.hasServerProxy()) {
return;
-
- // NOTE:
- // As explained in the method description, this code is added as part
- // of CQ bug fix. Cache server team needs to look into changes relating
- // to local region.
- //
- // Locally invalidate the region
- // region.basicBridgeClientInvalidate(callbackArgument,
- // proxy.getProcessedMarker());
-
- // if (logger.debugEnabled()) {
- // logger.debug(toString() + ": Cleared region: " + regionName
- // + " callbackArgument: " + callbackArgument);
- // }
-
}
} catch (Exception e) {
@@ -1241,12 +1199,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// servers recursively
}
- // // CALLBACK TESTING PURPOSE ONLY ////
+ // CALLBACK TESTING PURPOSE ONLY
if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
ClientServerObserver bo = ClientServerObserverHolder.getInstance();
bo.afterReceivingFromServer(eventId);
}
- // /////////////////////////////////////
+
}
// TODO bug: can the following catch be more specific?
catch (Exception e) {
@@ -1262,7 +1220,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
int noOfParts = msg.getNumberOfParts();
- // int numOfClasses = noOfParts - 3; // 1 for ds classname, 1 for ds id and 1 for eventId.
if (isDebugEnabled) {
logger.debug("{}: Received register dataserializer message of parts {}", getName(),
noOfParts);
@@ -1273,8 +1230,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
String dataSerializerClassName =
(String) CacheServerHelper.deserialize(msg.getPart(i).getSerializedForm());
int id = msg.getPart(i + 1).getInt();
- InternalDataSerializer.register(dataSerializerClassName, false, eventId,
- null/* context */, id);
+ InternalDataSerializer.register(dataSerializerClassName, false, eventId, null, id);
// distribute is false because we don't want to propagate this to
// servers recursively
@@ -1295,12 +1251,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
}
- // // CALLBACK TESTING PURPOSE ONLY ////
+ // CALLBACK TESTING PURPOSE ONLY
if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
ClientServerObserver bo = ClientServerObserverHolder.getInstance();
bo.afterReceivingFromServer(eventId);
}
- ///////////////////////////////////////
+
}
// TODO bug: can the following catch be more specific?
catch (Exception e) {
@@ -1313,12 +1269,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
/**
* Processes message to invoke CQ listeners.
- *
- * @param startMessagePart
- * @param numCqParts
- * @param messageType
- * @param key
- * @param value
*/
private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType,
Object key, Object value) {
@@ -1328,7 +1278,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType,
Object key, Object value, byte[] delta, EventID eventId) {
- // String[] cqs = new String[numCqs/2];
HashMap cqs = new HashMap();
final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -1496,7 +1445,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
}
}
-
private void handleTombstoneOperation(Message msg) {
String regionName = "unknown";
try { // not sure why this isn't done by the caller
@@ -1750,10 +1698,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// originating from the client
// and by updating the last update stat, the ServerMonitor is less
// likely to send pings...
- // and the ClientHealthMonitor will cause a disconnect -- mthomas
- // 10/18/2006
-
- // this._endpoint.setLastUpdate();
+ // and the ClientHealthMonitor will cause a disconnect
} catch (InterruptedIOException e) {
// Per Sun's support web site, this exception seems to be peculiar
@@ -1868,13 +1813,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
return socket.getLocalPort();
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.distributed.internal.InternalDistributedSystem.DisconnectListener#onDisconnect
- * (org.apache.geode.distributed.internal.InternalDistributedSystem)
- */
public void onDisconnect(InternalDistributedSystem sys) {
stopUpdater();
}
@@ -1884,15 +1822,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
*/
private volatile boolean endPointDied = false;
- /**
- * Returns true if the end point represented by this updater is considered dead.
- *
- * @return true if {@link #endpoint} died.
- */
- public boolean isEndPointDead() {
- return this.endPointDied;
- }
-
private void verifySocketBufferSize(int requestedBufferSize, int actualBufferSize, String type) {
if (actualBufferSize < requestedBufferSize) {
logger.info(LocalizedMessage.create(
@@ -1973,11 +1902,9 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
public long startTime() {
return DistributionStats.getStatTime();
}
-
}
public boolean isProcessing() {
- // TODO Auto-generated method stub
return continueProcessing.get();
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index e21a834..e0b5ab8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -12,35 +12,44 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.tier.sockets;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.CacheClientStatus;
+import org.apache.geode.internal.cache.IncomingGatewayStatus;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.concurrent.ConcurrentHashSet;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.logging.log4j.Logger;
-
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicIntegerArray;
/**
* Class <code>ClientHealthMonitor</code> is a server-side singleton that monitors the health of
* clients by looking at their heartbeats. If too much time elapses between heartbeats, the monitor
* determines that the client is dead and interrupts its threads.
- *
- *
+ *
* @since GemFire 4.2.3
*/
public class ClientHealthMonitor {
@@ -69,7 +78,7 @@ public class ClientHealthMonitor {
/**
* The GemFire <code>Cache</code>
*/
- final protected Cache _cache;
+ private final InternalCache _cache;
/**
* A thread that validates client connections
@@ -123,7 +132,7 @@ public class ClientHealthMonitor {
* client has died and interrupting its sockets.
* @return The singleton <code>ClientHealthMonitor</code> instance
*/
- public static ClientHealthMonitor getInstance(Cache cache, int maximumTimeBetweenPings,
+ public static ClientHealthMonitor getInstance(InternalCache cache, int maximumTimeBetweenPings,
CacheClientNotifierStats stats) {
createInstance(cache, maximumTimeBetweenPings, stats);
return _instance;
@@ -305,7 +314,7 @@ public class ClientHealthMonitor {
scheduledToBeRemovedTx.removeAll(txids);
}
};
- ((GemFireCacheImpl) this._cache).getCCPTimer().schedule(task, timeout);
+ this._cache.getCCPTimer().schedule(task, timeout);
}
}
}
@@ -384,55 +393,6 @@ public class ClientHealthMonitor {
}
}
- // /**
- // * Returns modifiable map (changes do not effect this class) of memberId
- // * to connection count.
- // */
- // public Map getConnectedClients() {
- // Map map = new HashMap(); // KEY=memberId, VALUE=connectionCount (Integer)
- // synchronized (_clientThreadsLock) {
- // Iterator connectedClients = this._clientThreads.entrySet().iterator();
- // while (connectedClients.hasNext()) {
- // Map.Entry entry = (Map.Entry) connectedClients.next();
- // String memberId = (String) entry.getKey();// memberId includes FQDN
- // Set connections = (Set) entry.getValue();
- // int socketPort = 0;
- // InetAddress socketAddress = null;
- // ///*
- // Iterator serverConnections = connections.iterator();
- // // Get data from one.
- // while (serverConnections.hasNext()) {
- // ServerConnection sc = (ServerConnection) serverConnections.next();
- // socketPort = sc.getSocketPort();
- // socketAddress = sc.getSocketAddress();
- // break;
- // }
- // //*/
- // int connectionCount = connections.size();
- // String clientString = null;
- // if (socketAddress == null) {
- // clientString = "client member id=" + memberId;
- // } else {
- // clientString = "host name=" + socketAddress.toString() + " host ip=" +
- // socketAddress.getHostAddress() + " client port=" + socketPort + " client
- // member id=" + memberId;
- // }
- // map.put(memberId, new Object[] {clientString, new
- // Integer(connectionCount)});
- // /* Note: all client addresses are same...
- // Iterator serverThreads = ((Set) entry.getValue()).iterator();
- // while (serverThreads.hasNext()) {
- // ServerConnection connection = (ServerConnection) serverThreads.next();
- // InetAddress clientAddress = connection.getClientAddress();
- // logger.severe("getConnectedClients: memberId=" + memberId +
- // " clientAddress=" + clientAddress + " FQDN=" +
- // clientAddress.getCanonicalHostName());
- // }*/
- // }
- // }
- // return map;
- // }
-
/**
* Returns modifiable map (changes do not affect this class) of client membershipID to connection
* count. This is different from the map contained in this class as here the key is client
@@ -442,7 +402,6 @@ public class ClientHealthMonitor {
* @param filterProxies Set identifying the Connection proxies which should be fetched. These
* ConnectionProxies may be from same client member or different. If it is null this would
* mean to fetch the Connections of all the ConnectionProxy objects.
- *
*/
public Map getConnectedClients(Set filterProxies) {
Map map = new HashMap(); // KEY=proxyID, VALUE=connectionCount (Integer)
@@ -677,7 +636,6 @@ public class ClientHealthMonitor {
return this._clientHeartbeats;
}
-
/**
* Shuts down the singleton <code>CacheClientNotifier</code> instance.
*/
@@ -693,10 +651,9 @@ public class ClientHealthMonitor {
*
* @param cache The GemFire <code>Cache</code>
* @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
- * client has died and interrupting its sockets.
*/
- protected static synchronized void createInstance(Cache cache, int maximumTimeBetweenPings,
- CacheClientNotifierStats stats) {
+ protected static synchronized void createInstance(InternalCache cache,
+ int maximumTimeBetweenPings, CacheClientNotifierStats stats) {
refCount++;
if (_instance != null) {
return;
@@ -710,9 +667,8 @@ public class ClientHealthMonitor {
*
* @param cache The GemFire <code>Cache</code>
* @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
- * client has died and interrupting its sockets.
*/
- private ClientHealthMonitor(Cache cache, int maximumTimeBetweenPings,
+ private ClientHealthMonitor(InternalCache cache, int maximumTimeBetweenPings,
CacheClientNotifierStats stats) {
// Set the Cache
this._cache = cache;
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index ecd9c7a..6eadee3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.distributed.ConfigurationProperties.*;
@@ -49,7 +48,7 @@ import org.apache.geode.internal.Assert;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.ClientHandShake;
@@ -373,7 +372,7 @@ public class ServerConnection implements Runnable {
return getCache().getDistributedSystem();
}
- public Cache getCache() {
+ public InternalCache getCache() {
return this.crHelper.getCache();
}
@@ -578,7 +577,7 @@ public class ServerConnection implements Runnable {
private boolean isFiringMembershipEvents() {
return this.acceptor.isRunning()
- && !((GemFireCacheImpl) this.acceptor.getCachedRegionHelper().getCache()).isClosed()
+ && !(this.acceptor.getCachedRegionHelper().getCache()).isClosed()
&& !acceptor.getCachedRegionHelper().getCache().getCancelCriterion().isCancelInProgress();
}
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
index 5a02525..1b599e9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
@@ -12,16 +12,13 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
import org.apache.logging.log4j.Logger;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
import org.apache.geode.internal.cache.tier.sockets.Message;
@@ -30,7 +27,6 @@ import org.apache.geode.internal.logging.LogService;
import org.apache.geode.pdx.internal.EnumInfo;
import org.apache.geode.pdx.internal.TypeRegistry;
-
public class AddPdxEnum extends BaseCommand {
private static final Logger logger = LogService.getLogger();
@@ -56,7 +52,7 @@ public class AddPdxEnum extends BaseCommand {
int enumId = msg.getPart(1).getInt();
try {
- GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache();
+ InternalCache cache = servConn.getCache();
TypeRegistry registry = cache.getPdxRegistry();
registry.addRemoteEnum(enumId, enumInfo);
} catch (Exception e) {