Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/11 21:07:11 UTC
[32/52] [abbrv] geode git commit: GEODE-2632: change dependencies on GemFireCacheImpl to InternalCache
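
The pattern throughout this commit is to narrow each dependency from the concrete GemFireCacheImpl class to the InternalCache interface it implements. A minimal, self-contained analogue of what that buys callers (the names below are hypothetical stand-ins, not Geode APIs):

interface NarrowCache {
  Object getRegion(String path);
}

class ConcreteCache implements NarrowCache {
  public Object getRegion(String path) {
    return "region@" + path;
  }
}

public class NarrowingSketch {
  // Before this style of change, a method would demand the concrete class:
  //   static Object lookup(ConcreteCache cache, String path) { ... }
  // After, it accepts the interface, so any implementation (including test doubles) works.
  static Object lookup(NarrowCache cache, String path) {
    return cache.getRegion(path);
  }

  public static void main(String[] args) {
    System.out.println(lookup(new ConcreteCache(), "/root/users"));
  }
}
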
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyRegionOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyRegionOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyRegionOperation.java
index 33cfa09..3cc988f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyRegionOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyRegionOperation.java
@@ -140,11 +140,11 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
protected HashMap subregionSerialNumbers;
protected boolean notifyOfRegionDeparture;
+
/**
* true if need to automatically recreate region, and mark destruction as a reinitialization
*/
protected transient LocalRegion lockRoot = null; // used for early destroy
- // lock acquisition
@Override
protected InternalCacheEvent createEvent(DistributedRegion rgn) throws EntryNotFoundException {
@@ -158,9 +158,8 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
}
protected RegionEventImpl createRegionEvent(DistributedRegion rgn) {
- RegionEventImpl event = new RegionEventImpl(rgn, getOperation(), this.callbackArg,
- true /* originRemote */, getSender());
- return event;
+ return new RegionEventImpl(rgn, getOperation(), this.callbackArg, true /* originRemote */,
+ getSender());
}
private Runnable destroyOp(final DistributionManager dm, final LocalRegion lclRgn,
@@ -183,12 +182,12 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
advisee =
PartitionedRegionHelper.getProxyBucketRegion(GemFireCacheImpl.getInstance(),
regionPath, waitForBucketInitializationToComplete);
- } catch (PRLocallyDestroyedException e) {
+ } catch (PRLocallyDestroyedException ignore) {
// region not found - it's been destroyed
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
// ditto
} catch (PartitionedRegionException e) {
- if (e.getMessage().indexOf("destroyed") == -1) {
+ if (!e.getMessage().contains("destroyed")) {
throw e;
}
// region failed registration & is unusable
@@ -228,11 +227,11 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
}
doRegionDestroy(event);
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
logger.debug("{} Region destroyed: nothing to do", this);
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
logger.debug("{} Cancelled: nothing to do", this);
- } catch (EntryNotFoundException e) {
+ } catch (EntryNotFoundException ignore) {
logger.debug("{} Entry not found, nothing to do", this);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
@@ -292,7 +291,7 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
// pool, the entry
// update is allowed to complete.
dm.getWaitingThreadPool().execute(destroyOp(dm, lclRgn, sendReply));
- } catch (RejectedExecutionException e) {
+ } catch (RejectedExecutionException ignore) {
// rejected while trying to execute destroy thread
// must be shutting down, just quit
}
@@ -303,19 +302,19 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
// shared region, since another cache may
// have already destroyed it in shared memory, in which our listeners
// still need to be called and java region object cleaned up.
- GemFireCacheImpl c = (GemFireCacheImpl) CacheFactory.getInstance(sys);
+ InternalCache cache = (InternalCache) CacheFactory.getInstance(sys);
// only get the region while holding the appropriate destroy lock.
// this prevents us from getting a "stale" region
if (getOperation().isDistributed()) {
String rootName = GemFireCacheImpl.parsePath(path)[0];
- this.lockRoot = (LocalRegion) c.getRegion(rootName);
+ this.lockRoot = (LocalRegion) cache.getRegion(rootName);
if (this.lockRoot == null)
return null;
this.lockRoot.acquireDestroyLock();
}
- return (LocalRegion) c.getRegion(path);
+ return (LocalRegion) cache.getRegion(path);
}
private void disableRegionDepartureNotification() {
@@ -411,15 +410,15 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
rgn.basicDestroyRegion(ev, false /* cacheWrite */, false /* lock */,
true/* cacheCallbacks */);
}
- } catch (CacheWriterException e) {
+ } catch (CacheWriterException ignore) {
throw new Error(
LocalizedStrings.DestroyRegionOperation_CACHEWRITER_SHOULD_NOT_HAVE_BEEN_CALLED
.toLocalizedString());
- } catch (TimeoutException e) {
+ } catch (TimeoutException ignore) {
throw new Error(
LocalizedStrings.DestroyRegionOperation_DISTRIBUTEDLOCK_SHOULD_NOT_HAVE_BEEN_ACQUIRED
.toLocalizedString());
- } catch (RejectedExecutionException e) {
+ } catch (RejectedExecutionException ignore) {
// rejected while trying to execute recreate thread
// must be shutting down, so what we were trying to do must not be
// important anymore, so just quit
@@ -468,13 +467,13 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
}
public static final class DestroyRegionWithContextMessage extends DestroyRegionMessage {
+
protected transient Object context;
@Override
final public RegionEventImpl createRegionEvent(DistributedRegion rgn) {
- ClientRegionEventImpl event = new ClientRegionEventImpl(rgn, getOperation(), this.callbackArg,
+ return new ClientRegionEventImpl(rgn, getOperation(), this.callbackArg,
true /* originRemote */, getSender(), (ClientProxyMembershipID) this.context);
- return event;
}
@Override
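
Two recurring cleanups in the DestroyRegionOperation hunks above: unused catch variables are renamed to ignore, and the indexOf(...) == -1 test becomes !contains(...). A small runnable sketch of both idioms (the message text is made up for illustration):

import java.util.concurrent.RejectedExecutionException;

public class CatchIdiomsSketch {
  public static void main(String[] args) {
    String msg = "Region /users has been destroyed";
    // String.contains() is equivalent to indexOf(...) != -1 and reads more directly.
    if (!msg.contains("destroyed")) {
      throw new IllegalStateException(msg);
    }
    try {
      throw new RejectedExecutionException("pool is shutting down");
    } catch (RejectedExecutionException ignore) {
      // Naming the variable "ignore" documents that dropping the exception is deliberate.
    }
    System.out.println("both idioms exercised");
  }
}
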
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java
index bf7c4d2..f78a6c1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java
@@ -27,7 +27,6 @@ import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.DiskStoreImpl.AsyncDiskEntry;
-import org.apache.geode.internal.cache.Token.Tombstone;
import org.apache.geode.internal.cache.lru.EnableLRU;
import org.apache.geode.internal.cache.lru.LRUClockNode;
import org.apache.geode.internal.cache.lru.LRUEntry;
@@ -52,18 +51,14 @@ import org.apache.geode.internal.util.BlobHelper;
* provides accessor and mutator methods for a disk entry's state. This allows us to abstract all of
* the interesting behavior into a {@linkplain DiskEntry.Helper helper class} that we only need to
* implement once.
- *
- * <P>
- *
- * Each <code>DiskEntry</code> has a unique <code>id</code> that is used by the {@link DiskRegion}
- * to identify the key/value pair. Before the disk entry is written to disk, the value of the
- * <code>id</code> is {@link DiskRegion#INVALID_ID invalid}. Once the object has been written to
- * disk, the <code>id</code> is a positive number. If the value is {@linkplain Helper#update
- * updated}, then the <code>id</code> is negated to signify that the value on disk is dirty.
+ * <p>
+ * Each {@code DiskEntry} has a unique {@code id} that is used by the {@link DiskRegion} to identify
+ * the key/value pair. Before the disk entry is written to disk, the value of the {@code id} is
+ * {@link DiskRegion#INVALID_ID invalid}. Once the object has been written to disk, the {@code id}
+ * is a positive number. If the value is {@linkplain Helper#update updated}, then the {@code id} is
+ * negated to signify that the value on disk is dirty.
*
* @see DiskRegion
- *
- *
* @since GemFire 3.2
*/
public interface DiskEntry extends RegionEntry {
@@ -78,8 +73,6 @@ public interface DiskEntry extends RegionEntry {
/**
* In some cases we need to do something just before we drop the value from a DiskEntry that is
* being moved (i.e. overflowed) to disk.
- *
- * @param context
*/
public void handleValueOverflow(RegionEntryContext context);
@@ -90,12 +83,10 @@ public interface DiskEntry extends RegionEntry {
public boolean isRemovedFromDisk();
/**
- * Returns the id of this <code>DiskEntry</code>
+ * Returns the id of this {@code DiskEntry}
*/
public DiskId getDiskId();
- public void _removePhase1();
-
public int updateAsyncEntrySize(EnableLRU capacityController);
public DiskEntry getPrev();
@@ -119,10 +110,8 @@ public interface DiskEntry extends RegionEntry {
*/
public static final byte[] TOMBSTONE_BYTES = new byte[0];
- /////////////////////// Inner Classes //////////////////////
-
/**
- * A Helper class for performing functions common to all <code>DiskEntry</code>s.
+ * A Helper class for performing functions common to all {@code DiskEntry}s.
*/
public static class Helper {
private static final Logger logger = LogService.getLogger();
@@ -185,12 +174,10 @@ public interface DiskEntry extends RegionEntry {
}
}
-
/**
* Get the value of an entry that is on disk without faulting it in . It checks for the presence
* in the buffer also. This method is used for concurrent map operations and CQ processing
*
- * @throws DiskAccessException
* @since GemFire 5.1
*/
static Object getValueOnDiskOrBuffer(DiskEntry entry, DiskRegion dr,
@@ -223,8 +210,8 @@ public interface DiskEntry extends RegionEntry {
synchronized (syncObj) {
if (did != null && did.isPendingAsync()) {
@Retained
- Object v = entry._getValueRetain(context, true); // TODO:KIRK:OK Rusty had Object v =
- // entry.getValueWithContext(context);
+ Object v = entry._getValueRetain(context, true);
+
if (Token.isRemovedFromDisk(v)) {
v = null;
}
@@ -309,9 +296,11 @@ public interface DiskEntry extends RegionEntry {
entry.setLastModified(mgr, de.getLastModified());
ReferenceCountHelper.setReferenceCountOwner(entry);
- v = de._getValueRetain(context, true); // OFFHEAP copied to heap entry; todo allow entry
- // to refer to offheap since it will be copied to
- // network.
+
+ // OFFHEAP copied to heap entry;
+ // TODO: allow entry to refer to offheap since it will be copied to network.
+ v = de._getValueRetain(context, true);
+
ReferenceCountHelper.setReferenceCountOwner(null);
if (v == null) {
if (did == null) {
@@ -331,7 +320,7 @@ public interface DiskEntry extends RegionEntry {
BytesAndBits bb = null;
try {
bb = dr.getBytesAndBits(did, false);
- } catch (DiskAccessException dae) {
+ } catch (DiskAccessException ignore) {
return false;
}
if (EntryBits.isInvalid(bb.getBits())) {
@@ -367,8 +356,7 @@ public interface DiskEntry extends RegionEntry {
Object tmp = cd.getValue();
if (tmp instanceof byte[]) {
- byte[] bb = (byte[]) tmp;
- entry.value = bb;
+ entry.value = (byte[]) tmp;
entry.setSerialized(true);
} else {
try {
@@ -378,11 +366,10 @@ public interface DiskEntry extends RegionEntry {
entry.value = hdos;
entry.setSerialized(true);
} catch (IOException e) {
- RuntimeException e2 = new IllegalArgumentException(
+ throw new IllegalArgumentException(
LocalizedStrings.DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING
- .toLocalizedString());
- e2.initCause(e);
- throw e2;
+ .toLocalizedString(),
+ e);
}
}
}
@@ -460,7 +447,7 @@ public interface DiskEntry extends RegionEntry {
} else {
entry.setValueWithContext(drv,
entry.prepareValueForCache((RegionEntryContext) r, re.getValue(), false));
- if (!Tombstone.isInvalidOrRemoved(re.getValue())) {
+ if (!Token.isInvalidOrRemoved(re.getValue())) {
updateStats(drv, r, 1/* InVM */, 0/* OnDisk */, 0);
}
}
@@ -574,7 +561,7 @@ public interface DiskEntry extends RegionEntry {
if (this.bytes == null) {
return "null";
}
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
int len = getLength();
for (int i = 0; i < len; i++) {
sb.append(this.bytes[i]).append(", ");
@@ -808,8 +795,6 @@ public interface DiskEntry extends RegionEntry {
/**
* Writes the key/value object stored in the given entry to disk
*
- * @throws RegionClearedException
- *
* @see DiskRegion#put
*/
private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async,
@@ -833,8 +818,6 @@ public interface DiskEntry extends RegionEntry {
/**
* Updates the value of the disk entry with a new value. This allows us to free up disk space in
* the non-backup case.
- *
- * @throws RegionClearedException
*/
public static void update(DiskEntry entry, LocalRegion region, Object newValue,
EntryEventImpl event) throws RegionClearedException {
@@ -892,7 +875,7 @@ public interface DiskEntry extends RegionEntry {
if (caughtCacheClosed) {
// 47616: not to set the value to be removedFromDisk since it failed to persist
} else {
- // Asif Ensure that the value is rightly set despite clear so
+ // Ensure that the value is rightly set despite clear so
// that it can be distributed correctly
entry.setValueWithContext(region, newValue); // OFFHEAP newValue was already
// preparedForCache
@@ -1010,12 +993,12 @@ public interface DiskEntry extends RegionEntry {
@Retained
public static Object getValueOffHeapOrDiskWithoutFaultIn(DiskEntry entry, LocalRegion region) {
@Retained
- Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v =
- // entry.getValueWithContext(region);
+ Object v = entry._getValueRetain(region, true);
+
if (v == null || Token.isRemovedFromDisk(v) && !region.isIndexCreationThread()) {
synchronized (entry) {
- v = entry._getValueRetain(region, true); // TODO:KIRK:OK v =
- // entry.getValueWithContext(region);
+ v = entry._getValueRetain(region, true);
+
if (v == null) {
v = Helper.getOffHeapValueOnDiskOrBuffer(entry, region.getDiskRegion(), region);
}
@@ -1024,24 +1007,10 @@ public interface DiskEntry extends RegionEntry {
if (Token.isRemovedFromDisk(v)) {
// fix for bug 31800
v = null;
- // } else if (v instanceof ByteSource) {
- // // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it
- // Object deserVal = ((CachedDeserializable)v).getDeserializedForReading();
- // if (deserVal != v) {
- // OffHeapHelper.release(v);
- // v = deserVal;
- // }
}
return v;
}
- /**
- *
- * @param entry
- * @param region
- * @return Value
- * @throws DiskAccessException
- */
public static Object faultInValue(DiskEntry entry, LocalRegion region) {
return faultInValue(entry, region, false);
}
@@ -1058,8 +1027,8 @@ public interface DiskEntry extends RegionEntry {
private static Object faultInValue(DiskEntry entry, LocalRegion region, boolean retainResult) {
DiskRegion dr = region.getDiskRegion();
@Retained
- Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v =
- // entry.getValueWithContext(region);
+ Object v = entry._getValueRetain(region, true);
+
boolean lruFaultedIn = false;
boolean done = false;
try {
@@ -1071,7 +1040,7 @@ public interface DiskEntry extends RegionEntry {
// See if it is pending async because of a faultOut.
// If so then if we are not a backup then we can unschedule the pending async.
// In either case we need to do the lruFaultIn logic.
- boolean evicted = ((LRUEntry) entry).testEvicted();
+ boolean evicted = ((LRUClockNode) entry).testEvicted();
if (evicted) {
if (!dr.isBackup()) {
// @todo do we also need a bit that tells us if it is in the async queue?
@@ -1086,8 +1055,8 @@ public interface DiskEntry extends RegionEntry {
}
if (!done && (v == null || Token.isRemovedFromDisk(v) && !region.isIndexCreationThread())) {
synchronized (entry) {
- v = entry._getValueRetain(region, true); // TODO:KIRK:OK v =
- // entry.getValueWithContext(region);
+ v = entry._getValueRetain(region, true);
+
if (v == null) {
v = readValueFromDisk(entry, region);
if (entry instanceof LRUEntry) {
@@ -1126,8 +1095,7 @@ public interface DiskEntry extends RegionEntry {
DiskId did = entry.getDiskId();
if (did != null) {
Object value = null;
- DiskRecoveryStore region = recoveryStore;
- DiskRegionView dr = region.getDiskRegionView();
+ DiskRegionView dr = recoveryStore.getDiskRegionView();
dr.acquireReadLock();
try {
synchronized (did) {
@@ -1135,7 +1103,7 @@ public interface DiskEntry extends RegionEntry {
if (oplogId == did.getOplogId()) {
value = getValueFromDisk(dr, did, in);
if (value != null) {
- setValueOnFaultIn(value, did, entry, dr, region);
+ setValueOnFaultIn(value, did, entry, dr, recoveryStore);
}
}
}
@@ -1194,7 +1162,7 @@ public interface DiskEntry extends RegionEntry {
try {
if (recoveryStore.getEvictionAttributes() != null
&& recoveryStore.getEvictionAttributes().getAlgorithm().isLIFO()) {
- ((VMLRURegionMap) recoveryStore.getRegionMap()).updateStats();
+ ((AbstractLRURegionMap) recoveryStore.getRegionMap()).updateStats();
return;
}
// this must be done after releasing synchronization
@@ -1314,24 +1282,18 @@ public interface DiskEntry extends RegionEntry {
}
/**
- * Writes the value of this <code>DiskEntry</code> to disk and <code>null</code> s out the
- * reference to the value to free up VM space.
+ * Writes the value of this {@code DiskEntry} to disk and {@code null} s out the reference to
+ * the value to free up VM space.
* <p>
* Note that if the value had already been written to disk, it is not written again.
* <p>
* Caller must synchronize on entry and it is assumed the entry is evicted
- *
- * see #writeToDisk
- *
- * @throws RegionClearedException
*/
public static int overflowToDisk(DiskEntry entry, LocalRegion region, EnableLRU ccHelper)
throws RegionClearedException {
DiskRegion dr = region.getDiskRegion();
- final int oldSize = region.calculateRegionEntryValueSize(entry);;
- // Asif:Get diskID . If it is null, it implies it is
- // overflow only mode.
- // long id = entry.getDiskId().getKeyId();
+ final int oldSize = region.calculateRegionEntryValueSize(entry);
+ // Get diskID . If it is null, it implies it is overflow only mode.
DiskId did = entry.getDiskId();
if (did == null) {
((LRUEntry) entry).setDelayedDiskId(region);
@@ -1348,7 +1310,7 @@ public interface DiskEntry extends RegionEntry {
return 0;
}
- // TODO:Asif: Check if we need to overflow even when id is = 0
+ // TODO: Check if we need to overflow even when id is = 0
boolean wasAlreadyPendingAsync = did.isPendingAsync();
if (did.needsToBeWritten()) {
if (dr.isSync()) {
@@ -1474,7 +1436,7 @@ public interface DiskEntry extends RegionEntry {
// Only setValue to null if this was an evict.
// We could just be a backup that is writing async.
if (!Token.isInvalid(entryVal) && (entryVal != Token.TOMBSTONE)
- && entry instanceof LRUEntry && ((LRUEntry) entry).testEvicted()) {
+ && entry instanceof LRUEntry && ((LRUClockNode) entry).testEvicted()) {
// Moved this here to fix bug 40116.
region.updateSizeOnEvict(entry.getKey(), entryValSize);
updateStats(dr, region, -1/* InVM */, 1/* OnDisk */, did.getValueLength());
@@ -1603,11 +1565,6 @@ public interface DiskEntry extends RegionEntry {
return result;
}
- /**
- * @param entry
- * @param region
- * @param tag
- */
public static void updateVersionOnly(DiskEntry entry, LocalRegion region, VersionTag tag) {
DiskRegion dr = region.getDiskRegion();
if (!dr.isBackup()) {
@@ -1709,7 +1666,6 @@ public interface DiskEntry extends RegionEntry {
}
/**
- *
* @return byte indicating the user bits. The correct value is returned only in the specific
* case of entry recovered from oplog ( & not rolled to Htree) & the RECOVER_VALUES flag
* is false . In other cases the exact value is not needed
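
One fix in the DiskEntry hunks above replaces the create-then-initCause() sequence with the cause-taking exception constructor. A self-contained sketch of the same rewrite (the message text is illustrative, not the Geode localized string):

import java.io.IOException;

public class CauseConstructorSketch {
  static void serialize() {
    try {
      throw new IOException("simulated serialization failure");
    } catch (IOException e) {
      // Passing the cause directly replaces the three-step idiom
      //   e2 = new IllegalArgumentException(msg); e2.initCause(e); throw e2;
      // and produces the same chained stack trace.
      throw new IllegalArgumentException("An IOException was thrown while serializing", e);
    }
  }

  public static void main(String[] args) {
    try {
      serialize();
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage() + " <- " + e.getCause());
    }
  }
}
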
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java
index 6d4b598..f8b8289 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java
@@ -279,12 +279,12 @@ public class DiskRegion extends AbstractDiskRegion {
private void destroyOldTomstones(final DiskRecoveryStore drs) {
// iterate over all region entries in drs
drs.foreachRegionEntry(new RegionEntryCallback() {
- public void handleRegionEntry(RegionEntry re) {
- DiskEntry de = (DiskEntry) re;
+ public void handleRegionEntry(RegionEntry regionEntry) {
+ DiskEntry de = (DiskEntry) regionEntry;
synchronized (de) {
DiskId id = de.getDiskId();
- if (id != null && re.isTombstone()) {
- VersionStamp stamp = re.getVersionStamp();
+ if (id != null && regionEntry.isTombstone()) {
+ VersionStamp stamp = regionEntry.getVersionStamp();
if (getRegionVersionVector().isTombstoneTooOld(stamp.getMemberID(),
stamp.getRegionVersion())) {
drs.destroyRecoveredEntry(de.getKey());
@@ -299,8 +299,8 @@ public class DiskRegion extends AbstractDiskRegion {
private void destroyRemainingRecoveredEntries(final DiskRecoveryStore drs) {
// iterate over all region entries in drs
drs.foreachRegionEntry(new RegionEntryCallback() {
- public void handleRegionEntry(RegionEntry re) {
- DiskEntry de = (DiskEntry) re;
+ public void handleRegionEntry(RegionEntry regionEntry) {
+ DiskEntry de = (DiskEntry) regionEntry;
synchronized (de) {
DiskId id = de.getDiskId();
if (id != null) {
@@ -320,8 +320,8 @@ public class DiskRegion extends AbstractDiskRegion {
public void resetRecoveredEntries(final DiskRecoveryStore drs) {
// iterate over all region entries in drs
drs.foreachRegionEntry(new RegionEntryCallback() {
- public void handleRegionEntry(RegionEntry re) {
- DiskEntry de = (DiskEntry) re;
+ public void handleRegionEntry(RegionEntry regionEntry) {
+ DiskEntry de = (DiskEntry) regionEntry;
synchronized (de) {
DiskId id = de.getDiskId();
if (id != null) {
@@ -770,13 +770,13 @@ public class DiskRegion extends AbstractDiskRegion {
return;
}
region.foreachRegionEntry(new RegionEntryCallback() {
- public void handleRegionEntry(RegionEntry re) {
- DiskEntry de = (DiskEntry) re;
+ public void handleRegionEntry(RegionEntry regionEntry) {
+ DiskEntry de = (DiskEntry) regionEntry;
DiskId id = de.getDiskId();
if (id != null) {
synchronized (id) {
- re.setValueToNull(); // TODO why call _setValue twice in a row?
- re.removePhase2();
+ regionEntry.setValueToNull(); // TODO why call _setValue twice in a row?
+ regionEntry.removePhase2();
id.unmarkForWriting();
if (EntryBits.isNeedsValue(id.getUserBits())) {
long oplogId = id.getOplogId();
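
The DiskRegion hunks only rename the callback parameter from re to regionEntry; behavior is unchanged. A self-contained analogue of the callback shape being edited (the interface and types below are hypothetical stand-ins, not the real RegionEntryCallback):

import java.util.Arrays;
import java.util.List;

public class CallbackNamingSketch {
  interface EntryCallback {
    void handleRegionEntry(String regionEntry);
  }

  static void foreachRegionEntry(List<String> entries, EntryCallback callback) {
    for (String entry : entries) {
      callback.handleRegionEntry(entry);
    }
  }

  public static void main(String[] args) {
    // A descriptive parameter name costs nothing at runtime but helps the reader.
    foreachRegionEntry(Arrays.asList("users", "orders"),
        regionEntry -> System.out.println("visiting " + regionEntry));
  }
}
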
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java
index 6f50c9f..309dea3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java
@@ -26,8 +26,6 @@ import org.apache.geode.internal.cache.persistence.BackupInspector;
* oplogs that still need to be backed up, along with the lists of oplog files that should be
* deleted when the oplog is backed up. See
* {@link DiskStoreImpl#startBackup(File, BackupInspector, org.apache.geode.internal.cache.persistence.RestoreScript)}
- *
- *
*/
public class DiskStoreBackup {
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java
index 000bf0d..7a7044b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java
@@ -18,7 +18,6 @@ import java.io.File;
import java.util.Arrays;
import org.apache.geode.GemFireIOException;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.distributed.internal.ResourceEvent;
@@ -35,14 +34,15 @@ import org.apache.geode.pdx.internal.TypeRegistry;
* @since GemFire prPersistSprint2
*/
public class DiskStoreFactoryImpl implements DiskStoreFactory {
- private final Cache cache;
+
+ private final InternalCache cache;
private final DiskStoreAttributes attrs = new DiskStoreAttributes();
- public DiskStoreFactoryImpl(Cache cache) {
+ public DiskStoreFactoryImpl(InternalCache cache) {
this.cache = cache;
}
- public DiskStoreFactoryImpl(Cache cache, DiskStoreAttributes attrs) {
+ public DiskStoreFactoryImpl(InternalCache cache, DiskStoreAttributes attrs) {
this.attrs.name = attrs.name;
setAutoCompact(attrs.getAutoCompact());
setAllowForceCompaction(attrs.getAllowForceCompaction());
@@ -90,13 +90,13 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
if (compactionThreshold < 0) {
throw new IllegalArgumentException(
LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
- .toLocalizedString(new Object[] {CacheXml.COMPACTION_THRESHOLD,
- Integer.valueOf(compactionThreshold)}));
+ .toLocalizedString(
+ new Object[] {CacheXml.COMPACTION_THRESHOLD, compactionThreshold}));
} else if (compactionThreshold > 100) {
throw new IllegalArgumentException(
LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_LESS_THAN_2_BUT_WAS_1
- .toLocalizedString(new Object[] {CacheXml.COMPACTION_THRESHOLD,
- Integer.valueOf(compactionThreshold), Integer.valueOf(100)}));
+ .toLocalizedString(
+ new Object[] {CacheXml.COMPACTION_THRESHOLD, compactionThreshold, 100}));
}
this.attrs.compactionThreshold = compactionThreshold;
return this;
@@ -106,7 +106,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
if (timeInterval < 0) {
throw new IllegalArgumentException(
LocalizedStrings.DiskWriteAttributesFactory_TIME_INTERVAL_SPECIFIED_HAS_TO_BE_A_NONNEGATIVE_NUMBER_AND_THE_VALUE_GIVEN_0_IS_NOT_ACCEPTABLE
- .toLocalizedString(Long.valueOf(timeInterval)));
+ .toLocalizedString(timeInterval));
}
this.attrs.timeInterval = timeInterval;
return this;
@@ -116,14 +116,12 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
InternalRegionArguments internalRegionArgs) {
this.attrs.name = name;
synchronized (this.cache) {
- assert this.cache instanceof GemFireCacheImpl;
- GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache;
DiskStoreImpl ds =
- new DiskStoreImpl(gfc, this.attrs, true/* ownedByRegion */, internalRegionArgs);
+ new DiskStoreImpl(this.cache, this.attrs, true/* ownedByRegion */, internalRegionArgs);
if (isOwnedByPR) {
ds.doInitialRecovery();
}
- gfc.addRegionOwnedDiskStore(ds);
+ this.cache.addRegionOwnedDiskStore(ds);
return ds;
}
}
@@ -137,15 +135,14 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
result = findExisting(name);
if (result == null) {
if (this.cache instanceof GemFireCacheImpl) {
- GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache;
- TypeRegistry registry = gfc.getPdxRegistry();
- DiskStoreImpl dsi = new DiskStoreImpl(gfc, this.attrs);
+ TypeRegistry registry = this.cache.getPdxRegistry();
+ DiskStoreImpl dsi = new DiskStoreImpl(this.cache, this.attrs);
result = dsi;
- /** Added for M&M **/
- gfc.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.DISKSTORE_CREATE,
- dsi);
+ // Added for M&M
+ this.cache.getInternalDistributedSystem()
+ .handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, dsi);
dsi.doInitialRecovery();
- gfc.addDiskStore(dsi);
+ this.cache.addDiskStore(dsi);
if (registry != null) {
registry.creatingDiskStore(dsi);
}
@@ -163,8 +160,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
// member depends on state that goes into this disk store
// that isn't backed up.
if (this.cache instanceof GemFireCacheImpl) {
- GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache;
- BackupManager backup = gfc.getBackupManager();
+ BackupManager backup = this.cache.getBackupManager();
if (backup != null) {
backup.waitForBackup();
}
@@ -175,8 +171,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
private DiskStore findExisting(String name) {
DiskStore existing = null;
if (this.cache instanceof GemFireCacheImpl) {
- GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache;
- existing = gfc.findDiskStore(name);
+ existing = this.cache.findDiskStore(name);
if (existing != null) {
if (((DiskStoreImpl) existing).sameAs(this.attrs)) {
return existing;
@@ -192,8 +187,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
if (diskDirSizes.length != diskDirs.length) {
throw new IllegalArgumentException(
LocalizedStrings.AttributesFactory_NUMBER_OF_DISKSIZES_IS_0_WHICH_IS_NOT_EQUAL_TO_NUMBER_OF_DISK_DIRS_WHICH_IS_1
- .toLocalizedString(new Object[] {Integer.valueOf(diskDirSizes.length),
- Integer.valueOf(diskDirs.length)}));
+ .toLocalizedString(new Object[] {diskDirSizes.length, diskDirs.length}));
}
verifyNonNegativeDirSize(diskDirSizes);
checkIfDirectoriesExist(diskDirs);
@@ -207,8 +201,6 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
/**
* Checks if directories exist, if they don't then create those directories
- *
- * @param diskDirs
*/
public static void checkIfDirectoriesExist(File[] diskDirs) {
for (int i = 0; i < diskDirs.length; i++) {
@@ -225,15 +217,13 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
/**
* Verify all directory sizes are positive
- *
- * @param sizes
*/
public static void verifyNonNegativeDirSize(int[] sizes) {
for (int i = 0; i < sizes.length; i++) {
if (sizes[i] < 0) {
throw new IllegalArgumentException(
LocalizedStrings.AttributesFactory_DIR_SIZE_CANNOT_BE_NEGATIVE_0
- .toLocalizedString(Integer.valueOf(sizes[i])));
+ .toLocalizedString(sizes[i]));
}
}
}
@@ -254,7 +244,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
} else if (maxOplogSize < 0) {
throw new IllegalArgumentException(
LocalizedStrings.DiskWriteAttributesFactory_MAXIMUM_OPLOG_SIZE_SPECIFIED_HAS_TO_BE_A_NONNEGATIVE_NUMBER_AND_THE_VALUE_GIVEN_0_IS_NOT_ACCEPTABLE
- .toLocalizedString(Long.valueOf(maxOplogSize)));
+ .toLocalizedString(maxOplogSize));
}
this.attrs.maxOplogSizeInBytes = maxOplogSize * (1024 * 1024);
return this;
@@ -267,7 +257,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
if (maxOplogSizeInBytes < 0) {
throw new IllegalArgumentException(
LocalizedStrings.DiskWriteAttributesFactory_MAXIMUM_OPLOG_SIZE_SPECIFIED_HAS_TO_BE_A_NONNEGATIVE_NUMBER_AND_THE_VALUE_GIVEN_0_IS_NOT_ACCEPTABLE
- .toLocalizedString(Long.valueOf(maxOplogSizeInBytes)));
+ .toLocalizedString(maxOplogSizeInBytes));
}
this.attrs.maxOplogSizeInBytes = maxOplogSizeInBytes;
return this;
@@ -277,7 +267,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
if (queueSize < 0) {
throw new IllegalArgumentException(
LocalizedStrings.DiskWriteAttributesFactory_QUEUE_SIZE_SPECIFIED_HAS_TO_BE_A_NONNEGATIVE_NUMBER_AND_THE_VALUE_GIVEN_0_IS_NOT_ACCEPTABLE
- .toLocalizedString(Integer.valueOf(queueSize)));
+ .toLocalizedString(queueSize));
}
this.attrs.queueSize = queueSize;
return this;
@@ -285,10 +275,10 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory {
public DiskStoreFactory setWriteBufferSize(int writeBufferSize) {
if (writeBufferSize < 0) {
- // TODO Gester add a message for WriteBufferSize
+ // TODO add a message for WriteBufferSize
throw new IllegalArgumentException(
LocalizedStrings.DiskWriteAttributesFactory_QUEUE_SIZE_SPECIFIED_HAS_TO_BE_A_NONNEGATIVE_NUMBER_AND_THE_VALUE_GIVEN_0_IS_NOT_ACCEPTABLE
- .toLocalizedString(Integer.valueOf(writeBufferSize)));
+ .toLocalizedString(writeBufferSize));
}
this.attrs.writeBufferSize = writeBufferSize;
return this;
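
Several DiskStoreFactoryImpl hunks above drop explicit Integer.valueOf(...)/Long.valueOf(...) wrappers when building Object[] argument arrays. Autoboxing makes the two forms identical, as this small sketch shows:

public class AutoboxingSketch {
  public static void main(String[] args) {
    int compactionThreshold = 150;
    // The compiler inserts Integer.valueOf(...) for us when an int meets an Object slot.
    Object[] explicit = new Object[] {Integer.valueOf(compactionThreshold), Integer.valueOf(100)};
    Object[] boxed = new Object[] {compactionThreshold, 100};
    System.out.println(explicit[0].equals(boxed[0]) && explicit[1].equals(boxed[1])); // true
  }
}
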
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index d1609ca..aeabbbc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -14,19 +14,62 @@
*/
package org.apache.geode.internal.cache;
-import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetAddress;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.DiskStoreFactory;
@@ -72,55 +115,11 @@ import org.apache.geode.pdx.internal.EnumInfo;
import org.apache.geode.pdx.internal.PdxField;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.InetAddress;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
* Represents a (disk-based) persistent store for region data. Used for both persistent recoverable
* regions and overflow-only regions.
- *
- *
+ *
* @since GemFire 3.2
*/
@SuppressWarnings("synthetic-access")
@@ -128,6 +127,7 @@ public class DiskStoreImpl implements DiskStore {
private static final Logger logger = LogService.getLogger();
private static final String BACKUP_DIR_PREFIX = "dir";
+
public static final boolean KRF_DEBUG = Boolean.getBoolean("disk.KRF_DEBUG");
public static final int MAX_OPEN_INACTIVE_OPLOGS =
@@ -166,6 +166,7 @@ public class DiskStoreImpl implements DiskStore {
public static final String RECOVER_VALUE_PROPERTY_NAME =
DistributionConfig.GEMFIRE_PREFIX + "disk.recoverValues";
+
public static final String RECOVER_VALUES_SYNC_PROPERTY_NAME =
DistributionConfig.GEMFIRE_PREFIX + "disk.recoverValuesSync";
@@ -177,9 +178,12 @@ public class DiskStoreImpl implements DiskStore {
DistributionConfig.GEMFIRE_PREFIX + "disk.recoverLruValues";
boolean RECOVER_VALUES = getBoolean(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, true);
+
boolean RECOVER_VALUES_SYNC = getBoolean(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, false);
+
boolean FORCE_KRF_RECOVERY =
getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disk.FORCE_KRF_RECOVERY", false);
+
final boolean RECOVER_LRU_VALUES =
getBoolean(DiskStoreImpl.RECOVER_LRU_VALUES_PROPERTY_NAME, false);
@@ -188,7 +192,9 @@ public class DiskStoreImpl implements DiskStore {
}
public static final long MIN_RESERVED_DRID = 1;
+
public static final long MAX_RESERVED_DRID = 8;
+
static final long MIN_DRID = MAX_RESERVED_DRID + 1;
/**
@@ -205,9 +211,7 @@ public class DiskStoreImpl implements DiskStore {
private final int MAX_OPLOGS_PER_COMPACTION = Integer.getInteger(
DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_COMPACTION",
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_ROLL", 1).intValue());
- /**
- *
- */
+
public static final int MAX_CONCURRENT_COMPACTIONS = Integer.getInteger(
DistributionConfig.GEMFIRE_PREFIX + "MAX_CONCURRENT_COMPACTIONS",
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAX_CONCURRENT_ROLLS", 1).intValue());
@@ -219,6 +223,7 @@ public class DiskStoreImpl implements DiskStore {
*/
public static final int MAX_PENDING_TASKS =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "disk.MAX_PENDING_TASKS", 6);
+
/**
* This system property indicates that IF should also be preallocated. This property will be used
* in conjunction with the PREALLOCATE_OPLOGS property. If PREALLOCATE_OPLOGS is ON the below will
@@ -227,6 +232,7 @@ public class DiskStoreImpl implements DiskStore {
static final boolean PREALLOCATE_IF =
!System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "preAllocateIF", "true")
.equalsIgnoreCase("false");
+
/**
* This system property indicates that Oplogs should be preallocated till the maxOplogSize as
* specified for the disk store.
@@ -252,19 +258,14 @@ public class DiskStoreImpl implements DiskStore {
public static volatile HashSet<String> TEST_CHK_FALLOC_DIRS;
public static volatile HashSet<String> TEST_NO_FALLOC_DIRS;
- // /** delay for slowing down recovery, for testing purposes only */
- // public static volatile int recoverDelay = 0;
-
- // //////////////////// Instance Fields ///////////////////////
-
- private final GemFireCacheImpl cache;
+ private final InternalCache cache;
/** The stats for this store */
private final DiskStoreStats stats;
/**
- * Asif:Added as stop gap arrangement to fix bug 39380. It is not a clean fix as keeping track of
- * the threads acquiring read lock, etc is not a good idea to solve the issue
+ * Added as stop gap arrangement to fix bug 39380. It is not a clean fix as keeping track of the
+ * threads acquiring read lock, etc is not a good idea to solve the issue
*/
private final AtomicInteger entryOpsCount = new AtomicInteger();
/**
@@ -291,10 +292,11 @@ public class DiskStoreImpl implements DiskStore {
* is forced. If this value is 0 then no limit.
*/
private final int maxAsyncItems;
+
private final AtomicInteger forceFlushCount;
+
private final Object asyncMonitor;
- // complex vars
/** Compactor task which does the compaction. Null if compaction not possible. */
private final OplogCompactor oplogCompactor;
@@ -303,7 +305,9 @@ public class DiskStoreImpl implements DiskStore {
private volatile DiskStoreBackup diskStoreBackup = null;
private final ReentrantReadWriteLock compactorLock = new ReentrantReadWriteLock();
+
private final WriteLock compactorWriteLock = compactorLock.writeLock();
+
private final ReadLock compactorReadLock = compactorLock.readLock();
/**
@@ -316,37 +320,21 @@ public class DiskStoreImpl implements DiskStore {
new AtomicReference<DiskAccessException>();
PersistentOplogSet persistentOplogs = new PersistentOplogSet(this);
- OverflowOplogSet overflowOplogs = new OverflowOplogSet(this);
-
- // private boolean isThreadWaitingForSpace = false;
-
- /**
- * Get the next available dir
- */
-
- // /**
- // * Max timed wait for disk space to become available for an entry operation
- // ,
- // * in milliseconds. This will be the maximum time for which a
- // * create/modify/remove operation will wait so as to allow switch over & get
- // a
- // * new Oplog for writing. If no space is available in that time,
- // * DiskAccessException will be thrown. The default wait will be for 120
- // * seconds
- // */
- // private static final long MAX_WAIT_FOR_SPACE = Integer.getInteger(
- // "MAX_WAIT_FOR_SPACE", 20).intValue() * 1000;
+ OverflowOplogSet overflowOplogs = new OverflowOplogSet(this);
private final AtomicLong regionIdCtr = new AtomicLong(MIN_DRID);
+
/**
* Only contains backup DiskRegions. The Value could be a RecoveredDiskRegion or a DiskRegion
*/
private final ConcurrentMap<Long, DiskRegion> drMap = new ConcurrentHashMap<Long, DiskRegion>();
+
/**
* A set of overflow only regions that are using this disk store.
*/
private final Set<DiskRegion> overflowMap = new ConcurrentHashSet<DiskRegion>();
+
/**
* Contains all of the disk recovery stores for which we are recovering values asnynchronously.
*/
@@ -369,9 +357,8 @@ public class DiskStoreImpl implements DiskStore {
private final ThreadPoolExecutor diskStoreTaskPool;
private final ThreadPoolExecutor delayedWritePool;
- private volatile Future lastDelayedWrite;
- // ///////////////////// Constructors /////////////////////////
+ private volatile Future lastDelayedWrite;
private static int calcCompactionThreshold(int ct) {
if (ct == DiskStoreFactory.DEFAULT_COMPACTION_THRESHOLD) {
@@ -387,19 +374,19 @@ public class DiskStoreImpl implements DiskStore {
}
/**
- * Creates a new <code>DiskRegion</code> that access disk on behalf of the given region.
+ * Creates a new {@code DiskRegion} that access disk on behalf of the given region.
*/
- DiskStoreImpl(Cache cache, DiskStoreAttributes props) {
+ DiskStoreImpl(InternalCache cache, DiskStoreAttributes props) {
this(cache, props, false, null);
}
- DiskStoreImpl(Cache cache, DiskStoreAttributes props, boolean ownedByRegion,
+ DiskStoreImpl(InternalCache cache, DiskStoreAttributes props, boolean ownedByRegion,
InternalRegionArguments internalRegionArgs) {
this(cache, props.getName(), props, ownedByRegion, internalRegionArgs, false,
false/* upgradeVersionOnly */, false, false, true, false/* offlineModify */);
}
- DiskStoreImpl(Cache cache, String name, DiskStoreAttributes props, boolean ownedByRegion,
+ DiskStoreImpl(InternalCache cache, String name, DiskStoreAttributes props, boolean ownedByRegion,
InternalRegionArguments internalRegionArgs, boolean offline, boolean upgradeVersionOnly,
boolean offlineValidating, boolean offlineCompacting, boolean needsOplogs,
boolean offlineModify) {
@@ -427,7 +414,7 @@ public class DiskStoreImpl implements DiskStore {
this.warningPercent = props.getDiskUsageWarningPercentage();
this.criticalPercent = props.getDiskUsageCriticalPercentage();
- this.cache = (GemFireCacheImpl) cache;
+ this.cache = cache;
StatisticsFactory factory = cache.getDistributedSystem();
this.stats = new DiskStoreStats(factory, getName());
@@ -474,7 +461,7 @@ public class DiskStoreImpl implements DiskStore {
this.maxDirSize = tempMaxDirSize * 1024 * 1024;
this.infoFileDirIndex = 0;
// Now that we no longer have db files, use all directories for oplogs
- /**
+ /*
* The infoFileDir contains the lock file and the init file. It will be directories[0] on a
* brand new disk store. On an existing disk store it will be the directory the init file is
* found in.
@@ -495,7 +482,7 @@ public class DiskStoreImpl implements DiskStore {
int MAXT = DiskStoreImpl.MAX_CONCURRENT_COMPACTIONS;
final ThreadGroup compactThreadGroup =
- LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", this.logger);
+ LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", logger);
final ThreadFactory compactThreadFactory =
GemfireCacheHelper.CreateThreadFactory(compactThreadGroup, "Idle OplogCompactor");
this.diskStoreTaskPool = new ThreadPoolExecutor(MAXT, MAXT, 10, TimeUnit.SECONDS,
@@ -504,7 +491,7 @@ public class DiskStoreImpl implements DiskStore {
final ThreadGroup deleteThreadGroup =
- LoggingThreadGroup.createThreadGroup("Oplog Delete Thread Group", this.logger);
+ LoggingThreadGroup.createThreadGroup("Oplog Delete Thread Group", logger);
final ThreadFactory deleteThreadFactory =
GemfireCacheHelper.CreateThreadFactory(deleteThreadGroup, "Oplog Delete Task");
@@ -583,7 +570,7 @@ public class DiskStoreImpl implements DiskStore {
}
/**
- * Returns the <code>DiskStoreStats</code> for this store
+ * Returns the {@code DiskStoreStats} for this store
*/
public DiskStoreStats getStats() {
return this.stats;
@@ -697,7 +684,7 @@ public class DiskStoreImpl implements DiskStore {
* @param entry The entry which is going to be written to disk
* @throws RegionClearedException If a clear operation completed before the put operation
* completed successfully, resulting in the put operation to abort.
- * @throws IllegalArgumentException If <code>id</code> is less than zero
+ * @throws IllegalArgumentException If {@code id} is less than zero
*/
final void put(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async)
throws RegionClearedException {
@@ -886,7 +873,6 @@ public class DiskStoreImpl implements DiskStore {
* Given a BytesAndBits object convert it to the relevant Object (deserialize if necessary) and
* return the object
*
- * @param bb
* @return the converted object
*/
static Object convertBytesAndBitsIntoObject(BytesAndBits bb) {
@@ -909,7 +895,6 @@ public class DiskStoreImpl implements DiskStore {
/**
* Given a BytesAndBits object get the serialized blob
*
- * @param bb
* @return the converted object
*/
static Object convertBytesAndBitsToSerializedForm(BytesAndBits bb) {
@@ -1029,7 +1014,7 @@ public class DiskStoreImpl implements DiskStore {
* HTree with the oplog being destroyed
*
* @return null if entry has nothing stored on disk (id == INVALID_ID)
- * @throws IllegalArgumentException If <code>id</code> is less than zero, no action is taken.
+ * @throws IllegalArgumentException If {@code id} is less than zero, no action is taken.
*/
public final Object getNoBuffer(DiskRegion dr, DiskId id) {
BytesAndBits bb = null;
@@ -1067,8 +1052,8 @@ public class DiskStoreImpl implements DiskStore {
*
* @throws RegionClearedException If a clear operation completed before the put operation
* completed successfully, resulting in the put operation to abort.
- * @throws IllegalArgumentException If <code>id</code> is {@linkplain #INVALID_ID invalid}or is
- * less than zero, no action is taken.
+ * @throws IllegalArgumentException If {@code id} is {@linkplain #INVALID_ID invalid}or is less
+ * than zero, no action is taken.
*/
final void remove(LocalRegion region, DiskEntry entry, boolean async, boolean isClear)
throws RegionClearedException {
@@ -1191,7 +1176,7 @@ public class DiskStoreImpl implements DiskStore {
if (currentOpsInProgress == 0) {
synchronized (this.closeRegionGuard) {
if (dr.isRegionClosed() && entryOpsCount.get() == 0) {
- this.closeRegionGuard.notify();
+ this.closeRegionGuard.notifyAll();
}
}
}
@@ -1237,7 +1222,6 @@ public class DiskStoreImpl implements DiskStore {
/**
* Get serialized form of data off the disk
*
- * @param id
* @since GemFire 5.7
*/
public Object getSerializedData(DiskRegion dr, DiskId id) {
@@ -1269,7 +1253,7 @@ public class DiskStoreImpl implements DiskStore {
DiskEntry entry = ade.de;
DiskEntry.Helper.handleFullAsyncQueue(entry, region, tag);
}
- } catch (RegionDestroyedException ex) {
+ } catch (RegionDestroyedException ignore) {
// Normally we flush before closing or destroying a region
// but in some cases it is closed w/o flushing.
// So just ignore it; see bug 41305.
@@ -1397,8 +1381,7 @@ public class DiskStoreImpl implements DiskStore {
private int fillDrainList() {
synchronized (this.drainSync) {
this.drainList = new ArrayList(asyncQueue.size());
- int drainCount = asyncQueue.drainTo(this.drainList);
- return drainCount;
+ return asyncQueue.drainTo(this.drainList);
}
}
@@ -1410,8 +1393,6 @@ public class DiskStoreImpl implements DiskStore {
* To fix bug 41770 clear the list in a way that will not break a concurrent iterator that is not
* synced on drainSync. Only clear from it entries on the given region. Currently we do this by
* clearing the isPendingAsync bit on each entry in this list.
- *
- * @param rvv
*/
void clearDrainList(LocalRegion r, RegionVersionVector rvv) {
synchronized (this.drainSync) {
@@ -1516,7 +1497,7 @@ public class DiskStoreImpl implements DiskStore {
try {
this.flusherThread.join(waitMs);
return true;
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
return false;
@@ -1532,7 +1513,7 @@ public class DiskStoreImpl implements DiskStore {
}
}
- public GemFireCacheImpl getCache() {
+ public InternalCache getCache() {
return this.cache;
}
@@ -1759,7 +1740,7 @@ public class DiskStoreImpl implements DiskStore {
}
}
} // else
- } catch (RegionDestroyedException ex) {
+ } catch (RegionDestroyedException ignore) {
// Normally we flush before closing or destroying a region
// but in some cases it is closed w/o flushing.
// So just ignore it; see bug 41305.
@@ -2050,18 +2031,8 @@ public class DiskStoreImpl implements DiskStore {
return this.directories[this.infoFileDirIndex];
}
- /** For Testing * */
- // void addToOplogSet(int oplogID, File opFile, DirectoryHolder dirHolder) {
- // Oplog oplog = new Oplog(oplogID, this);
- // oplog.addRecoveredFile(opFile, dirHolder);
- // // @todo check callers to see if they need drf support
- // this.oplogSet.add(oplog);
- // }
-
- /** For Testing * */
/**
* returns the size of the biggest directory available to the region
- *
*/
public long getMaxDirSize() {
return maxDirSize;
@@ -2143,8 +2114,6 @@ public class DiskStoreImpl implements DiskStore {
/**
* Removes anything found in the async queue for the given region
- *
- * @param rvv
*/
private void clearAsyncQueue(LocalRegion region, boolean needsWriteLock,
RegionVersionVector rvv) {
@@ -2263,7 +2232,7 @@ public class DiskStoreImpl implements DiskStore {
if (diskException.get() != null) {
try {
_testHandleDiskAccessException.await();
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
@@ -2466,25 +2435,26 @@ public class DiskStoreImpl implements DiskStore {
dr.setRegionClosed(true);
}
gotLock = true;
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
synchronized (this.closeRegionGuard) {
if (!dr.isRegionClosed()) {
if (!closeDataOnly) {
dr.setRegionClosed(true);
}
- // Asif: I am quite sure that it should also be Ok if instead
+ // I am quite sure that it should also be Ok if instead
// while it is a If Check below. Because if acquireReadLock
// thread
- // has acquired thelock, it is bound to see the isRegionClose as
+ // has acquired the lock, it is bound to see the isRegionClose as
// true
- // and so will realse teh lock causing decrement to zeo , before
+ // and so will release the lock causing decrement to zero , before
// releasing the closeRegionGuard. But still...not to take any
// chance
while (this.entryOpsCount.get() > 0) {
try {
+ // TODO: calling wait while holding two locks
this.closeRegionGuard.wait(20000);
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ignored) {
// Exit without closing the region, do not know what else
// can be done
Thread.currentThread().interrupt();
@@ -2534,8 +2504,6 @@ public class DiskStoreImpl implements DiskStore {
/**
* stops the compactor outside the write lock. Once stopped then it proceeds to destroy the
* current & old oplogs
- *
- * @param dr
*/
void beginDestroyRegion(LocalRegion region, DiskRegion dr) {
if (dr.isBackup()) {
@@ -2571,7 +2539,7 @@ public class DiskStoreImpl implements DiskStore {
while (this.backgroundTasks.get() > 0) {
try {
this.backgroundTasks.wait(500L);
- } catch (InterruptedException ex) {
+ } catch (InterruptedException ignore) {
interrupted = true;
}
}
@@ -2720,7 +2688,7 @@ public class DiskStoreImpl implements DiskStore {
return null;
}
- return l.toArray(new CompactableOplog[0]);
+ return l.toArray(new CompactableOplog[l.size()]);
}
/**
@@ -2745,7 +2713,6 @@ public class DiskStoreImpl implements DiskStore {
* @param baselineCopyMap this will be populated with baseline oplogs Files that will be used in
* the restore script.
* @return an array of Oplogs to be copied for an incremental backup.
- * @throws IOException
*/
private Oplog[] filterBaselineOplogs(BackupInspector baselineInspector,
Map<File, File> baselineCopyMap) throws IOException {
@@ -2796,11 +2763,9 @@ public class DiskStoreImpl implements DiskStore {
}
// Convert the filtered oplog list to an array
- return oplogList.toArray(new Oplog[] {});
+ return oplogList.toArray(new Oplog[oplogList.size()]);
}
-
-
/**
* Get all of the oplogs
*/
@@ -3013,7 +2978,7 @@ public class DiskStoreImpl implements DiskStore {
while (this.scheduled) {
try {
wait();
- } catch (InterruptedException ex) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
@@ -3114,30 +3079,13 @@ public class DiskStoreImpl implements DiskStore {
if (dr.isRegionClosed()) {
return;
}
- // // Stop the compactor if running, without taking lock.
- // if (this.oplogCompactor != null) {
- // try {
- // this.oplogCompactor.stopCompactor();
- // }
- // catch (CancelException ignore) {
- // // Asif:To fix Bug 39380 , ignore the cache closed exception here.
- // // allow it to call super .close so that it would be able to close
- // the
- // // oplogs
- // // Though I do not think this exception will be thrown by
- // // the stopCompactor. Still not taking chance and ignoring it
-
- // }
- // }
- // // if (!isSync()) {
- // stopAsyncFlusher(true); // do this before writeLock
- // // }
+
boolean gotLock = false;
try {
try {
acquireWriteLock(dr);
gotLock = true;
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// see workaround below.
}
@@ -3163,8 +3111,9 @@ public class DiskStoreImpl implements DiskStore {
}
boolean interrupted = Thread.interrupted();
try {
+ // TODO: calling wait while holding two locks
this.closeRegionGuard.wait(1000);
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -3175,7 +3124,7 @@ public class DiskStoreImpl implements DiskStore {
if (this.entryOpsCount.get() > 0) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.DisKRegion_OUTSTANDING_OPS_REMAIN_AFTER_0_SECONDS_FOR_DISK_REGION_1,
- new Object[] {Integer.valueOf(loopCount), dr.getName()}));
+ new Object[] {loopCount, dr.getName()}));
for (;;) {
if (this.entryOpsCount.get() == 0) {
@@ -3183,8 +3132,9 @@ public class DiskStoreImpl implements DiskStore {
}
boolean interrupted = Thread.interrupted();
try {
+ // TODO: calling wait while holding two locks
this.closeRegionGuard.wait(1000);
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -3233,7 +3183,7 @@ public class DiskStoreImpl implements DiskStore {
dr.resetRVV();
dr.setRVVTrusted(false);
dr.writeRVV(null, null); // just persist the empty rvv with trust=false
- } catch (RegionDestroyedException rde) {
+ } catch (RegionDestroyedException ignore) {
// ignore a RegionDestroyedException at this stage
}
if (this.initFile != null && dr.isBackup()) {
@@ -4111,11 +4061,6 @@ public class DiskStoreImpl implements DiskStore {
* Start the backup process. This is the second step of the backup; in this method, we
* define the data we're backing up by copying the init file and rolling to the next file. After
* this method returns, operations can proceed as normal, except that we don't remove oplogs.
- *
- * @param targetDir
- * @param baselineInspector
- * @param restoreScript
- * @throws IOException
*/
public void startBackup(File targetDir, BackupInspector baselineInspector,
RestoreScript restoreScript) throws IOException {
@@ -4130,7 +4075,7 @@ public class DiskStoreImpl implements DiskStore {
}
// Get an appropriate lock object for each set of oplogs.
- Object childLock = childOplog.lock;;
+ Object childLock = childOplog.lock;
// TODO - We really should move this lock into the disk store, but
// until then we need to do this magic to make sure we're actually
@@ -4201,9 +4146,6 @@ public class DiskStoreImpl implements DiskStore {
/**
* Copy the oplogs to the backup directory. This is the final step of the backup process. The
* oplogs we copy are defined in the startBackup method.
- *
- * @param backupManager
- * @throws IOException
*/
public void finishBackup(BackupManager backupManager) throws IOException {
if (diskStoreBackup == null) {
@@ -4312,17 +4254,17 @@ public class DiskStoreImpl implements DiskStore {
props.setProperty(CACHE_XML_FILE, "");
DistributedSystem ds = DistributedSystem.connect(props);
offlineDS = ds;
- Cache c = org.apache.geode.cache.CacheFactory.create(ds);
- offlineCache = c;
- org.apache.geode.cache.DiskStoreFactory dsf = c.createDiskStoreFactory();
+ InternalCache cache = (InternalCache) CacheFactory.create(ds);
+ offlineCache = cache;
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
dsf.setDiskDirs(dsDirs);
if (offlineCompacting && maxOplogSize != -1L) {
dsf.setMaxOplogSize(maxOplogSize);
}
- DiskStoreImpl dsi = new DiskStoreImpl(c, dsName,
+ DiskStoreImpl dsi = new DiskStoreImpl(cache, dsName,
((DiskStoreFactoryImpl) dsf).getDiskStoreAttributes(), false, null, true,
upgradeVersionOnly, offlineValidate, offlineCompacting, needsOplogs, offlineModify);
- ((GemFireCacheImpl) c).addDiskStore(dsi);
+ cache.addDiskStore(dsi);
return dsi;
}
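
The offline-disk-store hunk above shows the commit's central move: create the
cache through the public factory, cast once to InternalCache at the creation
site, and let everything downstream depend on the interface instead of
GemFireCacheImpl. A hypothetical, non-Geode sketch of that narrowing:

    interface InternalThing {
      void addDiskStore(String name);
    }

    class ThingImpl implements InternalThing {
      public void addDiskStore(String name) {
        System.out.println("registered " + name);
      }

      static Object create() { // factory declares a wider type
        return new ThingImpl();
      }
    }

    public class NarrowCastSketch {
      public static void main(String[] args) {
        // One cast at the boundary; all later code sees only the interface.
        InternalThing cache = (InternalThing) ThingImpl.create();
        cache.addDiskStore("dsName");
      }
    }
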
@@ -4536,7 +4478,7 @@ public class DiskStoreImpl implements DiskStore {
while (!isClosing() && currentAsyncValueRecoveryMap.containsKey(diskRegion.getId())) {
try {
currentAsyncValueRecoveryMap.wait();
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
}
}
@@ -4591,9 +4533,9 @@ public class DiskStoreImpl implements DiskStore {
if (lastWriteTask != null) {
try {
lastWriteTask.get();
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
- } catch (Exception e) {
+ } catch (Exception ignore) {
// do nothing, an exception from the write task was already logged.
}
}
@@ -4684,7 +4626,7 @@ public class DiskStoreImpl implements DiskStore {
delayedWritePool.shutdown();
try {
delayedWritePool.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
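
The last hunk shuts down the delayed-write pool with a bounded awaitTermination
and restores the interrupt flag rather than swallowing it. A self-contained
sketch of the same shutdown sequence (hypothetical pool, not Geode code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class PoolShutdownSketch {
      public static void main(String[] args) {
        ExecutorService delayedWritePool = Executors.newSingleThreadExecutor();
        delayedWritePool.shutdown(); // no new tasks; queued work drains
        try {
          delayedWritePool.awaitTermination(1, TimeUnit.SECONDS); // bounded wait
        } catch (InterruptedException ignore) {
          Thread.currentThread().interrupt(); // restore for callers upstream
        }
      }
    }
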
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
index 551f733..ac72361 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
@@ -14,6 +14,19 @@
*/
package org.apache.geode.internal.cache;
+import java.io.File;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -22,25 +35,16 @@ import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.*;
public class DiskStoreMonitor {
private static final Logger logger = LogService.getLogger();
private static final boolean DISABLE_MONITOR =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_DISABLE_MONITORING");
- // private static final boolean AUTO_RECONNECT =
- // Boolean.getBoolean("gemfire.DISK_USAGE_ENABLE_AUTO_RECONNECT");
private static final int USAGE_CHECK_INTERVAL = Integer
.getInteger(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_POLLING_INTERVAL_MILLIS", 10000);
+
private static final float LOG_WARNING_THRESHOLD_PCT =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_LOG_WARNING_PERCENT", 99);
@@ -67,7 +71,7 @@ public class DiskStoreMonitor {
if (val < 0 || val > 100) {
throw new IllegalArgumentException(
LocalizedStrings.DiskWriteAttributesFactory_DISK_USAGE_WARNING_INVALID_0
- .toLocalizedString(Float.valueOf(val)));
+ .toLocalizedString(val));
}
}
@@ -80,17 +84,15 @@ public class DiskStoreMonitor {
if (val < 0 || val > 100) {
throw new IllegalArgumentException(
LocalizedStrings.DiskWriteAttributesFactory_DISK_USAGE_CRITICAL_INVALID_0
- .toLocalizedString(Float.valueOf(val)));
+ .toLocalizedString(val));
}
}
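
Both hunks keep the 0-100 range check but drop the explicit Float.valueOf
wrapper, which autoboxing makes redundant. A standalone sketch of the
validation (hypothetical names and message, not Geode code):

    public class ThresholdSketch {
      static void checkPercent(float val) {
        if (val < 0 || val > 100) {
          // val autoboxes wherever an Object is needed, e.g. in message formatting
          throw new IllegalArgumentException("invalid disk usage percent: " + val);
        }
      }

      public static void main(String[] args) {
        checkPercent(99f);  // ok
        checkPercent(101f); // throws IllegalArgumentException
      }
    }
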
private final ScheduledExecutorService exec;
private final Map<DiskStoreImpl, Set<DirectoryHolderUsage>> disks;
- private final LogUsage logDisk;
- // // this is set when we go into auto_reconnect mode
- // private volatile DirectoryHolderUsage criticalDisk;
+ private final LogUsage logDisk;
volatile DiskStateAction _testAction;
@@ -209,9 +211,9 @@ public class DiskStoreMonitor {
private File getLogDir() {
File log = null;
- GemFireCacheImpl gci = GemFireCacheImpl.getInstance();
- if (gci != null) {
- InternalDistributedSystem ds = gci.getInternalDistributedSystem();
+ InternalCache internalCache = GemFireCacheImpl.getInstance();
+ if (internalCache != null) {
+ InternalDistributedSystem ds = internalCache.getInternalDistributedSystem();
if (ds != null) {
DistributionConfig conf = ds.getConfig();
if (conf != null) {
@@ -230,7 +232,7 @@ public class DiskStoreMonitor {
return log;
}
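
getLogDir() walks cache -> distributed system -> config -> log file, and every
link in that chain may be absent, so each is null-checked in turn. A
hypothetical sketch of the null-guarded descent with stand-in types:

    import java.io.File;

    public class LogDirSketch {
      interface Config { File getLogFile(); }
      interface Sys { Config getConfig(); }

      static File logDirOrNull(Sys sys) {
        if (sys != null) {
          Config conf = sys.getConfig();
          if (conf != null) {
            File logFile = conf.getLogFile();
            if (logFile != null) {
              return logFile.getParentFile(); // may be null for a bare file name
            }
          }
        }
        return null; // caller falls back to a default location
      }
    }
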
- abstract class DiskUsage {
+ abstract static class DiskUsage {
private DiskState state;
DiskUsage() {
@@ -305,7 +307,7 @@ public class DiskStoreMonitor {
protected abstract void handleStateChange(DiskState next, String pct);
}
- class LogUsage extends DiskUsage {
+ static class LogUsage extends DiskUsage {
private final File dir;
public LogUsage(File dir) {
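
DiskUsage and LogUsage never touch their enclosing DiskStoreMonitor, so marking
them static drops the hidden reference to the outer instance that a non-static
inner class always carries. A minimal illustration with hypothetical names:

    public class MonitorSketch {
      static class Usage { // no implicit MonitorSketch.this reference
        private final String dir;
        Usage(String dir) { this.dir = dir; }
        String dir() { return dir; }
      }

      public static void main(String[] args) {
        // Constructible without an enclosing MonitorSketch instance:
        Usage u = new Usage("/var/log");
        System.out.println(u.dir());
      }
    }
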
@@ -382,41 +384,12 @@ public class DiskStoreMonitor {
logger.error(LogMarker.DISK_STORE_MONITOR,
LocalizedMessage.create(LocalizedStrings.DiskStoreMonitor_DISK_CRITICAL, args));
- try {
- // // prepare for restart
- // if (AUTO_RECONNECT) {
- // disk.getCache().saveCacheXmlForReconnect();
- // criticalDisk = this;
- // }
- } finally {
- // pull the plug
- disk.handleDiskAccessException(new DiskAccessException(msg, disk));
- }
+ // TODO: this is weird...
+ disk.handleDiskAccessException(new DiskAccessException(msg, disk));
break;
}
}
- // private void performReconnect(String msg) {
- // try {
- // // don't try to reconnect before the cache is closed
- // disk._testHandleDiskAccessException.await();
- //
- // // now reconnect, clear out the var first so a close can interrupt the
- // // reconnect
- // criticalDisk = null;
- // boolean restart = disk.getCache().getDistributedSystem().tryReconnect(true, msg,
- // disk.getCache());
- // if (LogMarker.DISK_STORE_MONITOR || logger.isDebugEnabled()) {
- // String pre = restart ? "Successfully" : "Unsuccessfully";
- // logger.info(LocalizedStrings.DEBUG, pre + " attempted to restart cache");
- // }
- // } catch (InterruptedException e) {
- // Thread.currentThread().interrupt();
- // } finally {
- // close();
- // }
- // }
-
@Override
protected File dir() {
return dir.getDir();
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
index 36ad9ce..e22e1d9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
@@ -48,12 +48,10 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
-/**
- *
- */
public class DistTXCommitMessage extends TXMessage {
private static final Logger logger = LogService.getLogger();
+
protected ArrayList<ArrayList<DistTxThinEntryState>> entryStateList = null;
/** for deserialization */
@@ -75,7 +73,7 @@ public class DistTXCommitMessage extends TXMessage {
logger.debug("DistTXCommitMessage.operateOnTx: Tx {}", txId);
}
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
TXManagerImpl txMgr = cache.getTXMgr();
final TXStateProxy txStateProxy = txMgr.getTXState();
TXCommitMessage cmsg = null;
@@ -256,7 +254,7 @@ public class DistTXCommitMessage extends TXMessage {
@Override
public String toString() {
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
sb.append("DistTXCommitPhaseTwoReplyMessage ").append("processorid=").append(this.processorId)
.append(" reply to sender ").append(this.getSender());
return sb.toString();
@@ -339,7 +337,7 @@ public class DistTXCommitMessage extends TXMessage {
(DistTxCommitExceptionCollectingException) this.exception;
return cce.getCacheClosedMembers();
} else {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
@@ -349,7 +347,7 @@ public class DistTXCommitMessage extends TXMessage {
(DistTxCommitExceptionCollectingException) this.exception;
return cce.getRegionDestroyedMembers(regionFullPath);
} else {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
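
Collections.emptySet() is preferred over the raw EMPTY_SET constant because the
method is generic and returns a typed, immutable set. A small sketch
(hypothetical helper, not Geode code):

    import java.util.Collections;
    import java.util.Set;

    public class EmptySetSketch {
      static Set<String> membersOrEmpty(Set<String> members) {
        return members != null ? members : Collections.<String>emptySet();
      }

      public static void main(String[] args) {
        System.out.println(membersOrEmpty(null).isEmpty()); // true
      }
    }
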
@@ -387,14 +385,12 @@ public class DistTXCommitMessage extends TXMessage {
/**
* Determine if the commit processing was incomplete; if so, throw a detailed exception
* indicating the source of the problem
- *
- * @param msgMap
*/
public void handlePotentialCommitFailure(
HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
if (fatalExceptions.size() > 0) {
- StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id)
- .append(". Caused by the following exceptions: ");
+ StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ")
+ .append(id).append(". Caused by the following exceptions: ");
for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
Map.Entry me = (Map.Entry) i.next();
DistributedMember mem = (DistributedMember) me.getKey();
@@ -428,16 +424,13 @@ public class DistTXCommitMessage extends TXMessage {
public Set getRegionDestroyedMembers(String regionFullPath) {
Set members = (Set) this.regionExceptions.get(regionFullPath);
if (members == null) {
- members = Collections.EMPTY_SET;
+ members = Collections.emptySet();
}
return members;
}
/**
* Protected by (this)
- *
- * @param member
- * @param exceptions
*/
public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) {
for (Iterator iter = exceptions.iterator(); iter.hasNext();) {
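
handlePotentialCommitFailure folds the per-member fatal exceptions into a
single StringBuilder-backed message before throwing. A simplified, hypothetical
sketch of that aggregation (stand-in types and values):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class CommitFailureSketch {
      public static void main(String[] args) {
        Map<String, Exception> fatalExceptions = new LinkedHashMap<>();
        fatalExceptions.put("member-1", new IllegalStateException("disk full"));
        fatalExceptions.put("member-2", new IllegalStateException("cache closed"));

        if (!fatalExceptions.isEmpty()) {
          StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction 7")
              .append(". Caused by the following exceptions: ");
          for (Map.Entry<String, Exception> entry : fatalExceptions.entrySet()) {
            errorMessage.append(" From member ").append(entry.getKey())
                .append(": ").append(entry.getValue().getMessage()).append(';');
          }
          throw new IllegalStateException(errorMessage.toString());
        }
      }
    }
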
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
index ffbc3ba..0ab2cc3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
@@ -54,7 +54,7 @@ import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
/**
- *
+ *
*/
public final class DistTXPrecommitMessage extends TXMessage {
@@ -107,7 +107,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
/*
* Perform precommit
- *
+ *
* [DISTTX] Handle different exceptions here
*/
txMgr.precommit();
@@ -202,7 +202,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
* Return the value from the get operation, serializing its bytes as late as possible to avoid
* making unnecessary byte[] copies. De-serialize those same bytes as late as possible to avoid
* using precious threads (aka P2P readers).
- *
+ *
* @param recipient the origin VM that performed the get
* @param processorId the processor on which the origin thread is waiting
* @param val the raw value that will eventually be serialized
@@ -218,7 +218,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
/**
* Processes this message. This method is invoked by the receiver of the message.
- *
+ *
* @param dm the distribution manager that is processing the message.
*/
@Override
@@ -272,9 +272,9 @@ public final class DistTXPrecommitMessage extends TXMessage {
/**
* Reply processor which collects all CommitReplyExceptions for Dist Tx and emits a detailed
* failure exception if problems occur
- *
+ *
* @see TXCommitMessage.CommitReplyProcessor
- *
+ *
* [DISTTX] TODO see if need ReliableReplyProcessor21? departed members?
*/
public static final class DistTxPrecommitReplyProcessor extends ReplyProcessor21 {
@@ -361,7 +361,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
/**
* An Exception that collects many remote CommitExceptions
- *
+ *
* @see TXCommitMessage.CommitExceptionCollectingException
*/
public static class DistTxPrecommitExceptionCollectingException extends ReplyException {
@@ -388,7 +388,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
/**
* Determine if the commit processing was incomplete; if so, throw a detailed exception
* indicating the source of the problem
- *
+ *
* @param msgMap
*/
public void handlePotentialCommitFailure(
@@ -436,7 +436,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
/**
* Protected by (this)
- *
+ *
* @param member
* @param exceptions
*/
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
index bfe302a..d4f5943 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
@@ -75,7 +75,7 @@ public final class DistTXRollbackMessage extends TXMessage {
logger.debug("Dist TX: Rollback: {}", txId);
}
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
TXManagerImpl txMgr = cache.getTXMgr();
final TXStateProxy txState = txMgr.getTXState();
boolean rollbackSuccessful = false;
@@ -87,10 +87,6 @@ public final class DistTXRollbackMessage extends TXMessage {
"DistTXRollbackMessage.operateOnTx: found a previously committed transaction:{}",
txId);
}
- // TXCommitMessage cmsg = txMgr.getRecentlyCompletedMessage(txId);
- // if (txMgr.isExceptionToken(cmsg)) {
- // throw txMgr.getExceptionForToken(cmsg, txId);
- // }
} else if (txState != null) {
// [DISTTX] TODO - Handle scenarios of no txState
// if no TXState was created (e.g. due to only getEntry/size operations
@@ -219,7 +215,7 @@ public final class DistTXRollbackMessage extends TXMessage {
@Override
public String toString() {
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
sb.append("DistTXRollbackReplyMessage ").append("processorid=").append(this.processorId)
.append(" reply to sender ").append(this.getSender());
return sb.toString();
@@ -232,7 +228,6 @@ public final class DistTXRollbackMessage extends TXMessage {
/**
* A processor to capture the value returned by {@link DistTXRollbackReplyMessage}
- *
*/
public static class DistTXRollbackResponse extends RemoteOperationResponse {
private volatile Boolean rollbackState;
@@ -275,9 +270,6 @@ public final class DistTXRollbackMessage extends TXMessage {
final String msg = "DistTXRollbackResponse got RemoteOperationException; rethrowing";
logger.debug(msg, e);
throw e;
- } catch (TransactionDataNotColocatedException e) {
- // Throw this up to user!
- throw e;
}
return rollbackState;
}
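
The removed TransactionDataNotColocatedException clause only rethrew the
exception it caught, which is a no-op, so deleting it leaves behavior
unchanged. A tiny sketch of the cleanup (hypothetical exception and methods):

    public class RethrowSketch {
      static Boolean waitForStateBefore() {
        try {
          return waitForState();
        } catch (IllegalStateException e) {
          throw e; // adds nothing: the exception would propagate anyway
        }
      }

      static Boolean waitForStateAfter() {
        return waitForState(); // identical behavior, less code
      }

      static Boolean waitForState() {
        return Boolean.TRUE;
      }
    }
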
@@ -354,7 +346,7 @@ public final class DistTXRollbackMessage extends TXMessage {
(DistTxRollbackExceptionCollectingException) this.exception;
return cce.getCacheClosedMembers();
} else {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
@@ -364,7 +356,7 @@ public final class DistTXRollbackMessage extends TXMessage {
(DistTxRollbackExceptionCollectingException) this.exception;
return cce.getRegionDestroyedMembers(regionFullPath);
} else {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
@@ -402,14 +394,12 @@ public final class DistTXRollbackMessage extends TXMessage {
/**
* Determine if the commit processing was incomplete; if so, throw a detailed exception
* indicating the source of the problem
- *
- * @param msgMap
*/
public void handlePotentialCommitFailure(
HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
if (fatalExceptions.size() > 0) {
- StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id)
- .append(". Caused by the following exceptions: ");
+ StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ")
+ .append(id).append(". Caused by the following exceptions: ");
for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
Map.Entry me = (Map.Entry) i.next();
DistributedMember mem = (DistributedMember) me.getKey();
@@ -443,16 +433,13 @@ public final class DistTXRollbackMessage extends TXMessage {
public Set getRegionDestroyedMembers(String regionFullPath) {
Set members = (Set) this.regionExceptions.get(regionFullPath);
if (members == null) {
- members = Collections.EMPTY_SET;
+ members = Collections.emptySet();
}
return members;
}
/**
* Protected by (this)
- *
- * @param member
- * @param exceptions
*/
public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) {
for (Iterator iter = exceptions.iterator(); iter.hasNext();) {