You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/12 22:18:33 UTC
[30/51] [abbrv] [partial] geode git commit: GEODE-2632: change
dependencies on GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java
index dacf8f5..9d85008 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java
@@ -106,7 +106,7 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
if (this.txUniqId == TXManagerImpl.NOTX) {
return null;
} else {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache == null) {
// ignore and return, we are shutting down!
return null;
@@ -116,9 +116,9 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
}
}
- private void cleanupTransasction(TXStateProxy tx) {
+ private void cleanupTransaction(TXStateProxy tx) {
if (this.txUniqId != TXManagerImpl.NOTX) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache == null) {
// ignore and return, we are shutting down!
return;
@@ -130,7 +130,6 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
@Override
protected void process(final DistributionManager dm) {
-
Throwable thr = null;
boolean sendReply = true;
DistributedRegion dr = null;
@@ -202,7 +201,7 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
logger.trace(LogMarker.DM, "Exception caught while processing message", t);
}
} finally {
- cleanupTransasction(tx);
+ cleanupTransaction(tx);
if (sendReply && this.processorId != 0) {
ReplyException rex = null;
if (thr != null) {
@@ -275,9 +274,9 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
/**
* check to see if the cache is closing
*/
- final public boolean checkCacheClosing(DistributionManager dm) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- return (cache == null || cache.getCancelCriterion().isCancelInProgress());
+ private boolean checkCacheClosing(DistributionManager dm) {
+ InternalCache cache = GemFireCacheImpl.getInstance();
+ return cache == null || cache.getCancelCriterion().isCancelInProgress();
}
/**
@@ -285,7 +284,7 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
*
* @return true if the distributed system is closing
*/
- final public boolean checkDSClosing(DistributionManager dm) {
+ private boolean checkDSClosing(DistributionManager dm) {
InternalDistributedSystem ds = dm.getSystem();
return (ds == null || ds.isDisconnecting());
}
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java
index 130e2a8..81bb7fb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java
@@ -28,15 +28,17 @@ public class DynamicRegionFactoryImpl extends DynamicRegionFactory {
* create an instance of the factory. This is normally only done by DynamicRegionFactory's static
* initialization
*/
- public DynamicRegionFactoryImpl() {}
+ public DynamicRegionFactoryImpl() {
+ // nothing
+ }
/** close the factory. Only do this if you're closing the cache, too */
public void close() {
- _close();
+ doClose();
}
/** initialize the factory for use with a new cache */
- public void internalInit(GemFireCacheImpl c) throws CacheException {
- _internalInit(c);
+ void internalInit(InternalCache cache) throws CacheException {
+ doInternalInit(cache);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 02c0422..ac4954a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -12,11 +12,33 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
-import org.apache.geode.*;
-import org.apache.geode.cache.*;
+import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.function.Function;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CopyHelper;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.DeltaSerializationException;
+import org.apache.geode.GemFireIOException;
+import org.apache.geode.InvalidDeltaException;
+import org.apache.geode.SerializationException;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.EntryOperation;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.SerializedCacheValue;
+import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.query.IndexMaintenanceException;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.internal.index.IndexManager;
@@ -28,7 +50,14 @@ import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.*;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.ByteArrayDataInput;
+import org.apache.geode.internal.DSFIDFactory;
+import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.Sendable;
+import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.lru.Sizeable;
import org.apache.geode.internal.cache.partitioned.PartitionMessage;
@@ -43,46 +72,46 @@ import org.apache.geode.internal.lang.StringUtils;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
-import org.apache.geode.internal.offheap.*;
+import org.apache.geode.internal.offheap.OffHeapHelper;
+import org.apache.geode.internal.offheap.OffHeapRegionEntryHelper;
+import org.apache.geode.internal.offheap.ReferenceCountHelper;
+import org.apache.geode.internal.offheap.Releasable;
+import org.apache.geode.internal.offheap.StoredObject;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.util.ArrayUtils;
import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
-import org.apache.logging.log4j.Logger;
-
-import java.io.*;
-import java.util.function.Function;
-
-import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
-import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
/**
* Implementation of an entry event
+ *
+ * must be public for DataSerializableFixedID
*/
-// must be public for DataSerializableFixedID
public class EntryEventImpl
implements EntryEvent, InternalCacheEvent, DataSerializableFixedID, EntryOperation, Releasable {
private static final Logger logger = LogService.getLogger();
// PACKAGE FIELDS //
public transient LocalRegion region;
+
private transient RegionEntry re;
protected KeyInfo keyInfo;
- // private long eventId;
/** the event's id. Scoped by distributedMember. */
protected EventID eventID;
private Object newValue = null;
+
/**
* If we ever serialize the new value then it should be stored in this field in case we need the
* serialized form again later. This was added to fix bug 43781. Note that we also have the
* "newValueBytes" field. But it is only non-null if setSerializedNewValue was called.
*/
private byte[] cachedSerializedNewValue = null;
+
@Retained(ENTRY_EVENT_OLD_VALUE)
private Object oldValue = null;
@@ -116,15 +145,11 @@ public class EntryEventImpl
*/
protected DistributedMember distributedMember;
-
/**
* transient storage for the message that caused the event
*/
transient DistributionMessage causedByMessage;
-
- // private static long eventID = 0;
-
/**
* The originating membershipId of this event.
*
@@ -138,12 +163,12 @@ public class EntryEventImpl
*/
private byte[] deltaBytes = null;
-
/** routing information for cache clients for this event */
private FilterInfo filterInfo;
/** new value stored in serialized form */
protected byte[] newValueBytes;
+
/** old value stored in serialized form */
private byte[] oldValueBytes;
@@ -157,7 +182,9 @@ public class EntryEventImpl
public final static Object SUSPECT_TOKEN = new Object();
- public EntryEventImpl() {}
+ public EntryEventImpl() {
+ // do nothing
+ }
/**
* Reads the contents of this message from the given input.
@@ -229,7 +256,7 @@ public class EntryEventImpl
}
this.txId = this.region.getTXId();
- /**
+ /*
* this might set txId for events done from a thread that has a tx even though the op is non-tx.
* For example region ops.
*/
@@ -341,9 +368,8 @@ public class EntryEventImpl
@Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument,
boolean originRemote, DistributedMember distributedMember, boolean generateCallbacks,
EventID eventID) {
- EntryEventImpl entryEvent = new EntryEventImpl(region, op, key, newValue, callbackArgument,
- originRemote, distributedMember, generateCallbacks, eventID);
- return entryEvent;
+ return new EntryEventImpl(region, op, key, newValue, callbackArgument, originRemote,
+ distributedMember, generateCallbacks, eventID);
}
/**
@@ -356,9 +382,8 @@ public class EntryEventImpl
public static EntryEventImpl create(LocalRegion region, Operation op, Object key,
boolean originRemote, DistributedMember distributedMember, boolean generateCallbacks,
boolean fromRILocalDestroy) {
- EntryEventImpl entryEvent = new EntryEventImpl(region, op, key, originRemote, distributedMember,
- generateCallbacks, fromRILocalDestroy);
- return entryEvent;
+ return new EntryEventImpl(region, op, key, originRemote, distributedMember, generateCallbacks,
+ fromRILocalDestroy);
}
/**
@@ -374,9 +399,8 @@ public class EntryEventImpl
public static EntryEventImpl create(final LocalRegion region, Operation op, Object key,
@Retained(ENTRY_EVENT_NEW_VALUE) Object newVal, Object callbackArgument, boolean originRemote,
DistributedMember distributedMember, boolean generateCallbacks, boolean initializeId) {
- EntryEventImpl entryEvent = new EntryEventImpl(region, op, key, newVal, callbackArgument,
- originRemote, distributedMember, generateCallbacks, initializeId);
- return entryEvent;
+ return new EntryEventImpl(region, op, key, newVal, callbackArgument, originRemote,
+ distributedMember, generateCallbacks, initializeId);
}
/**
@@ -915,7 +939,7 @@ public class EntryEventImpl
public final Object getOldValueAsOffHeapDeserializedOrRaw() {
Object result = basicGetOldValue();
if (mayHaveOffHeapReferences() && result instanceof StoredObject) {
- result = ((StoredObject) result).getDeserializedForReading();
+ result = ((CachedDeserializable) result).getDeserializedForReading();
}
return AbstractRegion.handleNotAvailable(result); // fixes 49499
}
@@ -1289,7 +1313,7 @@ public class EntryEventImpl
public final Object getNewValueAsOffHeapDeserializedOrRaw() {
Object result = getRawNewValue();
if (mayHaveOffHeapReferences() && result instanceof StoredObject) {
- result = ((StoredObject) result).getDeserializedForReading();
+ result = ((CachedDeserializable) result).getDeserializedForReading();
}
return AbstractRegion.handleNotAvailable(result); // fixes 49499
}
@@ -1462,8 +1486,6 @@ public class EntryEventImpl
* hasn't been set yet.
*
* @param oldValueForDelta Used by Delta Propagation feature
- *
- * @throws RegionClearedException
*/
void putExistingEntry(final LocalRegion owner, final RegionEntry reentry, boolean requireOldValue,
Object oldValueForDelta) throws RegionClearedException {
@@ -1524,8 +1546,6 @@ public class EntryEventImpl
/**
* Put a newValue into the given, write synced, new, region entry.
- *
- * @throws RegionClearedException
*/
void putNewEntry(final LocalRegion owner, final RegionEntry reentry)
throws RegionClearedException {
@@ -1791,7 +1811,7 @@ public class EntryEventImpl
OffHeapHelper.releaseWithNoTracking(v);
}
}
- } catch (EntryNotFoundException ex) {
+ } catch (EntryNotFoundException ignore) {
return false;
}
}
@@ -2012,7 +2032,7 @@ public class EntryEventImpl
synchronized (this.offHeapLock) {
ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf);
}
- } catch (IllegalStateException ex) {
+ } catch (IllegalStateException ignore) {
buf.append("OFFHEAP_VALUE_FREED");
}
buf.append(";newValue=");
@@ -2020,7 +2040,7 @@ public class EntryEventImpl
synchronized (this.offHeapLock) {
ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf);
}
- } catch (IllegalStateException ex) {
+ } catch (IllegalStateException ignore) {
buf.append("OFFHEAP_VALUE_FREED");
}
buf.append(";callbackArg=");
@@ -2029,10 +2049,6 @@ public class EntryEventImpl
buf.append(isOriginRemote());
buf.append(";originMember=");
buf.append(getDistributedMember());
- // if (this.partitionMessage != null) {
- // buf.append("; partitionMessage=");
- // buf.append(this.partitionMessage);
- // }
if (this.isPossibleDuplicate()) {
buf.append(";posDup");
}
@@ -2054,11 +2070,8 @@ public class EntryEventImpl
buf.append(this.eventID);
}
if (this.deltaBytes != null) {
- buf.append(";[" + this.deltaBytes.length + " deltaBytes]");
+ buf.append(";[").append(this.deltaBytes.length).append(" deltaBytes]");
}
- // else {
- // buf.append(";[no deltaBytes]");
- // }
if (this.filterInfo != null) {
buf.append(";routing=");
buf.append(this.filterInfo);
@@ -2239,8 +2252,6 @@ public class EntryEventImpl
/**
* Sets the operation type.
- *
- * @param eventType
*/
public void setEventType(EnumListenerEvent eventType) {
this.eventType = eventType;
@@ -2416,8 +2427,6 @@ public class EntryEventImpl
/**
* This method sets the delta bytes used in Delta Propagation feature. <B>For internal delta, see
* setNewValue().</B>
- *
- * @param deltaBytes
*/
public void setDeltaBytes(byte[] deltaBytes) {
this.deltaBytes = deltaBytes;
@@ -2494,7 +2503,6 @@ public class EntryEventImpl
* this method joins together version tag timestamps and the "lastModified" timestamps generated
* and stored in entries. If a change does not already carry a lastModified timestamp
*
- * @param suggestedTime
* @return the timestamp to store in the entry
*/
public long getEventTime(long suggestedTime) {
@@ -2741,10 +2749,10 @@ public class EntryEventImpl
// System.identityHashCode(ov));
if (ReferenceCountHelper.trackReferenceCounts()) {
ReferenceCountHelper.setReferenceCountOwner(new OldValueOwner());
- ((StoredObject) ov).release();
+ ((Releasable) ov).release();
ReferenceCountHelper.setReferenceCountOwner(null);
} else {
- ((StoredObject) ov).release();
+ ((Releasable) ov).release();
}
}
OffHeapHelper.releaseAndTrackOwner(nv, this);
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
index 4498b36..87835ff 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
@@ -14,6 +14,21 @@
*/
package org.apache.geode.internal.cache;
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.DataSerializer;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.distributed.DistributedMember;
@@ -30,21 +45,13 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.util.Breadcrumbs;
-import org.apache.logging.log4j.Logger;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicLong;
/**
* This class uniquely identifies any Region Operation like create, update, destroy etc. It is
* composed of three parts, namely: 1) DistributedMembershipID 2) ThreadID 3) SequenceID. This
* helps in sequencing the events belonging to a unique producer.
- *
- *
*/
-public final class EventID implements DataSerializableFixedID, Serializable, Externalizable {
+public class EventID implements DataSerializableFixedID, Serializable, Externalizable {
private static final Logger logger = LogService.getLogger();
/** turns on very verbose logging of membership id bytes */
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
index 278367c..2c86aed 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
@@ -12,11 +12,21 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.DataSerializable;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.client.PoolFactory;
@@ -31,14 +41,6 @@ import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
/**
* EventTracker tracks the last sequence number for a particular memberID:threadID. It is used to
@@ -86,11 +88,10 @@ public class EventTracker {
*/
private volatile InternalDistributedMember initialImageProvider;
-
/**
* The cache associated with this tracker
*/
- GemFireCacheImpl cache;
+ InternalCache cache;
/**
* The name of this tracker
@@ -110,12 +111,12 @@ public class EventTracker {
/**
* Initialize the EventTracker's timer task. This is stored in the cache for tracking and shutdown
* purposes
- *
+ *
* @param cache the cache to schedule tasks with
*/
- public static ExpiryTask startTrackerServices(GemFireCacheImpl cache) {
+ public static ExpiryTask startTrackerServices(InternalCache cache) {
long expiryTime = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "messageTrackingTimeout",
- PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3).longValue();
+ PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3);
ExpiryTask result = new ExpiryTask(cache, expiryTime);
cache.getCCPTimer().scheduleAtFixedRate(result, expiryTime, expiryTime);
// schedule(result, expiryTime);
@@ -124,10 +125,10 @@ public class EventTracker {
/**
* Terminate the tracker's timer task
- *
+ *
* @param cache the cache holding the tracker task
*/
- public static void stopTrackerServices(GemFireCacheImpl cache) {
+ public static void stopTrackerServices(InternalCache cache) {
cache.getEventTrackerTask().cancel();
}
@@ -507,8 +508,6 @@ public class EventTracker {
}
/**
- * @param event
- * @param eventID
* @return true if the event should not be tracked, false otherwise
*/
private boolean ignoreEvent(InternalCacheEvent event, EventID eventID) {
@@ -675,17 +674,21 @@ public class EventTracker {
* Whether this object was removed by the cleanup thread.
*/
public boolean removed;
+
/**
* public for tests only
*/
public Map<EventID, VersionTag> entryVersionTags = new HashMap<EventID, VersionTag>();
+
/** millisecond timestamp */
transient long endOfLifeTimer;
/**
* creates a new instance to save status of a putAllOperation
*/
- BulkOpHolder() {}
+ BulkOpHolder() {
+ // do nothing
+ }
public void putVersionTag(EventID eventId, VersionTag versionTag) {
entryVersionTags.put(eventId, versionTag);
@@ -699,13 +702,13 @@ public class EventTracker {
}
}
- static class ExpiryTask extends SystemTimerTask {
+ public static class ExpiryTask extends SystemTimerTask {
- GemFireCacheImpl cache;
+ InternalCache cache;
long expiryTime;
List trackers = new LinkedList();
- public ExpiryTask(GemFireCacheImpl cache, long expiryTime) {
+ public ExpiryTask(InternalCache cache, long expiryTime) {
this.cache = cache;
this.expiryTime = expiryTime;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/ExpiryTask.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ExpiryTask.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpiryTask.java
index d3f5987..14edad9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ExpiryTask.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpiryTask.java
@@ -12,17 +12,23 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
-/**
- * ExpiryTask represents a timeout event for expiration
- */
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.*;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.PooledExecutorWithDMStats;
@@ -32,13 +38,10 @@ import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.tcp.ConnectionTable;
-import org.apache.logging.log4j.Logger;
-
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+/**
+ * ExpiryTask represents a timeout event for expiration
+ */
public abstract class ExpiryTask extends SystemTimer.SystemTimerTask {
private static final Logger logger = LogService.getLogger();
@@ -49,8 +52,7 @@ public abstract class ExpiryTask extends SystemTimer.SystemTimerTask {
static {
// default to inline expiry to fix bug 37115
- int nThreads =
- Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "EXPIRY_THREADS", 0).intValue();
+ int nThreads = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "EXPIRY_THREADS", 0);
if (nThreads > 0) {
ThreadFactory tf = new ThreadFactory() {
private int nextId = 0;
@@ -396,7 +398,7 @@ public abstract class ExpiryTask extends SystemTimer.SystemTimerTask {
}
protected boolean isCacheClosing() {
- return ((GemFireCacheImpl) getLocalRegion().getCache()).isClosed();
+ return getLocalRegion().getCache().isClosed();
}
/**
@@ -464,7 +466,7 @@ public abstract class ExpiryTask extends SystemTimer.SystemTimerTask {
}
private static long calculateNow() {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
// Use cache.cacheTimeMillis here. See bug 52267.
InternalDistributedSystem ids = cache.getInternalDistributedSystem();
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
index e7175f3..9a4eca3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
@@ -47,6 +47,7 @@ import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.CqServiceProvider;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionAdvisee;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
@@ -88,7 +89,7 @@ public class FilterProfile implements DataSerializableFixedID {
private static final Logger logger = LogService.getLogger();
/** enumeration of distributed profile operations */
- static enum operationType {
+ enum operationType {
REGISTER_KEY,
REGISTER_KEYS,
REGISTER_PATTERN,
@@ -201,8 +202,6 @@ public class FilterProfile implements DataSerializableFixedID {
* used for instantiation of a profile associated with a region and not describing region filters
* in a different process. Do not use this method when instantiating profiles to store in
* distribution advisor profiles.
- *
- * @param r
*/
public FilterProfile(LocalRegion r) {
this.region = r;
@@ -499,7 +498,6 @@ public class FilterProfile implements DataSerializableFixedID {
/**
* Registers interest in a set of keys for a client
*
- * @param inputClientID
* @param keys The list of keys in which to register interest
* @param updatesAsInvalidates whether to send invalidations instead of updates
* @return the registered keys
@@ -796,7 +794,7 @@ public class FilterProfile implements DataSerializableFixedID {
public void stopCq(ServerCQ cq) {
ensureCqID(cq);
if (logger.isDebugEnabled()) {
- this.logger.debug("Stopping CQ {} on this members FilterProfile.", cq.getServerCqName());
+ logger.debug("Stopping CQ {} on this members FilterProfile.", cq.getServerCqName());
}
this.sendCQProfileOperation(operationType.STOP_CQ, cq);
}
@@ -919,7 +917,7 @@ public class FilterProfile implements DataSerializableFixedID {
if (clientId.equals(client)) {
try {
cq.close(false);
- } catch (Exception ex) {
+ } catch (Exception ignore) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to remove CQ from the base region. CqName : {}", cq.getName());
}
@@ -970,7 +968,7 @@ public class FilterProfile implements DataSerializableFixedID {
}
OperationMessage msg = new OperationMessage();
msg.regionName = this.region.getFullPath();
- msg.clientID = clientID.longValue();
+ msg.clientID = clientID;
msg.opType = opType;
msg.interest = interest;
msg.updatesAsInvalidates = updatesAsInvalidates;
@@ -980,14 +978,14 @@ public class FilterProfile implements DataSerializableFixedID {
private void sendFilterProfileOperation(OperationMessage msg) {
Set recipients =
- ((CacheDistributionAdvisee) this.region).getDistributionAdvisor().adviseProfileUpdate();
+ ((DistributionAdvisee) this.region).getDistributionAdvisor().adviseProfileUpdate();
msg.setRecipients(recipients);
ReplyProcessor21 rp = new ReplyProcessor21(this.region.getDistributionManager(), recipients);
msg.processorId = rp.getProcessorId();
this.region.getDistributionManager().putOutgoing(msg);
try {
rp.waitForReplies();
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
@@ -1014,9 +1012,10 @@ public class FilterProfile implements DataSerializableFixedID {
}
static final Profile[] NO_PROFILES = new Profile[0];
+
private final CacheProfile localProfile = new CacheProfile(this);
- private final Profile[] localProfileArray = new Profile[] {localProfile};
+ private final Profile[] localProfileArray = new Profile[] {localProfile};
/** compute local routing information */
public FilterInfo getLocalFilterRouting(CacheEvent event) {
@@ -1061,7 +1060,7 @@ public class FilterProfile implements DataSerializableFixedID {
// bug #50809 - local routing for transactional ops must be done here
// because the event isn't available later and we lose the old value for the entry
final boolean processLocalProfile =
- event.getOperation().isEntry() && ((EntryEventImpl) event).getTransactionId() != null;
+ event.getOperation().isEntry() && ((EntryEvent) event).getTransactionId() != null;
fillInCQRoutingInfo(event, processLocalProfile, peerProfiles, frInfo);
}
@@ -1106,7 +1105,7 @@ public class FilterProfile implements DataSerializableFixedID {
}
fillInCQRoutingInfo(event, true, NO_PROFILES, result);
}
- result = fillInInterestRoutingInfo(event, localProfileArray, result, Collections.EMPTY_SET);
+ result = fillInInterestRoutingInfo(event, localProfileArray, result, Collections.emptySet());
}
return result;
}
@@ -1275,8 +1274,8 @@ public class FilterProfile implements DataSerializableFixedID {
public FilterRoutingInfo fillInInterestRoutingInfo(CacheEvent event, Profile[] profiles,
FilterRoutingInfo filterRoutingInfo, Set cacheOpRecipients) {
- Set clientsInv = Collections.EMPTY_SET;
- Set clients = Collections.EMPTY_SET;
+ Set clientsInv = Collections.emptySet();
+ Set clients = Collections.emptySet();
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
logger.trace(LogMarker.BRIDGE_SERVER, "finding interested clients for {}", event);
@@ -1310,8 +1309,8 @@ public class FilterProfile implements DataSerializableFixedID {
if (!pf.isLocalProfile() && cacheOpRecipients.contains(cf.getDistributedMember())) {
if (frInfo == null)
frInfo = new FilterRoutingInfo();
- frInfo.addInterestedClients(cf.getDistributedMember(), Collections.EMPTY_SET,
- Collections.EMPTY_SET, false);
+ frInfo.addInterestedClients(cf.getDistributedMember(), Collections.emptySet(),
+ Collections.emptySet(), false);
}
continue;
}
@@ -1567,11 +1566,10 @@ public class FilterProfile implements DataSerializableFixedID {
}
private Set<Object> getAllKeyClients() {
- Set allKeysRef = this.allKeyClients;
if (testHook != null) {
testHook.await();
}
- return allKeysRef;
+ return (Set) this.allKeyClients;
}
public int getAllKeyClientsSize() {
@@ -1627,7 +1625,7 @@ public class FilterProfile implements DataSerializableFixedID {
return "clients[]";
}
Set<Long> sorted = new TreeSet(wids.keySet());
- StringBuffer result = new StringBuffer(sorted.size() * 70);
+ StringBuilder result = new StringBuilder(sorted.size() * 70);
result.append("clients[");
Iterator<Long> it = sorted.iterator();
for (int i = 1; it.hasNext(); i++) {
@@ -1652,7 +1650,7 @@ public class FilterProfile implements DataSerializableFixedID {
return "cqs[]";
}
Set<Long> sorted = new TreeSet(wids.keySet());
- StringBuffer result = new StringBuffer(sorted.size() * 70);
+ StringBuilder result = new StringBuilder(sorted.size() * 70);
result.append("cqs[");
Iterator<Long> it = sorted.iterator();
for (int i = 1; it.hasNext(); i++) {
@@ -1666,7 +1664,6 @@ public class FilterProfile implements DataSerializableFixedID {
return result.toString();
}
-
/**
* given a collection of on-wire identifiers, this returns a set of the client/server identifiers
* for each client or durable queue
@@ -1730,7 +1727,7 @@ public class FilterProfile implements DataSerializableFixedID {
return new LinkedList(this.filterProfileMsgQueue.get(member));
}
}
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
/**
@@ -1746,14 +1743,11 @@ public class FilterProfile implements DataSerializableFixedID {
return new LinkedList(this.filterProfileMsgQueue.remove(member));
}
}
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
/**
* Adds the message to filter profile queue.
- *
- * @param member
- * @param message
*/
public void addToFilterProfileQueue(InternalDistributedMember member, OperationMessage message) {
if (logger.isDebugEnabled()) {
@@ -1771,8 +1765,6 @@ public class FilterProfile implements DataSerializableFixedID {
/**
* Process the filter profile messages.
- *
- * @param msgs
*/
public void processQueuedFilterProfileMsgs(List msgs) {
final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -1840,7 +1832,7 @@ public class FilterProfile implements DataSerializableFixedID {
"No cache profile to update, adding filter profile message to queue. Message :{}",
this);
}
- FilterProfile localFP = ((PartitionedRegion) r).getFilterProfile();
+ FilterProfile localFP = ((LocalRegion) r).getFilterProfile();
localFP.addToFilterProfileQueue(getSender(), this);
dm.getCancelCriterion().checkCancelInProgress(null);
} else {
@@ -1865,7 +1857,7 @@ public class FilterProfile implements DataSerializableFixedID {
reply.setRecipient(getSender());
try {
dm.putOutgoing(reply);
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// can't send a reply, so ignore the exception
}
}
@@ -1927,7 +1919,7 @@ public class FilterProfile implements DataSerializableFixedID {
private CacheDistributionAdvisee findRegion() {
CacheDistributionAdvisee result = null;
- GemFireCacheImpl cache = null;
+ InternalCache cache;
try {
cache = GemFireCacheImpl.getInstance();
if (cache != null) {
@@ -1936,7 +1928,7 @@ public class FilterProfile implements DataSerializableFixedID {
result = (CacheDistributionAdvisee) lr;
}
}
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// nothing to do
}
return result;
@@ -2023,7 +2015,7 @@ public class FilterProfile implements DataSerializableFixedID {
if (nextID == Integer.MAX_VALUE) {
this.hasLongID = true;
}
- result = Long.valueOf(nextID++);
+ result = nextID++;
this.realIDs.put(realId, result);
this.wireIDs.put(result, realId);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
index 279a4d1..0dd24f6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
@@ -14,18 +14,6 @@
*/
package org.apache.geode.internal.cache;
-import org.apache.geode.DataSerializable;
-import org.apache.geode.DataSerializer;
-import org.apache.geode.InternalGemFireError;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.HeapDataOutputStream;
-import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.Version;
-import org.apache.geode.internal.VersionedDataInputStream;
-import org.apache.geode.internal.VersionedDataSerializable;
-
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
@@ -38,6 +26,18 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.ObjToByteArraySerializer;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.VersionedDataInputStream;
+import org.apache.geode.internal.VersionedDataSerializable;
+
/**
* This class is used to hold the information about the servers and their Filters (CQs and Interest
* List) that are satisfied by the cache update operation.
@@ -217,7 +217,7 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
/** DataSerializable methods */
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
DistributedMember myID = null;
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
myID = cache.getMyId();
}
@@ -250,7 +250,7 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
public void fromDataPre_GFE_7_1_0_0(DataInput in) throws IOException, ClassNotFoundException {
DistributedMember myID = null;
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
myID = cache.getMyId();
}
@@ -303,7 +303,7 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
public boolean longIDs;
- public static final long serialVersionUID = 0;
+ private static final long serialVersionUID = 0;
/** Map holding Cq filterID and CqEvent Type */
private HashMap<Long, Integer> cqs;
@@ -383,21 +383,20 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
Map.Entry e = (Map.Entry) it.next();
// most cq IDs and all event types are small ints, so we use an optimized
// write that serializes 7 bits at a time in a compact form
- InternalDataSerializer.writeUnsignedVL(((Long) e.getKey()).longValue(), hdos);
- InternalDataSerializer.writeUnsignedVL(((Integer) e.getValue()).intValue(), hdos);
+ InternalDataSerializer.writeUnsignedVL((Long) e.getKey(), hdos);
+ InternalDataSerializer.writeUnsignedVL((Integer) e.getValue(), hdos);
}
}
InternalDataSerializer.writeSetOfLongs(this.interestedClients, this.longIDs, hdos);
InternalDataSerializer.writeSetOfLongs(this.interestedClientsInv, this.longIDs, hdos);
if (out instanceof HeapDataOutputStream) {
- ((HeapDataOutputStream) out).writeAsSerializedByteArray(hdos);
+ ((ObjToByteArraySerializer) out).writeAsSerializedByteArray(hdos);
} else {
byte[] myData = hdos.toByteArray();
DataSerializer.writeByteArray(myData, out);
}
}
-
public void fromDataPre_GFE_8_0_0_0(DataInput in) throws IOException, ClassNotFoundException {
if (OLD_MEMBERS_OPTIMIZED) {
this.myDataVersion = InternalDataSerializer.getVersionForDataStreamOrNull(in);
@@ -422,14 +421,14 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
Map.Entry e = (Map.Entry) it.next();
// most cq IDs and all event types are small ints, so we use an optimized
// write that serializes 7 bits at a time in a compact form
- InternalDataSerializer.writeUnsignedVL(((Long) e.getKey()).longValue(), hdos);
- InternalDataSerializer.writeUnsignedVL(((Integer) e.getValue()).intValue(), hdos);
+ InternalDataSerializer.writeUnsignedVL((Long) e.getKey(), hdos);
+ InternalDataSerializer.writeUnsignedVL((Integer) e.getValue(), hdos);
}
}
InternalDataSerializer.writeSetOfLongs(this.interestedClients, this.longIDs, hdos);
InternalDataSerializer.writeSetOfLongs(this.interestedClientsInv, this.longIDs, hdos);
if (out instanceof HeapDataOutputStream) {
- ((HeapDataOutputStream) out).writeAsSerializedByteArray(hdos);
+ ((ObjToByteArraySerializer) out).writeAsSerializedByteArray(hdos);
} else {
byte[] myData = hdos.toByteArray();
DataSerializer.writeByteArray(myData, out);
@@ -494,7 +493,7 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
this.cqs = new HashMap(numEntries);
for (int i = 0; i < numEntries; i++) {
Long key = InternalDataSerializer.readUnsignedVL(dis);
- Integer value = Integer.valueOf((int) InternalDataSerializer.readUnsignedVL(dis));
+ Integer value = (int) InternalDataSerializer.readUnsignedVL(dis);
this.cqs.put(key, value);
}
}
@@ -506,10 +505,9 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
}
}
-
@Override
public String toString() {
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
if (this.interestedClients != null && this.interestedClients.size() > 0) {
sb.append("interestedClients:");
sb.append(this.interestedClients);
@@ -533,4 +531,3 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
}
}
-
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java
index 1145687..71423e3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
import java.io.DataInput;
@@ -27,7 +26,6 @@ import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
@@ -50,14 +48,11 @@ import org.apache.geode.internal.logging.LogService;
public class FindDurableQueueProcessor extends ReplyProcessor21 {
private static final Logger logger = LogService.getLogger();
- ////////// Public static entry point /////////
-
final ArrayList durableLocations = new ArrayList();
- // @todo gregp: add javadocs
public static ArrayList sendAndFind(ServerLocator locator, ClientProxyMembershipID proxyId,
DM dm) {
- Set members = ((ControllerAdvisor) locator.getDistributionAdvisor()).adviseBridgeServers();
+ Set members = ((GridAdvisor) locator.getDistributionAdvisor()).adviseBridgeServers();
if (members.contains(dm.getId())) {
// Don't send message to local server, see #50534.
Set remoteMembers = new HashSet(members);
@@ -80,9 +75,9 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 {
private static void findLocalDurableQueues(ClientProxyMembershipID proxyId,
ArrayList<ServerLocation> matches) {
- Cache c = GemFireCacheImpl.getInstance();
- if (c != null) {
- List l = c.getCacheServers();
+ InternalCache cache = GemFireCacheImpl.getInstance();
+ if (cache != null) {
+ List l = cache.getCacheServers();
if (l != null) {
Iterator i = l.iterator();
while (i.hasNext()) {
@@ -96,9 +91,6 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 {
}
}
-
- //////////// Instance methods //////////////
-
@Override
public void process(DistributionMessage msg) {
// TODO Auto-generated method stub
@@ -112,7 +104,6 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 {
super.process(msg);
}
-
/**
* Creates a new instance of FindDurableQueueProcessor
*/
@@ -120,9 +111,6 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 {
super(dm, members);
}
-
- /////////////// Inner message classes //////////////////
-
public static class FindDurableQueueMessage extends PooledDistributionMessage
implements MessageWithReply {
private int processorId;
@@ -149,14 +137,12 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 {
return this.proxyId;
}
-
@Override
protected void process(final DistributionManager dm) {
ArrayList<ServerLocation> matches = new ArrayList<ServerLocation>();
try {
findLocalDurableQueues(proxyId, matches);
-
} finally {
FindDurableQueueReply reply = new FindDurableQueueReply();
reply.setProcessorId(this.getProcessorId());
@@ -169,7 +155,6 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 {
dm.putOutgoing(reply);
}
}
-
}
public int getDSFID() {
@@ -205,7 +190,6 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 {
}
}
-
public static class FindDurableQueueReply extends ReplyMessage {
protected ArrayList matches = null;
@@ -239,4 +223,3 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 {
}
}
}
-
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java
index 84e3009..3b89cfc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java
@@ -50,9 +50,12 @@ public class FindRemoteTXMessage extends HighPriorityDistributionMessage
private static final Logger logger = LogService.getLogger();
private TXId txId;
+
private int processorId;
- public FindRemoteTXMessage() {}
+ public FindRemoteTXMessage() {
+ // do nothing
+ }
public FindRemoteTXMessage(TXId txid, int processorId, Set recipients) {
super();
@@ -93,8 +96,7 @@ public class FindRemoteTXMessage extends HighPriorityDistributionMessage
logger.debug("processing {}", this);
}
FindRemoteTXMessageReply reply = new FindRemoteTXMessageReply();
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();// .getExisting("Looking up
- // CacheTransactionManager");
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
TXManagerImpl mgr = (TXManagerImpl) cache.getCacheTransactionManager();
mgr.waitForCompletingTransaction(txId); // in case there is a lost commit going on
@@ -147,10 +149,8 @@ public class FindRemoteTXMessage extends HighPriorityDistributionMessage
@Override
public String toString() {
- StringBuffer buff = new StringBuffer();
+ StringBuilder buff = new StringBuilder();
String className = getClass().getName();
- // className.substring(className.lastIndexOf('.', className.lastIndexOf('.') - 1) + 1); //
- // partition.<foo> more generic version
buff.append(className.substring(
className.indexOf(PartitionMessage.PN_TOKEN) + PartitionMessage.PN_TOKEN.length())); // partition.<foo>
buff.append("(txId=").append(this.txId).append("; sender=").append(getSender())
@@ -173,7 +173,6 @@ public class FindRemoteTXMessage extends HighPriorityDistributionMessage
this.processorId = in.readInt();
}
-
public static class FindRemoteTXMessageReplyProcessor extends ReplyProcessor21 {
private InternalDistributedMember hostingMember;
@@ -235,8 +234,6 @@ public class FindRemoteTXMessage extends HighPriorityDistributionMessage
return true;
}
-
-
/**
* Reply message for {@link FindRemoteTXMessage}. Reply is a boolean to indicate if the recipient
* hosts or has recently hosted the tx state. If the member did host the txState previously, reply
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java
index 544a27e..199aafc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FindVersionTagOperation.java
@@ -12,9 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- *
- */
package org.apache.geode.internal.cache;
import java.io.DataInput;
@@ -38,9 +35,6 @@ import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.LogService;
-/**
- *
- */
public class FindVersionTagOperation {
private static final Logger logger = LogService.getLogger();
@@ -94,7 +88,6 @@ public class FindVersionTagOperation {
public boolean stillWaiting() {
return this.versionTag == null && super.stillWaiting();
}
-
}
/**
@@ -124,12 +117,6 @@ public class FindVersionTagOperation {
/** for deserialization */
public FindVersionTagMessage() {}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.distributed.internal.DistributionMessage#process(org.apache.geode.
- * distributed.internal.DistributionManager)
- */
@Override
protected void process(DistributionManager dm) {
VersionTag result = null;
@@ -169,7 +156,7 @@ public class FindVersionTagOperation {
}
private LocalRegion findRegion() {
- GemFireCacheImpl cache = null;
+ InternalCache cache;
try {
cache = GemFireCacheImpl.getInstance();
if (cache != null) {
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 84e734c..4ed583a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -119,7 +119,6 @@ import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.Pool;
@@ -127,6 +126,8 @@ import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.ClientMetadataService;
import org.apache.geode.cache.client.internal.ClientRegionFactoryImpl;
+import org.apache.geode.cache.client.internal.ConnectionImpl;
+import org.apache.geode.cache.client.internal.InternalClientCache;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.control.ResourceManager;
import org.apache.geode.cache.execute.FunctionService;
@@ -239,8 +240,8 @@ import org.apache.geode.redis.GeodeRedisServer;
* GemFire's implementation of a distributed {@link Cache}.
*/
@SuppressWarnings("deprecation")
-public class GemFireCacheImpl
- implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime {
+public class GemFireCacheImpl implements InternalCache, InternalClientCache, HasCachePerfStats,
+ DistributionAdvisee, CacheTime {
private static final Logger logger = LogService.getLogger();
/** The default number of seconds to wait for a distributed lock */
@@ -287,6 +288,8 @@ public class GemFireCacheImpl
* (the default) then the size of the entry value is unchanged by a delta application. Not a final
* so that tests can change this value.
*
+ * TODO: move or static or encapsulate with interface methods
+ *
* @since GemFire h****** 6.1.2.9
*/
static boolean DELTAS_RECALCULATE_SIZE =
@@ -580,10 +583,6 @@ public class GemFireCacheImpl
private final Map<Class<? extends CacheService>, CacheService> services = new HashMap<>();
- public static final int DEFAULT_CLIENT_FUNCTION_TIMEOUT = 0;
-
- private static int clientFunctionTimeout;
-
private final SecurityService securityService = SecurityService.getSecurityService();
static {
@@ -928,10 +927,12 @@ public class GemFireCacheImpl
} // synchronized
}
+ @Override
public boolean isRESTServiceRunning() {
return this.isRESTServiceRunning;
}
+ @Override
public void setRESTServiceRunning(boolean isRESTServiceRunning) {
this.isRESTServiceRunning = isRESTServiceRunning;
}
@@ -941,6 +942,7 @@ public class GemFireCacheImpl
*
* @return RestAgent
*/
+ @Override
public RestAgent getRestAgent() {
return this.restAgent;
}
@@ -1097,6 +1099,7 @@ public class GemFireCacheImpl
*
* @return true if the cache has pools declared
*/
+ @Override
public boolean hasPool() {
return this.isClient || !getAllPools().isEmpty();
}
@@ -1213,10 +1216,6 @@ public class GemFireCacheImpl
startRestAgentServer(this);
- int time = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "CLIENT_FUNCTION_TIMEOUT",
- DEFAULT_CLIENT_FUNCTION_TIMEOUT);
- clientFunctionTimeout = time >= 0 ? time : DEFAULT_CLIENT_FUNCTION_TIMEOUT;
-
this.isInitialized = true;
}
@@ -1928,6 +1927,7 @@ public class GemFireCacheImpl
close(false);
}
+ @Override
public void close(String reason, boolean keepAlive, boolean keepDS) {
close(reason, null, keepAlive, keepDS);
}
@@ -1937,6 +1937,7 @@ public class GemFireCacheImpl
close("Normal disconnect", null, keepAlive, false);
}
+ @Override
public void close(String reason, Throwable optionalCause) {
close(reason, optionalCause, false, false);
}
@@ -1974,6 +1975,7 @@ public class GemFireCacheImpl
*
* @return the GatewaySender distributed lock service
*/
+ @Override
public DistributedLockService getGatewaySenderLockService() {
if (this.gatewayLockService == null) {
synchronized (this.gatewayLockServiceLock) {
@@ -2492,22 +2494,25 @@ public class GemFireCacheImpl
private final ConcurrentMap<String, DiskStoreImpl> regionOwnedDiskStores =
new ConcurrentHashMap<>();
- void addDiskStore(DiskStoreImpl dsi) {
+ @Override
+ public void addDiskStore(DiskStoreImpl dsi) {
this.diskStores.put(dsi.getName(), dsi);
if (!dsi.isOffline()) {
this.diskMonitor.addDiskStore(dsi);
}
}
- void removeDiskStore(DiskStoreImpl dsi) {
- this.diskStores.remove(dsi.getName());
- this.regionOwnedDiskStores.remove(dsi.getName());
+ @Override
+ public void removeDiskStore(DiskStoreImpl diskStore) {
+ this.diskStores.remove(diskStore.getName());
+ this.regionOwnedDiskStores.remove(diskStore.getName());
// Added for M&M
- if (!dsi.getOwnedByRegion())
- this.system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
+ if (!diskStore.getOwnedByRegion())
+ this.system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, diskStore);
}
- void addRegionOwnedDiskStore(DiskStoreImpl dsi) {
+ @Override
+ public void addRegionOwnedDiskStore(DiskStoreImpl dsi) {
this.regionOwnedDiskStores.put(dsi.getName(), dsi);
if (!dsi.isOffline()) {
this.diskMonitor.addDiskStore(dsi);
@@ -2544,6 +2549,7 @@ public class GemFireCacheImpl
return defaultDiskStoreName;
}
+ // TODO: remove static from defaultDiskStoreName and move methods to InternalCache
private static String defaultDiskStoreName = DiskStoreFactory.DEFAULT_DISK_STORE_NAME;
@Override
@@ -2579,13 +2585,13 @@ public class GemFireCacheImpl
* @since GemFire prPersistSprint2
*/
@Override
- public Collection<DiskStoreImpl> listDiskStores() {
+ public Collection<DiskStore> listDiskStores() {
return Collections.unmodifiableCollection(this.diskStores.values());
}
@Override
- public Collection<DiskStoreImpl> listDiskStoresIncludingRegionOwned() {
- Collection<DiskStoreImpl> allDiskStores = new HashSet<>();
+ public Collection<DiskStore> listDiskStoresIncludingRegionOwned() {
+ Collection<DiskStore> allDiskStores = new HashSet<>();
allDiskStores.addAll(this.diskStores.values());
allDiskStores.addAll(this.regionOwnedDiskStores.values());
return allDiskStores;
@@ -2749,10 +2755,12 @@ public class GemFireCacheImpl
return this.system.getSecurityInternalLogWriter();
}
+ @Override
public InternalLogWriter getInternalLogWriter() {
return this.system.getInternalLogWriter();
}
+ @Override
public InternalLogWriter getSecurityInternalLogWriter() {
return this.system.getSecurityInternalLogWriter();
}
@@ -2762,7 +2770,8 @@ public class GemFireCacheImpl
*
* @return the sweeper task
*/
- EventTracker.ExpiryTask getEventTrackerTask() {
+ @Override
+ public EventTracker.ExpiryTask getEventTrackerTask() {
return this.recordedEventSweeper;
}
@@ -2782,6 +2791,7 @@ public class GemFireCacheImpl
* @param className Class name of the declarable
* @return List of all instances of properties found for the given declarable
*/
+ @Override
public List<Properties> getDeclarableProperties(final String className) {
List<Properties> propertiesList = new ArrayList<>();
synchronized (this.declarablePropertiesMap) {
@@ -2800,6 +2810,7 @@ public class GemFireCacheImpl
* @param declarable The declarable
* @return Properties found for the given declarable
*/
+ @Override
public Properties getDeclarableProperties(final Declarable declarable) {
return this.declarablePropertiesMap.get(declarable);
}
@@ -2998,11 +3009,12 @@ public class GemFireCacheImpl
}
}
+ // TODO: createVMRegion method is too complex for IDE to analyze
@Override
public <K, V> Region<K, V> createVMRegion(String name, RegionAttributes<K, V> p_attrs,
InternalRegionArguments internalRegionArgs)
throws RegionExistsException, TimeoutException, IOException, ClassNotFoundException {
- // TODO: refactor overly complex method
+
if (getMyId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
if (!internalRegionArgs.isUsedForMetaRegion()
&& internalRegionArgs.getInternalMetaRegion() == null) {
@@ -3199,6 +3211,7 @@ public class GemFireCacheImpl
return result;
}
+ @Override
public Set<LocalRegion> getApplicationRegions() {
Set<LocalRegion> result = new HashSet<>();
synchronized (this.rootRegions) {
@@ -3255,6 +3268,7 @@ public class GemFireCacheImpl
return (LocalRegion) this.pathToRegion.get(path);
}
+ @Override
public LocalRegion getRegionByPathForProcessing(String path) {
LocalRegion result = getRegionByPath(path);
if (result == null) {
@@ -3324,7 +3338,8 @@ public class GemFireCacheImpl
}
/** Return true if this region is initializing */
- boolean isGlobalRegionInitializing(String fullPath) {
+ @Override
+ public boolean isGlobalRegionInitializing(String fullPath) {
this.stopper.checkCancelInProgress(null);
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT); // go through
// initialization latches
@@ -3636,13 +3651,13 @@ public class GemFireCacheImpl
}
@Override
- public void addRegionListener(RegionListener l) {
- this.regionListeners.add(l);
+ public void addRegionListener(RegionListener regionListener) {
+ this.regionListeners.add(regionListener);
}
@Override
- public void removeRegionListener(RegionListener l) {
- this.regionListeners.remove(l);
+ public void removeRegionListener(RegionListener regionListener) {
+ this.regionListeners.remove(regionListener);
}
@Override
@@ -3763,6 +3778,7 @@ public class GemFireCacheImpl
return addCacheServer(false);
}
+ @Override
public CacheServer addCacheServer(boolean isGatewayReceiver) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
@@ -3776,6 +3792,7 @@ public class GemFireCacheImpl
return cacheServer;
}
+ @Override
public void addGatewaySender(GatewaySender sender) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
@@ -3824,6 +3841,7 @@ public class GemFireCacheImpl
}
}
+ @Override
public void removeGatewaySender(GatewaySender sender) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
@@ -3844,6 +3862,7 @@ public class GemFireCacheImpl
}
}
+ @Override
public void addGatewayReceiver(GatewayReceiver receiver) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
@@ -3859,6 +3878,7 @@ public class GemFireCacheImpl
}
}
+ @Override
public void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue) {
this.allAsyncEventQueues.add(asyncQueue);
if (!asyncQueue.isMetaQueue()) {
@@ -3923,6 +3943,7 @@ public class GemFireCacheImpl
return null;
}
+ @Override
public void removeAsyncEventQueue(AsyncEventQueue asyncQueue) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
@@ -3975,6 +3996,7 @@ public class GemFireCacheImpl
return cacheServersWithoutReceiver;
}
+ @Override
public List getCacheServersAndGatewayReceiver() {
return this.allCacheServers;
}
@@ -3983,7 +4005,7 @@ public class GemFireCacheImpl
* add a partitioned region to the set of tracked partitioned regions. This is used to notify the
* regions when this cache requires, or does not require notification of all region/entry events.
*/
- void addPartitionedRegion(PartitionedRegion region) {
+ public void addPartitionedRegion(PartitionedRegion region) {
synchronized (this.partitionedRegions) {
if (region.isDestroyed()) {
if (logger.isDebugEnabled()) {
@@ -4121,7 +4143,8 @@ public class GemFireCacheImpl
*
* @see #addPartitionedRegion(PartitionedRegion)
*/
- void removePartitionedRegion(PartitionedRegion region) {
+ @Override
+ public void removePartitionedRegion(PartitionedRegion region) {
synchronized (this.partitionedRegions) {
if (this.partitionedRegions.remove(region)) {
getCachePerfStats().incPartitionedRegions(-1);
@@ -4383,6 +4406,7 @@ public class GemFireCacheImpl
/**
* Blocks until no register interests are in progress.
*/
+ @Override
public void waitForRegisterInterestsInProgress() {
// In *this* particular context, let the caller know that
// his cache has been cancelled. doWait below would do that as
@@ -4409,11 +4433,13 @@ public class GemFireCacheImpl
}
}
+ @Override
@SuppressWarnings("ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
public void setQueryMonitorRequiredForResourceManager(boolean required) {
queryMonitorRequiredForResourceManager = required;
}
+ @Override
public boolean isQueryMonitorDisabledForLowMemory() {
return this.queryMonitorDisabledForLowMem;
}
@@ -4423,6 +4449,7 @@ public class GemFireCacheImpl
*
* @since GemFire 6.0
*/
+ @Override
public QueryMonitor getQueryMonitor() {
// Check to see if monitor is required if ResourceManager critical heap percentage is set
// or whether we override it with the system variable;
@@ -4932,7 +4959,8 @@ public class GemFireCacheImpl
this.regionsInDestroy.remove(path, region);
}
- DistributedRegion getRegionInDestroy(String path) {
+ @Override
+ public DistributedRegion getRegionInDestroy(String path) {
return this.regionsInDestroy.get(path);
}
@@ -5045,7 +5073,8 @@ public class GemFireCacheImpl
return this.serialNumber;
}
- TXEntryStateFactory getTXEntryStateFactory() {
+ @Override
+ public TXEntryStateFactory getTXEntryStateFactory() {
return this.txEntryStateFactory;
}
@@ -5067,6 +5096,7 @@ public class GemFireCacheImpl
}
// test hook
+ @Override
public void setReadSerialized(boolean value) {
this.cacheConfig.setPdxReadSerialized(value);
}
@@ -5140,6 +5170,7 @@ public class GemFireCacheImpl
return PdxInstanceFactoryImpl.newCreator(className, true);
}
+ @Override
public PdxInstanceFactory createPdxInstanceFactory(String className, boolean expectDomainClass) {
return PdxInstanceFactoryImpl.newCreator(className, expectDomainClass);
}
@@ -5170,7 +5201,8 @@ public class GemFireCacheImpl
return this.getSystem().getOffHeapStore();
}
- DiskStoreMonitor getDiskStoreMonitor() {
+ @Override
+ public DiskStoreMonitor getDiskStoreMonitor() {
return this.diskMonitor;
}
@@ -5183,10 +5215,6 @@ public class GemFireCacheImpl
return this.extensionPoint;
}
- public static int getClientFunctionTimeout() {
- return clientFunctionTimeout;
- }
-
@Override
public CqService getCqService() {
return this.cqService;
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/GridAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GridAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GridAdvisor.java
index a19a958..03c14ab 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GridAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GridAdvisor.java
@@ -12,23 +12,26 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
-import java.util.*;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
-import org.apache.geode.*;
+import org.apache.geode.DataSerializer;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.Locator;
-import org.apache.geode.distributed.internal.*;
-import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
-import org.apache.geode.distributed.internal.membership.*;
-
+import org.apache.geode.distributed.internal.DistributionAdvisee;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
/**
* Used to share code with BridgeServerAdvisor and ControllerAdvisor
- *
*/
public abstract class GridAdvisor extends DistributionAdvisor {
@@ -38,9 +41,13 @@ public abstract class GridAdvisor extends DistributionAdvisor {
}
private final Object cacheLock = new Object();
+
private volatile List/* <BridgeServerProfile> */ cachedBridgeServerProfiles;
+
private volatile List/* <ControllerProfile> */ cachedControllerProfiles;
+
private volatile Set/* <DistributedMember> */ cachedBridgeServerAdvise;
+
private volatile Set/* <DistributedMember> */ cachedControllerAdvise;
private static final Filter CONTROLLER_FILTER = new Filter() {
@@ -90,11 +97,12 @@ public abstract class GridAdvisor extends DistributionAdvisor {
}
/**
- * Returns an unmodifiable <code>List</code> of the <code>BridgeServerProfile</code>s for all
- * known bridge servers.
+ * Returns an unmodifiable {@code List} of the {@code BridgeServerProfile}s for all known bridge
+ * servers.
*/
public List/* <BridgeServerProfile> */ fetchBridgeServers() {
- List/* <BridgeServerProfile> */ result = null; // this.cachedBridgeServerProfiles;
+ List/* <BridgeServerProfile> */ result = null;
+ // TODO: remove double-checking
if (result == null) {
synchronized (this.cacheLock) {
// result = this.cachedBridgeServerProfiles;
@@ -108,8 +116,8 @@ public abstract class GridAdvisor extends DistributionAdvisor {
}
/**
- * Returns an unmodifiable <code>List</code> of the <code>ControllerProfile</code>s for all known
- * cnx controllers.
+ * Returns an unmodifiable {@code List} of the {@code ControllerProfile}s for all known cnx
+ * controllers.
*/
public List/* <ControllerProfile> */ fetchControllers() {
List/* <ControllerProfile> */ result = this.cachedControllerProfiles;
@@ -224,8 +232,6 @@ public abstract class GridAdvisor extends DistributionAdvisor {
profilesChanged();
}
-
-
@Override
public Set adviseProfileRemove() {
// Our set of profiles includes local members. However, the update
@@ -236,12 +242,10 @@ public abstract class GridAdvisor extends DistributionAdvisor {
return results;
}
-
-
/**
* Describes profile data common for all Grid resources
*/
- public static abstract class GridProfile extends DistributionAdvisor.Profile {
+ public abstract static class GridProfile extends DistributionAdvisor.Profile {
private String host;
@@ -323,7 +327,7 @@ public abstract class GridAdvisor extends DistributionAdvisor {
*/
protected final void tellLocalBridgeServers(boolean removeProfile, boolean exchangeProfiles,
final List<Profile> replyProfiles) {
- final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ final InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null && !cache.isClosed()) {
List<?> bridgeServers = cache.getCacheServersAndGatewayReceiver();
for (int i = 0; i < bridgeServers.size(); i++) {
@@ -367,8 +371,8 @@ public abstract class GridAdvisor extends DistributionAdvisor {
@Override
public void fillInToString(StringBuilder sb) {
super.fillInToString(sb);
- sb.append("; host=" + this.host);
- sb.append("; port=" + this.port);
+ sb.append("; host=").append(this.host);
+ sb.append("; port=").append(this.port);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
index 07dd62c..f80f971 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
@@ -89,16 +89,8 @@ public final class HARegion extends DistributedRegion {
private volatile HARegionQueue owningQueue;
- // private Map giiProviderStates;
-
- /**
- * @param regionName
- * @param attrs
- * @param parentRegion
- * @param cache
- */
private HARegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion,
- GemFireCacheImpl cache) {
+ InternalCache cache) {
super(regionName, attrs, parentRegion, cache,
new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
.setSnapshotInputStream(null).setImageTarget(null));
@@ -163,7 +155,7 @@ public final class HARegion extends DistributedRegion {
ExpirationAttributes oldAttrs = getEntryTimeToLive();
this.entryTimeToLive = timeToLive.getTimeout();
this.entryTimeToLiveExpirationAction = timeToLive.getAction();
- setEntryTimeToLiveAtts();
+ setEntryTimeToLiveAttributes();
updateEntryExpiryPossible();
timeToLiveChanged(oldAttrs);
return oldAttrs;
@@ -256,7 +248,7 @@ public final class HARegion extends DistributedRegion {
* @throws IOException
* @throws ClassNotFoundException
*/
- public static HARegion getInstance(String regionName, GemFireCacheImpl cache, HARegionQueue hrq,
+ public static HARegion getInstance(String regionName, InternalCache cache, HARegionQueue hrq,
RegionAttributes ra)
throws TimeoutException, RegionExistsException, IOException, ClassNotFoundException {
@@ -441,9 +433,9 @@ public final class HARegion extends DistributedRegion {
}
@Override
- public void fillInProfile(Profile p) {
- super.fillInProfile(p);
- HARegionAdvisor.HAProfile h = (HARegionAdvisor.HAProfile) p;
+ public void fillInProfile(Profile profile) {
+ super.fillInProfile(profile);
+ HARegionAdvisor.HAProfile h = (HARegionAdvisor.HAProfile) profile;
// dunit tests create HARegions without encapsulating them in queues
if (this.owningQueue != null) {
h.isPrimary = this.owningQueue.isPrimary();
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index d0ad5db..d6dc98f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -3451,7 +3451,7 @@ public class InitialImageOperation {
if (haReg == null || haReg.getName() == null) {
throw new ReplyException("HARegion for the proxy is Null.");
}
- GemFireCacheImpl cache = haReg.getCache();
+ InternalCache cache = haReg.getCache();
CacheClientNotifier ccn = CacheClientNotifier.getInstance();
if (ccn == null || ccn.getHaContainer() == null) {
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index 33a7f52..5533ed1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
import java.io.File;
@@ -20,6 +19,8 @@ import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -27,26 +28,41 @@ import javax.transaction.TransactionManager;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.TimeoutException;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.client.internal.ClientMetadataService;
import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.internal.QueryMonitor;
import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedLockService;
-import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.CacheTime;
import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.ResourceAdvisor;
import org.apache.geode.internal.cache.extension.Extensible;
+import org.apache.geode.internal.cache.persistence.BackupManager;
import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.offheap.MemoryAllocator;
+import org.apache.geode.management.internal.JmxManagerAdvisor;
+import org.apache.geode.management.internal.RestAgent;
+import org.apache.geode.pdx.PdxInstanceFactory;
import org.apache.geode.pdx.internal.TypeRegistry;
/**
@@ -56,13 +72,13 @@ import org.apache.geode.pdx.internal.TypeRegistry;
* @see org.apache.geode.cache.Cache
* @since GemFire 7.0
*/
-public interface InternalCache extends Cache, Extensible<Cache> {
+public interface InternalCache extends Cache, Extensible<Cache>, CacheTime {
- DistributedMember getMyId();
+ InternalDistributedMember getMyId();
- Collection<DiskStoreImpl> listDiskStores();
+ Collection<DiskStore> listDiskStores();
- Collection<DiskStoreImpl> listDiskStoresIncludingRegionOwned();
+ Collection<DiskStore> listDiskStoresIncludingRegionOwned();
CqService getCqService();
@@ -173,11 +189,122 @@ public interface InternalCache extends Cache, Extensible<Cache> {
Set<PartitionedRegion> getPartitionedRegions();
- void addRegionListener(RegionListener l);
+ void addRegionListener(RegionListener regionListener);
- void removeRegionListener(RegionListener l);
+ void removeRegionListener(RegionListener regionListener);
Set<RegionListener> getRegionListeners();
CacheConfig getCacheConfig();
+
+ boolean getPdxReadSerializedByAnyGemFireServices();
+
+ BackupManager getBackupManager();
+
+ void setDeclarativeCacheConfig(CacheConfig cacheConfig);
+
+ void initializePdxRegistry();
+
+ void readyDynamicRegionFactory();
+
+ void setBackupFiles(List<File> backups);
+
+ void addDeclarableProperties(final Map<Declarable, Properties> mapOfNewDeclarableProps);
+
+ void setInitializer(Declarable initializer, Properties initializerProps);
+
+ boolean hasPool();
+
+ DiskStoreFactory createDiskStoreFactory(DiskStoreAttributes attrs);
+
+ void determineDefaultPool();
+
+ <K, V> Region<K, V> basicCreateRegion(String name, RegionAttributes<K, V> attrs)
+ throws RegionExistsException, TimeoutException;
+
+ BackupManager startBackup(InternalDistributedMember sender) throws IOException;
+
+ Throwable getDisconnectCause();
+
+ void addPartitionedRegion(PartitionedRegion region);
+
+ void removePartitionedRegion(PartitionedRegion region);
+
+ void addDiskStore(DiskStoreImpl dsi);
+
+ TXEntryStateFactory getTXEntryStateFactory();
+
+ EventTracker.ExpiryTask getEventTrackerTask();
+
+ void removeDiskStore(DiskStoreImpl diskStore);
+
+ void addGatewaySender(GatewaySender sender);
+
+ void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue);
+
+ void removeAsyncEventQueue(AsyncEventQueue asyncQueue);
+
+ QueryMonitor getQueryMonitor();
+
+ void close(String reason, Throwable systemFailureCause, boolean keepAlive, boolean keepDS);
+
+ JmxManagerAdvisor getJmxManagerAdvisor();
+
+ List<Properties> getDeclarableProperties(final String className);
+
+ int getUpTime();
+
+ Set<Region<?, ?>> rootRegions(boolean includePRAdminRegions);
+
+ Set<LocalRegion> getAllRegions();
+
+ DistributedRegion getRegionInDestroy(String path);
+
+ void addRegionOwnedDiskStore(DiskStoreImpl dsi);
+
+ DiskStoreMonitor getDiskStoreMonitor();
+
+ void close(String reason, Throwable optionalCause);
+
+ LocalRegion getRegionByPathForProcessing(String path);
+
+ List getCacheServersAndGatewayReceiver();
+
+ boolean isGlobalRegionInitializing(String fullPath);
+
+ DistributionAdvisor getDistributionAdvisor();
+
+ void setQueryMonitorRequiredForResourceManager(boolean required);
+
+ boolean isQueryMonitorDisabledForLowMemory();
+
+ boolean isRESTServiceRunning();
+
+ InternalLogWriter getInternalLogWriter();
+
+ InternalLogWriter getSecurityInternalLogWriter();
+
+ Set<LocalRegion> getApplicationRegions();
+
+ void removeGatewaySender(GatewaySender sender);
+
+ DistributedLockService getGatewaySenderLockService();
+
+ RestAgent getRestAgent();
+
+ Properties getDeclarableProperties(final Declarable declarable);
+
+ void setRESTServiceRunning(boolean isRESTServiceRunning);
+
+ void close(String reason, boolean keepAlive, boolean keepDS);
+
+ void addGatewayReceiver(GatewayReceiver receiver);
+
+ CacheServer addCacheServer(boolean isGatewayReceiver);
+
+ void setReadSerialized(boolean value);
+
+ PdxInstanceFactory createPdxInstanceFactory(String className, boolean expectDomainClass);
+
+ void waitForRegisterInterestsInProgress();
}