You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/08 23:16:06 UTC
[28/46] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index d1609ca..aeabbbc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -14,19 +14,62 @@
*/
package org.apache.geode.internal.cache;
-import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetAddress;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.DiskStoreFactory;
@@ -72,55 +115,11 @@ import org.apache.geode.pdx.internal.EnumInfo;
import org.apache.geode.pdx.internal.PdxField;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.InetAddress;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
* Represents a (disk-based) persistent store for region data. Used for both persistent recoverable
* regions and overflow-only regions.
- *
- *
+ *
* @since GemFire 3.2
*/
@SuppressWarnings("synthetic-access")
@@ -128,6 +127,7 @@ public class DiskStoreImpl implements DiskStore {
private static final Logger logger = LogService.getLogger();
private static final String BACKUP_DIR_PREFIX = "dir";
+
public static final boolean KRF_DEBUG = Boolean.getBoolean("disk.KRF_DEBUG");
public static final int MAX_OPEN_INACTIVE_OPLOGS =
@@ -166,6 +166,7 @@ public class DiskStoreImpl implements DiskStore {
public static final String RECOVER_VALUE_PROPERTY_NAME =
DistributionConfig.GEMFIRE_PREFIX + "disk.recoverValues";
+
public static final String RECOVER_VALUES_SYNC_PROPERTY_NAME =
DistributionConfig.GEMFIRE_PREFIX + "disk.recoverValuesSync";
@@ -177,9 +178,12 @@ public class DiskStoreImpl implements DiskStore {
DistributionConfig.GEMFIRE_PREFIX + "disk.recoverLruValues";
boolean RECOVER_VALUES = getBoolean(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, true);
+
boolean RECOVER_VALUES_SYNC = getBoolean(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, false);
+
boolean FORCE_KRF_RECOVERY =
getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disk.FORCE_KRF_RECOVERY", false);
+
final boolean RECOVER_LRU_VALUES =
getBoolean(DiskStoreImpl.RECOVER_LRU_VALUES_PROPERTY_NAME, false);
@@ -188,7 +192,9 @@ public class DiskStoreImpl implements DiskStore {
}
public static final long MIN_RESERVED_DRID = 1;
+
public static final long MAX_RESERVED_DRID = 8;
+
static final long MIN_DRID = MAX_RESERVED_DRID + 1;
/**
@@ -205,9 +211,7 @@ public class DiskStoreImpl implements DiskStore {
private final int MAX_OPLOGS_PER_COMPACTION = Integer.getInteger(
DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_COMPACTION",
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_ROLL", 1).intValue());
- /**
- *
- */
+
public static final int MAX_CONCURRENT_COMPACTIONS = Integer.getInteger(
DistributionConfig.GEMFIRE_PREFIX + "MAX_CONCURRENT_COMPACTIONS",
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAX_CONCURRENT_ROLLS", 1).intValue());
@@ -219,6 +223,7 @@ public class DiskStoreImpl implements DiskStore {
*/
public static final int MAX_PENDING_TASKS =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "disk.MAX_PENDING_TASKS", 6);
+
/**
* This system property indicates that IF should also be preallocated. This property will be used
* in conjunction with the PREALLOCATE_OPLOGS property. If PREALLOCATE_OPLOGS is ON the below will
@@ -227,6 +232,7 @@ public class DiskStoreImpl implements DiskStore {
static final boolean PREALLOCATE_IF =
!System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "preAllocateIF", "true")
.equalsIgnoreCase("false");
+
/**
* This system property indicates that Oplogs should be preallocated till the maxOplogSize as
* specified for the disk store.
@@ -252,19 +258,14 @@ public class DiskStoreImpl implements DiskStore {
public static volatile HashSet<String> TEST_CHK_FALLOC_DIRS;
public static volatile HashSet<String> TEST_NO_FALLOC_DIRS;
- // /** delay for slowing down recovery, for testing purposes only */
- // public static volatile int recoverDelay = 0;
-
- // //////////////////// Instance Fields ///////////////////////
-
- private final GemFireCacheImpl cache;
+ private final InternalCache cache;
/** The stats for this store */
private final DiskStoreStats stats;
/**
- * Asif:Added as stop gap arrangement to fix bug 39380. It is not a clean fix as keeping track of
- * the threads acquiring read lock, etc is not a good idea to solve the issue
+ * Added as stop gap arrangement to fix bug 39380. It is not a clean fix as keeping track of the
+ * threads acquiring read lock, etc is not a good idea to solve the issue
*/
private final AtomicInteger entryOpsCount = new AtomicInteger();
/**
@@ -291,10 +292,11 @@ public class DiskStoreImpl implements DiskStore {
* is forced. If this value is 0 then no limit.
*/
private final int maxAsyncItems;
+
private final AtomicInteger forceFlushCount;
+
private final Object asyncMonitor;
- // complex vars
/** Compactor task which does the compaction. Null if compaction not possible. */
private final OplogCompactor oplogCompactor;
@@ -303,7 +305,9 @@ public class DiskStoreImpl implements DiskStore {
private volatile DiskStoreBackup diskStoreBackup = null;
private final ReentrantReadWriteLock compactorLock = new ReentrantReadWriteLock();
+
private final WriteLock compactorWriteLock = compactorLock.writeLock();
+
private final ReadLock compactorReadLock = compactorLock.readLock();
/**
@@ -316,37 +320,21 @@ public class DiskStoreImpl implements DiskStore {
new AtomicReference<DiskAccessException>();
PersistentOplogSet persistentOplogs = new PersistentOplogSet(this);
- OverflowOplogSet overflowOplogs = new OverflowOplogSet(this);
-
- // private boolean isThreadWaitingForSpace = false;
-
- /**
- * Get the next available dir
- */
-
- // /**
- // * Max timed wait for disk space to become available for an entry operation
- // ,
- // * in milliseconds. This will be the maximum time for which a
- // * create/modify/remove operation will wait so as to allow switch over & get
- // a
- // * new Oplog for writing. If no space is available in that time,
- // * DiskAccessException will be thrown. The default wait will be for 120
- // * seconds
- // */
- // private static final long MAX_WAIT_FOR_SPACE = Integer.getInteger(
- // "MAX_WAIT_FOR_SPACE", 20).intValue() * 1000;
+ OverflowOplogSet overflowOplogs = new OverflowOplogSet(this);
private final AtomicLong regionIdCtr = new AtomicLong(MIN_DRID);
+
/**
* Only contains backup DiskRegions. The Value could be a RecoveredDiskRegion or a DiskRegion
*/
private final ConcurrentMap<Long, DiskRegion> drMap = new ConcurrentHashMap<Long, DiskRegion>();
+
/**
* A set of overflow only regions that are using this disk store.
*/
private final Set<DiskRegion> overflowMap = new ConcurrentHashSet<DiskRegion>();
+
/**
* Contains all of the disk recovery stores for which we are recovering values asynchronously.
*/
@@ -369,9 +357,8 @@ public class DiskStoreImpl implements DiskStore {
private final ThreadPoolExecutor diskStoreTaskPool;
private final ThreadPoolExecutor delayedWritePool;
- private volatile Future lastDelayedWrite;
- // ///////////////////// Constructors /////////////////////////
+ private volatile Future lastDelayedWrite;
private static int calcCompactionThreshold(int ct) {
if (ct == DiskStoreFactory.DEFAULT_COMPACTION_THRESHOLD) {
@@ -387,19 +374,19 @@ public class DiskStoreImpl implements DiskStore {
}
/**
- * Creates a new <code>DiskRegion</code> that access disk on behalf of the given region.
+ * Creates a new {@code DiskRegion} that accesses disk on behalf of the given region.
*/
- DiskStoreImpl(Cache cache, DiskStoreAttributes props) {
+ DiskStoreImpl(InternalCache cache, DiskStoreAttributes props) {
this(cache, props, false, null);
}
- DiskStoreImpl(Cache cache, DiskStoreAttributes props, boolean ownedByRegion,
+ DiskStoreImpl(InternalCache cache, DiskStoreAttributes props, boolean ownedByRegion,
InternalRegionArguments internalRegionArgs) {
this(cache, props.getName(), props, ownedByRegion, internalRegionArgs, false,
false/* upgradeVersionOnly */, false, false, true, false/* offlineModify */);
}
- DiskStoreImpl(Cache cache, String name, DiskStoreAttributes props, boolean ownedByRegion,
+ DiskStoreImpl(InternalCache cache, String name, DiskStoreAttributes props, boolean ownedByRegion,
InternalRegionArguments internalRegionArgs, boolean offline, boolean upgradeVersionOnly,
boolean offlineValidating, boolean offlineCompacting, boolean needsOplogs,
boolean offlineModify) {
@@ -427,7 +414,7 @@ public class DiskStoreImpl implements DiskStore {
this.warningPercent = props.getDiskUsageWarningPercentage();
this.criticalPercent = props.getDiskUsageCriticalPercentage();
- this.cache = (GemFireCacheImpl) cache;
+ this.cache = cache;
StatisticsFactory factory = cache.getDistributedSystem();
this.stats = new DiskStoreStats(factory, getName());
@@ -474,7 +461,7 @@ public class DiskStoreImpl implements DiskStore {
this.maxDirSize = tempMaxDirSize * 1024 * 1024;
this.infoFileDirIndex = 0;
// Now that we no longer have db files, use all directories for oplogs
- /**
+ /*
* The infoFileDir contains the lock file and the init file. It will be directories[0] on a
* brand new disk store. On an existing disk store it will be the directory the init file is
* found in.
@@ -495,7 +482,7 @@ public class DiskStoreImpl implements DiskStore {
int MAXT = DiskStoreImpl.MAX_CONCURRENT_COMPACTIONS;
final ThreadGroup compactThreadGroup =
- LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", this.logger);
+ LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", logger);
final ThreadFactory compactThreadFactory =
GemfireCacheHelper.CreateThreadFactory(compactThreadGroup, "Idle OplogCompactor");
this.diskStoreTaskPool = new ThreadPoolExecutor(MAXT, MAXT, 10, TimeUnit.SECONDS,
@@ -504,7 +491,7 @@ public class DiskStoreImpl implements DiskStore {
final ThreadGroup deleteThreadGroup =
- LoggingThreadGroup.createThreadGroup("Oplog Delete Thread Group", this.logger);
+ LoggingThreadGroup.createThreadGroup("Oplog Delete Thread Group", logger);
final ThreadFactory deleteThreadFactory =
GemfireCacheHelper.CreateThreadFactory(deleteThreadGroup, "Oplog Delete Task");
@@ -583,7 +570,7 @@ public class DiskStoreImpl implements DiskStore {
}
/**
- * Returns the <code>DiskStoreStats</code> for this store
+ * Returns the {@code DiskStoreStats} for this store
*/
public DiskStoreStats getStats() {
return this.stats;
@@ -697,7 +684,7 @@ public class DiskStoreImpl implements DiskStore {
* @param entry The entry which is going to be written to disk
* @throws RegionClearedException If a clear operation completed before the put operation
* completed successfully, resulting in the put operation to abort.
- * @throws IllegalArgumentException If <code>id</code> is less than zero
+ * @throws IllegalArgumentException If {@code id} is less than zero
*/
final void put(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async)
throws RegionClearedException {
@@ -886,7 +873,6 @@ public class DiskStoreImpl implements DiskStore {
* Given a BytesAndBits object convert it to the relevant Object (deserialize if necessary) and
* return the object
*
- * @param bb
* @return the converted object
*/
static Object convertBytesAndBitsIntoObject(BytesAndBits bb) {
@@ -909,7 +895,6 @@ public class DiskStoreImpl implements DiskStore {
/**
* Given a BytesAndBits object get the serialized blob
*
- * @param bb
* @return the converted object
*/
static Object convertBytesAndBitsToSerializedForm(BytesAndBits bb) {
@@ -1029,7 +1014,7 @@ public class DiskStoreImpl implements DiskStore {
* HTree with the oplog being destroyed
*
* @return null if entry has nothing stored on disk (id == INVALID_ID)
- * @throws IllegalArgumentException If <code>id</code> is less than zero, no action is taken.
+ * @throws IllegalArgumentException If {@code id} is less than zero, no action is taken.
*/
public final Object getNoBuffer(DiskRegion dr, DiskId id) {
BytesAndBits bb = null;
@@ -1067,8 +1052,8 @@ public class DiskStoreImpl implements DiskStore {
*
* @throws RegionClearedException If a clear operation completed before the put operation
* completed successfully, resulting in the put operation to abort.
- * @throws IllegalArgumentException If <code>id</code> is {@linkplain #INVALID_ID invalid}or is
- * less than zero, no action is taken.
+ * @throws IllegalArgumentException If {@code id} is {@linkplain #INVALID_ID invalid} or is less
+ * than zero, no action is taken.
*/
final void remove(LocalRegion region, DiskEntry entry, boolean async, boolean isClear)
throws RegionClearedException {
@@ -1191,7 +1176,7 @@ public class DiskStoreImpl implements DiskStore {
if (currentOpsInProgress == 0) {
synchronized (this.closeRegionGuard) {
if (dr.isRegionClosed() && entryOpsCount.get() == 0) {
- this.closeRegionGuard.notify();
+ this.closeRegionGuard.notifyAll();
}
}
}
@@ -1237,7 +1222,6 @@ public class DiskStoreImpl implements DiskStore {
/**
* Get serialized form of data off the disk
*
- * @param id
* @since GemFire 5.7
*/
public Object getSerializedData(DiskRegion dr, DiskId id) {
@@ -1269,7 +1253,7 @@ public class DiskStoreImpl implements DiskStore {
DiskEntry entry = ade.de;
DiskEntry.Helper.handleFullAsyncQueue(entry, region, tag);
}
- } catch (RegionDestroyedException ex) {
+ } catch (RegionDestroyedException ignore) {
// Normally we flush before closing or destroying a region
// but in some cases it is closed w/o flushing.
// So just ignore it; see bug 41305.
@@ -1397,8 +1381,7 @@ public class DiskStoreImpl implements DiskStore {
private int fillDrainList() {
synchronized (this.drainSync) {
this.drainList = new ArrayList(asyncQueue.size());
- int drainCount = asyncQueue.drainTo(this.drainList);
- return drainCount;
+ return asyncQueue.drainTo(this.drainList);
}
}
@@ -1410,8 +1393,6 @@ public class DiskStoreImpl implements DiskStore {
* To fix bug 41770 clear the list in a way that will not break a concurrent iterator that is not
* synced on drainSync. Only clear from it entries on the given region. Currently we do this by
* clearing the isPendingAsync bit on each entry in this list.
- *
- * @param rvv
*/
void clearDrainList(LocalRegion r, RegionVersionVector rvv) {
synchronized (this.drainSync) {
@@ -1516,7 +1497,7 @@ public class DiskStoreImpl implements DiskStore {
try {
this.flusherThread.join(waitMs);
return true;
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
return false;
@@ -1532,7 +1513,7 @@ public class DiskStoreImpl implements DiskStore {
}
}
- public GemFireCacheImpl getCache() {
+ public InternalCache getCache() {
return this.cache;
}
@@ -1759,7 +1740,7 @@ public class DiskStoreImpl implements DiskStore {
}
}
} // else
- } catch (RegionDestroyedException ex) {
+ } catch (RegionDestroyedException ignore) {
// Normally we flush before closing or destroying a region
// but in some cases it is closed w/o flushing.
// So just ignore it; see bug 41305.
@@ -2050,18 +2031,8 @@ public class DiskStoreImpl implements DiskStore {
return this.directories[this.infoFileDirIndex];
}
- /** For Testing * */
- // void addToOplogSet(int oplogID, File opFile, DirectoryHolder dirHolder) {
- // Oplog oplog = new Oplog(oplogID, this);
- // oplog.addRecoveredFile(opFile, dirHolder);
- // // @todo check callers to see if they need drf support
- // this.oplogSet.add(oplog);
- // }
-
- /** For Testing * */
/**
* returns the size of the biggest directory available to the region
- *
*/
public long getMaxDirSize() {
return maxDirSize;
@@ -2143,8 +2114,6 @@ public class DiskStoreImpl implements DiskStore {
/**
* Removes anything found in the async queue for the given region
- *
- * @param rvv
*/
private void clearAsyncQueue(LocalRegion region, boolean needsWriteLock,
RegionVersionVector rvv) {
@@ -2263,7 +2232,7 @@ public class DiskStoreImpl implements DiskStore {
if (diskException.get() != null) {
try {
_testHandleDiskAccessException.await();
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
@@ -2466,25 +2435,26 @@ public class DiskStoreImpl implements DiskStore {
dr.setRegionClosed(true);
}
gotLock = true;
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
synchronized (this.closeRegionGuard) {
if (!dr.isRegionClosed()) {
if (!closeDataOnly) {
dr.setRegionClosed(true);
}
- // Asif: I am quite sure that it should also be Ok if instead
+ // I am quite sure that it should also be Ok if instead
// while it is a If Check below. Because if acquireReadLock
// thread
- // has acquired thelock, it is bound to see the isRegionClose as
+ // has acquired the lock, it is bound to see the isRegionClose as
// true
- // and so will realse teh lock causing decrement to zeo , before
+ // and so will release the lock causing decrement to zero, before
// releasing the closeRegionGuard. But still...not to take any
// chance
while (this.entryOpsCount.get() > 0) {
try {
+ // TODO: calling wait while holding two locks
this.closeRegionGuard.wait(20000);
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ignored) {
// Exit without closing the region, do not know what else
// can be done
Thread.currentThread().interrupt();
@@ -2534,8 +2504,6 @@ public class DiskStoreImpl implements DiskStore {
/**
* stops the compactor outside the write lock. Once stopped then it proceeds to destroy the
* current & old oplogs
- *
- * @param dr
*/
void beginDestroyRegion(LocalRegion region, DiskRegion dr) {
if (dr.isBackup()) {
@@ -2571,7 +2539,7 @@ public class DiskStoreImpl implements DiskStore {
while (this.backgroundTasks.get() > 0) {
try {
this.backgroundTasks.wait(500L);
- } catch (InterruptedException ex) {
+ } catch (InterruptedException ignore) {
interrupted = true;
}
}
@@ -2720,7 +2688,7 @@ public class DiskStoreImpl implements DiskStore {
return null;
}
- return l.toArray(new CompactableOplog[0]);
+ return l.toArray(new CompactableOplog[l.size()]);
}
/**
@@ -2745,7 +2713,6 @@ public class DiskStoreImpl implements DiskStore {
* @param baselineCopyMap this will be populated with baseline oplogs Files that will be used in
* the restore script.
* @return an array of Oplogs to be copied for an incremental backup.
- * @throws IOException
*/
private Oplog[] filterBaselineOplogs(BackupInspector baselineInspector,
Map<File, File> baselineCopyMap) throws IOException {
@@ -2796,11 +2763,9 @@ public class DiskStoreImpl implements DiskStore {
}
// Convert the filtered oplog list to an array
- return oplogList.toArray(new Oplog[] {});
+ return oplogList.toArray(new Oplog[oplogList.size()]);
}
-
-
/**
* Get all of the oplogs
*/
@@ -3013,7 +2978,7 @@ public class DiskStoreImpl implements DiskStore {
while (this.scheduled) {
try {
wait();
- } catch (InterruptedException ex) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
@@ -3114,30 +3079,13 @@ public class DiskStoreImpl implements DiskStore {
if (dr.isRegionClosed()) {
return;
}
- // // Stop the compactor if running, without taking lock.
- // if (this.oplogCompactor != null) {
- // try {
- // this.oplogCompactor.stopCompactor();
- // }
- // catch (CancelException ignore) {
- // // Asif:To fix Bug 39380 , ignore the cache closed exception here.
- // // allow it to call super .close so that it would be able to close
- // the
- // // oplogs
- // // Though I do not think this exception will be thrown by
- // // the stopCompactor. Still not taking chance and ignoring it
-
- // }
- // }
- // // if (!isSync()) {
- // stopAsyncFlusher(true); // do this before writeLock
- // // }
+
boolean gotLock = false;
try {
try {
acquireWriteLock(dr);
gotLock = true;
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// see workaround below.
}
@@ -3163,8 +3111,9 @@ public class DiskStoreImpl implements DiskStore {
}
boolean interrupted = Thread.interrupted();
try {
+ // TODO: calling wait while holding two locks
this.closeRegionGuard.wait(1000);
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -3175,7 +3124,7 @@ public class DiskStoreImpl implements DiskStore {
if (this.entryOpsCount.get() > 0) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.DisKRegion_OUTSTANDING_OPS_REMAIN_AFTER_0_SECONDS_FOR_DISK_REGION_1,
- new Object[] {Integer.valueOf(loopCount), dr.getName()}));
+ new Object[] {loopCount, dr.getName()}));
for (;;) {
if (this.entryOpsCount.get() == 0) {
@@ -3183,8 +3132,9 @@ public class DiskStoreImpl implements DiskStore {
}
boolean interrupted = Thread.interrupted();
try {
+ // TODO: calling wait while holding two locks
this.closeRegionGuard.wait(1000);
- } catch (InterruptedException ie) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -3233,7 +3183,7 @@ public class DiskStoreImpl implements DiskStore {
dr.resetRVV();
dr.setRVVTrusted(false);
dr.writeRVV(null, null); // just persist the empty rvv with trust=false
- } catch (RegionDestroyedException rde) {
+ } catch (RegionDestroyedException ignore) {
// ignore a RegionDestroyedException at this stage
}
if (this.initFile != null && dr.isBackup()) {
@@ -4111,11 +4061,6 @@ public class DiskStoreImpl implements DiskStore {
* Start the backup process. This is the second step of the backup process. In this method, we
* define the data we're backing up by copying the init file and rolling to the next file. After
* this method returns operations can proceed as normal, except that we don't remove oplogs.
- *
- * @param targetDir
- * @param baselineInspector
- * @param restoreScript
- * @throws IOException
*/
public void startBackup(File targetDir, BackupInspector baselineInspector,
RestoreScript restoreScript) throws IOException {
@@ -4130,7 +4075,7 @@ public class DiskStoreImpl implements DiskStore {
}
// Get an appropriate lock object for each set of oplogs.
- Object childLock = childOplog.lock;;
+ Object childLock = childOplog.lock;
// TODO - We really should move this lock into the disk store, but
// until then we need to do this magic to make sure we're actually
@@ -4201,9 +4146,6 @@ public class DiskStoreImpl implements DiskStore {
/**
* Copy the oplogs to the backup directory. This is the final step of the backup process. The
* oplogs we copy are defined in the startBackup method.
- *
- * @param backupManager
- * @throws IOException
*/
public void finishBackup(BackupManager backupManager) throws IOException {
if (diskStoreBackup == null) {
@@ -4312,17 +4254,17 @@ public class DiskStoreImpl implements DiskStore {
props.setProperty(CACHE_XML_FILE, "");
DistributedSystem ds = DistributedSystem.connect(props);
offlineDS = ds;
- Cache c = org.apache.geode.cache.CacheFactory.create(ds);
- offlineCache = c;
- org.apache.geode.cache.DiskStoreFactory dsf = c.createDiskStoreFactory();
+ InternalCache cache = (InternalCache) CacheFactory.create(ds);
+ offlineCache = cache;
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
dsf.setDiskDirs(dsDirs);
if (offlineCompacting && maxOplogSize != -1L) {
dsf.setMaxOplogSize(maxOplogSize);
}
- DiskStoreImpl dsi = new DiskStoreImpl(c, dsName,
+ DiskStoreImpl dsi = new DiskStoreImpl(cache, dsName,
((DiskStoreFactoryImpl) dsf).getDiskStoreAttributes(), false, null, true,
upgradeVersionOnly, offlineValidate, offlineCompacting, needsOplogs, offlineModify);
- ((GemFireCacheImpl) c).addDiskStore(dsi);
+ cache.addDiskStore(dsi);
return dsi;
}
@@ -4536,7 +4478,7 @@ public class DiskStoreImpl implements DiskStore {
while (!isClosing() && currentAsyncValueRecoveryMap.containsKey(diskRegion.getId())) {
try {
currentAsyncValueRecoveryMap.wait();
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
}
}
@@ -4591,9 +4533,9 @@ public class DiskStoreImpl implements DiskStore {
if (lastWriteTask != null) {
try {
lastWriteTask.get();
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
- } catch (Exception e) {
+ } catch (Exception ignore) {
// do nothing, an exception from the write task was already logged.
}
}
@@ -4684,7 +4626,7 @@ public class DiskStoreImpl implements DiskStore {
delayedWritePool.shutdown();
try {
delayedWritePool.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
index 551f733..ac72361 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java
@@ -14,6 +14,19 @@
*/
package org.apache.geode.internal.cache;
+import java.io.File;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -22,25 +35,16 @@ import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.*;
public class DiskStoreMonitor {
private static final Logger logger = LogService.getLogger();
private static final boolean DISABLE_MONITOR =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_DISABLE_MONITORING");
- // private static final boolean AUTO_RECONNECT =
- // Boolean.getBoolean("gemfire.DISK_USAGE_ENABLE_AUTO_RECONNECT");
private static final int USAGE_CHECK_INTERVAL = Integer
.getInteger(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_POLLING_INTERVAL_MILLIS", 10000);
+
private static final float LOG_WARNING_THRESHOLD_PCT =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_LOG_WARNING_PERCENT", 99);
@@ -67,7 +71,7 @@ public class DiskStoreMonitor {
if (val < 0 || val > 100) {
throw new IllegalArgumentException(
LocalizedStrings.DiskWriteAttributesFactory_DISK_USAGE_WARNING_INVALID_0
- .toLocalizedString(Float.valueOf(val)));
+ .toLocalizedString(val));
}
}
@@ -80,17 +84,15 @@ public class DiskStoreMonitor {
if (val < 0 || val > 100) {
throw new IllegalArgumentException(
LocalizedStrings.DiskWriteAttributesFactory_DISK_USAGE_CRITICAL_INVALID_0
- .toLocalizedString(Float.valueOf(val)));
+ .toLocalizedString(val));
}
}
private final ScheduledExecutorService exec;
private final Map<DiskStoreImpl, Set<DirectoryHolderUsage>> disks;
- private final LogUsage logDisk;
- // // this is set when we go into auto_reconnect mode
- // private volatile DirectoryHolderUsage criticalDisk;
+ private final LogUsage logDisk;
volatile DiskStateAction _testAction;
@@ -209,9 +211,9 @@ public class DiskStoreMonitor {
private File getLogDir() {
File log = null;
- GemFireCacheImpl gci = GemFireCacheImpl.getInstance();
- if (gci != null) {
- InternalDistributedSystem ds = gci.getInternalDistributedSystem();
+ InternalCache internalCache = GemFireCacheImpl.getInstance();
+ if (internalCache != null) {
+ InternalDistributedSystem ds = internalCache.getInternalDistributedSystem();
if (ds != null) {
DistributionConfig conf = ds.getConfig();
if (conf != null) {
@@ -230,7 +232,7 @@ public class DiskStoreMonitor {
return log;
}
- abstract class DiskUsage {
+ abstract static class DiskUsage {
private DiskState state;
DiskUsage() {
@@ -305,7 +307,7 @@ public class DiskStoreMonitor {
protected abstract void handleStateChange(DiskState next, String pct);
}
- class LogUsage extends DiskUsage {
+ static class LogUsage extends DiskUsage {
private final File dir;
public LogUsage(File dir) {
@@ -382,41 +384,12 @@ public class DiskStoreMonitor {
logger.error(LogMarker.DISK_STORE_MONITOR,
LocalizedMessage.create(LocalizedStrings.DiskStoreMonitor_DISK_CRITICAL, args));
- try {
- // // prepare for restart
- // if (AUTO_RECONNECT) {
- // disk.getCache().saveCacheXmlForReconnect();
- // criticalDisk = this;
- // }
- } finally {
- // pull the plug
- disk.handleDiskAccessException(new DiskAccessException(msg, disk));
- }
+ // TODO: this is weird...
+ disk.handleDiskAccessException(new DiskAccessException(msg, disk));
break;
}
}
- // private void performReconnect(String msg) {
- // try {
- // // don't try to reconnect before the cache is closed
- // disk._testHandleDiskAccessException.await();
- //
- // // now reconnect, clear out the var first so a close can interrupt the
- // // reconnect
- // criticalDisk = null;
- // boolean restart = disk.getCache().getDistributedSystem().tryReconnect(true, msg,
- // disk.getCache());
- // if (LogMarker.DISK_STORE_MONITOR || logger.isDebugEnabled()) {
- // String pre = restart ? "Successfully" : "Unsuccessfully";
- // logger.info(LocalizedStrings.DEBUG, pre + " attempted to restart cache");
- // }
- // } catch (InterruptedException e) {
- // Thread.currentThread().interrupt();
- // } finally {
- // close();
- // }
- // }
-
@Override
protected File dir() {
return dir.getDir();
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
index 36ad9ce..e22e1d9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
@@ -48,12 +48,10 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
-/**
- *
- */
public class DistTXCommitMessage extends TXMessage {
private static final Logger logger = LogService.getLogger();
+
protected ArrayList<ArrayList<DistTxThinEntryState>> entryStateList = null;
/** for deserialization */
@@ -75,7 +73,7 @@ public class DistTXCommitMessage extends TXMessage {
logger.debug("DistTXCommitMessage.operateOnTx: Tx {}", txId);
}
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
TXManagerImpl txMgr = cache.getTXMgr();
final TXStateProxy txStateProxy = txMgr.getTXState();
TXCommitMessage cmsg = null;
@@ -256,7 +254,7 @@ public class DistTXCommitMessage extends TXMessage {
@Override
public String toString() {
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
sb.append("DistTXCommitPhaseTwoReplyMessage ").append("processorid=").append(this.processorId)
.append(" reply to sender ").append(this.getSender());
return sb.toString();
@@ -339,7 +337,7 @@ public class DistTXCommitMessage extends TXMessage {
(DistTxCommitExceptionCollectingException) this.exception;
return cce.getCacheClosedMembers();
} else {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
@@ -349,7 +347,7 @@ public class DistTXCommitMessage extends TXMessage {
(DistTxCommitExceptionCollectingException) this.exception;
return cce.getRegionDestroyedMembers(regionFullPath);
} else {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
@@ -387,14 +385,12 @@ public class DistTXCommitMessage extends TXMessage {
/**
* Determine if the commit processing was incomplete, if so throw a detailed exception
* indicating the source of the problem
- *
- * @param msgMap
*/
public void handlePotentialCommitFailure(
HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
if (fatalExceptions.size() > 0) {
- StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id)
- .append(". Caused by the following exceptions: ");
+ StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ")
+ .append(id).append(". Caused by the following exceptions: ");
for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
Map.Entry me = (Map.Entry) i.next();
DistributedMember mem = (DistributedMember) me.getKey();
@@ -428,16 +424,13 @@ public class DistTXCommitMessage extends TXMessage {
public Set getRegionDestroyedMembers(String regionFullPath) {
Set members = (Set) this.regionExceptions.get(regionFullPath);
if (members == null) {
- members = Collections.EMPTY_SET;
+ members = Collections.emptySet();
}
return members;
}
/**
* Protected by (this)
- *
- * @param member
- * @param exceptions
*/
public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) {
for (Iterator iter = exceptions.iterator(); iter.hasNext();) {
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
index ffbc3ba..0ab2cc3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
@@ -54,7 +54,7 @@ import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
/**
- *
+ *
*/
public final class DistTXPrecommitMessage extends TXMessage {
@@ -107,7 +107,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
/*
* Perform precommit
- *
+ *
* [DISTTX] Handle different exceptions here
*/
txMgr.precommit();
@@ -202,7 +202,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
* Return the value from the get operation, serialize it bytes as late as possible to avoid
* making un-neccesary byte[] copies. De-serialize those same bytes as late as possible to avoid
* using precious threads (aka P2P readers).
- *
+ *
* @param recipient the origin VM that performed the get
* @param processorId the processor on which the origin thread is waiting
* @param val the raw value that will eventually be serialized
@@ -218,7 +218,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
/**
* Processes this message. This method is invoked by the receiver of the message.
- *
+ *
* @param dm the distribution manager that is processing the message.
*/
@Override
@@ -272,9 +272,9 @@ public final class DistTXPrecommitMessage extends TXMessage {
/**
* Reply processor which collects all CommitReplyExceptions for Dist Tx and emits a detailed
* failure exception if problems occur
- *
+ *
* @see TXCommitMessage.CommitReplyProcessor
- *
+ *
* [DISTTX] TODO see if need ReliableReplyProcessor21? departed members?
*/
public static final class DistTxPrecommitReplyProcessor extends ReplyProcessor21 {
@@ -361,7 +361,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
/**
* An Exception that collects many remote CommitExceptions
- *
+ *
* @see TXCommitMessage.CommitExceptionCollectingException
*/
public static class DistTxPrecommitExceptionCollectingException extends ReplyException {
@@ -388,7 +388,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
/**
* Determine if the commit processing was incomplete, if so throw a detailed exception
* indicating the source of the problem
- *
+ *
* @param msgMap
*/
public void handlePotentialCommitFailure(
@@ -436,7 +436,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
/**
* Protected by (this)
- *
+ *
* @param member
* @param exceptions
*/
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
index bfe302a..d4f5943 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java
@@ -75,7 +75,7 @@ public final class DistTXRollbackMessage extends TXMessage {
logger.debug("Dist TX: Rollback: {}", txId);
}
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
TXManagerImpl txMgr = cache.getTXMgr();
final TXStateProxy txState = txMgr.getTXState();
boolean rollbackSuccessful = false;
@@ -87,10 +87,6 @@ public final class DistTXRollbackMessage extends TXMessage {
"DistTXRollbackMessage.operateOnTx: found a previously committed transaction:{}",
txId);
}
- // TXCommitMessage cmsg = txMgr.getRecentlyCompletedMessage(txId);
- // if (txMgr.isExceptionToken(cmsg)) {
- // throw txMgr.getExceptionForToken(cmsg, txId);
- // }
} else if (txState != null) {
// [DISTTX] TODO - Handle scenarios of no txState
// if no TXState was created (e.g. due to only getEntry/size operations
@@ -219,7 +215,7 @@ public final class DistTXRollbackMessage extends TXMessage {
@Override
public String toString() {
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
sb.append("DistTXRollbackReplyMessage ").append("processorid=").append(this.processorId)
.append(" reply to sender ").append(this.getSender());
return sb.toString();
@@ -232,7 +228,6 @@ public final class DistTXRollbackMessage extends TXMessage {
/**
* A processor to capture the value returned by {@link DistTXRollbackReplyMessage}
- *
*/
public static class DistTXRollbackResponse extends RemoteOperationResponse {
private volatile Boolean rollbackState;
@@ -275,9 +270,6 @@ public final class DistTXRollbackMessage extends TXMessage {
final String msg = "DistTXRollbackResponse got RemoteOperationException; rethrowing";
logger.debug(msg, e);
throw e;
- } catch (TransactionDataNotColocatedException e) {
- // Throw this up to user!
- throw e;
}
return rollbackState;
}
@@ -354,7 +346,7 @@ public final class DistTXRollbackMessage extends TXMessage {
(DistTxRollbackExceptionCollectingException) this.exception;
return cce.getCacheClosedMembers();
} else {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
@@ -364,7 +356,7 @@ public final class DistTXRollbackMessage extends TXMessage {
(DistTxRollbackExceptionCollectingException) this.exception;
return cce.getRegionDestroyedMembers(regionFullPath);
} else {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
@@ -402,14 +394,12 @@ public final class DistTXRollbackMessage extends TXMessage {
/**
* Determine if the commit processing was incomplete, if so throw a detailed exception
* indicating the source of the problem
- *
- * @param msgMap
*/
public void handlePotentialCommitFailure(
HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
if (fatalExceptions.size() > 0) {
- StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id)
- .append(". Caused by the following exceptions: ");
+ StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ")
+ .append(id).append(". Caused by the following exceptions: ");
for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
Map.Entry me = (Map.Entry) i.next();
DistributedMember mem = (DistributedMember) me.getKey();
@@ -443,16 +433,13 @@ public final class DistTXRollbackMessage extends TXMessage {
public Set getRegionDestroyedMembers(String regionFullPath) {
Set members = (Set) this.regionExceptions.get(regionFullPath);
if (members == null) {
- members = Collections.EMPTY_SET;
+ members = Collections.emptySet();
}
return members;
}
/**
* Protected by (this)
- *
- * @param member
- * @param exceptions
*/
public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) {
for (Iterator iter = exceptions.iterator(); iter.hasNext();) {
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
index 172fabe..aa40508 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -65,9 +65,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* (non-Javadoc)
- *
+ *
* @see org.apache.geode.internal.cache.TXStateInterface#commit()
- *
+ *
* [DISTTX] TODO Catch all exceptions in precommit and rollback and make sure these messages reach
* all
*/
@@ -295,7 +295,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* [DISTTX] TODO Write similar method to take out exception
- *
+ *
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
@@ -551,7 +551,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* [DISTTX] TODO Write similar method to take out exception
- *
+ *
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
@@ -566,7 +566,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* Handle response of precommit reply
- *
+ *
* Go over list of region versions for this target and fill map
*/
private void populateEntryEventMap(DistributedMember target,
@@ -728,7 +728,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* [DISTTX] TODO Write similar method to take out exception
- *
+ *
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
@@ -756,7 +756,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
super.postPutAll(putallOp, successfulPuts, region);
} else {
region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
- // #43651
+ // #43651
if (logger.isDebugEnabled()) {
logger.debug(
@@ -835,7 +835,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
super.postRemoveAll(op, successfulOps, region);
} else {
region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
- // #43651
+ // #43651
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator.postRemoveAll "
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 0a9ccd8..7ba7d0c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
import java.io.DataInput;
@@ -39,6 +38,7 @@ import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.query.internal.cq.CqService;
@@ -58,12 +58,11 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllMessage;
import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter;
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
+import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.PartitionMessage;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.tier.MessageType;
@@ -75,26 +74,26 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.offheap.Releasable;
import org.apache.geode.internal.offheap.StoredObject;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.sequencelog.EntryLogger;
import org.apache.geode.internal.util.DelayedAction;
-/**
- *
- */
public abstract class DistributedCacheOperation {
private static final Logger logger = LogService.getLogger();
public static double LOSS_SIMULATION_RATIO = 0; // test hook
+
public static Random LOSS_SIMULATION_GENERATOR;
public static long SLOW_DISTRIBUTION_MS = 0; // test hook
// constants used in subclasses and distribution messages
// should use enum in source level 1.5+
+
/**
* Deserialization policy: do not deserialize (for byte array, null or cases where the value
* should stay serialized)
@@ -145,11 +144,12 @@ public abstract class DistributedCacheOperation {
}
- public final static byte DESERIALIZATION_POLICY_NUMBITS =
+ public static final byte DESERIALIZATION_POLICY_NUMBITS =
DistributionMessage.getNumBits(DESERIALIZATION_POLICY_LAZY);
public static final short DESERIALIZATION_POLICY_END =
(short) (1 << DESERIALIZATION_POLICY_NUMBITS);
+
public static final short DESERIALIZATION_POLICY_MASK = (short) (DESERIALIZATION_POLICY_END - 1);
public static boolean testSendingOldValues;
@@ -263,7 +263,7 @@ public abstract class DistributedCacheOperation {
try {
_distribute();
} catch (InvalidVersionException e) {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
logger.trace(LogMarker.DM, "PutAll failed since versions were missing; retrying again", e);
}
@@ -283,7 +283,7 @@ public abstract class DistributedCacheOperation {
DistributedRegion region = getRegion();
if (viewVersion != -1) {
region.getDistributionAdvisor().endOperation(viewVersion);
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
viewVersion);
}
@@ -317,7 +317,7 @@ public abstract class DistributedCacheOperation {
if (SLOW_DISTRIBUTION_MS > 0) { // test hook
try {
Thread.sleep(SLOW_DISTRIBUTION_MS);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
SLOW_DISTRIBUTION_MS = 0;
@@ -335,15 +335,15 @@ public abstract class DistributedCacheOperation {
}
// some members requiring old value are also in the cache op recipients set
- Set needsOldValueInCacheOp = Collections.EMPTY_SET;
+ Set needsOldValueInCacheOp = Collections.emptySet();
// set client routing information into the event
boolean routingComputed = false;
FilterRoutingInfo filterRouting = null;
// recipients that will get a cacheop msg and also a PR message
- Set twoMessages = Collections.EMPTY_SET;
+ Set twoMessages = Collections.emptySet();
if (region.isUsedForPartitionedRegionBucket()) {
- twoMessages = ((BucketRegion) region).getBucketAdvisor().adviseRequiresTwoMessages();
+ twoMessages = ((Bucket) region).getBucketAdvisor().adviseRequiresTwoMessages();
routingComputed = true;
filterRouting = getRecipientFilterRouting(recipients);
if (filterRouting != null) {
@@ -355,7 +355,7 @@ public abstract class DistributedCacheOperation {
// some members need PR notification of the change for client/wan
// notification
- Set adjunctRecipients = Collections.EMPTY_SET;
+ Set adjunctRecipients = Collections.emptySet();
// Partitioned region listener notification messages piggyback on this
// operation's replyprocessor and need to be sent at the same time as
@@ -377,20 +377,17 @@ public abstract class DistributedCacheOperation {
recipients.removeAll(needsOldValueInCacheOp);
}
- Set cachelessNodes = Collections.EMPTY_SET;
- Set adviseCacheServers = Collections.EMPTY_SET;
- Set<InternalDistributedMember> cachelessNodesWithNoCacheServer =
- new HashSet<InternalDistributedMember>();
+ Set cachelessNodes = Collections.emptySet();
+ Set adviseCacheServers;
+ Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = new HashSet<>();
if (region.getDistributionConfig().getDeltaPropagation() && this.supportsDeltaPropagation()) {
cachelessNodes = region.getCacheDistributionAdvisor().adviseEmptys();
if (!cachelessNodes.isEmpty()) {
List list = new ArrayList(cachelessNodes);
for (Object member : cachelessNodes) {
- if (!recipients.contains(member)) {
+ if (!recipients.contains(member) || adjunctRecipients.contains(member)) {
// Don't include those originally excluded.
list.remove(member);
- } else if (adjunctRecipients.contains(member)) {
- list.remove(member);
}
}
cachelessNodes.clear();
@@ -421,10 +418,10 @@ public abstract class DistributedCacheOperation {
if (!reliableOp || region.isNoDistributionOk()) {
// nothing needs be done in this case
} else {
- region.handleReliableDistribution(Collections.EMPTY_SET);
+ region.handleReliableDistribution(Collections.emptySet());
}
- /** compute local client routing before waiting for an ack only for a bucket */
+ // compute local client routing before waiting for an ack only for a bucket
if (region.isUsedForPartitionedRegionBucket()) {
FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
this.event.setLocalFilterInfo(filterInfo);
@@ -433,7 +430,7 @@ public abstract class DistributedCacheOperation {
} else {
boolean directAck = false;
boolean useMulticast = region.getMulticastEnabled()
- && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();;
+ && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();
boolean shouldAck = shouldAck();
if (shouldAck) {
@@ -491,7 +488,7 @@ public abstract class DistributedCacheOperation {
recipients);
}
waitForMembers.removeAll(recipients);
- recipients = Collections.EMPTY_SET;
+ recipients = Collections.emptySet();
}
}
if (reliableOp) {
@@ -625,7 +622,7 @@ public abstract class DistributedCacheOperation {
}
adjunctRecipientsWithNoCacheServer.addAll(adjunctRecipients);
- adviseCacheServers = ((BucketRegion) region).getPartitionedRegion()
+ adviseCacheServers = ((Bucket) region).getPartitionedRegion()
.getCacheDistributionAdvisor().adviseCacheServers();
adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers);
@@ -652,7 +649,7 @@ public abstract class DistributedCacheOperation {
}
}
- /** compute local client routing before waiting for an ack only for a bucket */
+ // compute local client routing before waiting for an ack only for a bucket
if (region.isUsedForPartitionedRegionBucket()) {
FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
event.setLocalFilterInfo(filterInfo);
@@ -693,7 +690,6 @@ public abstract class DistributedCacheOperation {
}
}
-
/**
* Cleanup destroyed events in CQ result cache for remote CQs. While maintaining the CQ results
* key caching. the destroy event keys are marked as destroyed instead of removing them, this is
@@ -710,7 +706,7 @@ public abstract class DistributedCacheOperation {
continue;
}
- CacheProfile cf = (CacheProfile) ((BucketRegion) getRegion()).getPartitionedRegion()
+ CacheProfile cf = (CacheProfile) ((Bucket) getRegion()).getPartitionedRegion()
.getCacheDistributionAdvisor().getProfile(m);
if (cf == null || cf.filterProfile == null || cf.filterProfile.isLocalProfile()
@@ -718,7 +714,6 @@ public abstract class DistributedCacheOperation {
continue;
}
-
for (Object value : cf.filterProfile.getCqMap().values()) {
ServerCQ cq = (ServerCQ) value;
@@ -726,16 +721,14 @@ public abstract class DistributedCacheOperation {
Long cqID = e.getKey();
// For the CQs satisfying the event with destroy CQEvent, remove
// the entry form CQ cache.
- if (cq.getFilterID() == cqID
- && (e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY)))) {
- cq.removeFromCqResultKeys(((EntryEventImpl) event).getKey(), true);
+ if (cq.getFilterID() == cqID && (e.getValue().equals(MessageType.LOCAL_DESTROY))) {
+ cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
}
}
}
}
}
-
/**
* Get the adjunct receivers for a partitioned region operation
*
@@ -752,9 +745,6 @@ public abstract class DistributedCacheOperation {
/**
* perform any operation-specific initialization on the given reply processor
- *
- * @param p
- * @param msg
*/
protected void initProcessor(CacheOperationReplyProcessor p, CacheOperationMessage msg) {
// nothing to do here - see UpdateMessage
@@ -783,9 +773,6 @@ public abstract class DistributedCacheOperation {
}
}
- /**
- * @param closedMembers
- */
private void handleClosedMembers(Set<InternalDistributedMember> closedMembers,
Map<InternalDistributedMember, PersistentMemberID> persistentIds) {
if (persistentIds == null) {
@@ -837,11 +824,7 @@ public abstract class DistributedCacheOperation {
return null;
}
CacheDistributionAdvisor advisor;
- // if (region.isUsedForPartitionedRegionBucket()) {
- advisor = ((BucketRegion) region).getPartitionedRegion().getCacheDistributionAdvisor();
- // } else {
- // advisor = ((DistributedRegion)region).getCacheDistributionAdvisor();
- // }
+ advisor = region.getPartitionedRegion().getCacheDistributionAdvisor();
return advisor.adviseFilterRouting(this.event, cacheOpRecipients);
}
@@ -915,7 +898,6 @@ public abstract class DistributedCacheOperation {
protected final static short PERSISTENT_TAG_MASK = (VERSION_TAG_MASK << 1);
protected final static short UNRESERVED_FLAGS_START = (PERSISTENT_TAG_MASK << 1);
-
private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400;
public boolean needsRouting;
@@ -959,7 +941,6 @@ public abstract class DistributedCacheOperation {
return this.op;
}
-
/** sets the concurrency versioning tag for this message */
public void setVersionTag(VersionTag tag) {
this.versionTag = tag;
@@ -1001,8 +982,6 @@ public abstract class DistributedCacheOperation {
/**
* process a reply
*
- * @param reply
- * @param processor
* @return true if the reply-processor should continue to process this response
*/
boolean processReply(ReplyMessage reply, CacheOperationReplyProcessor processor) {
@@ -1019,13 +998,11 @@ public abstract class DistributedCacheOperation {
* @param event the entry event that contains the old value
*/
public void appendOldValueToMessage(EntryEventImpl event) {
- {
- @Unretained
- Object val = event.getRawOldValue();
- if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
- || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
- return;
- }
+ @Unretained
+ Object val = event.getRawOldValue();
+ if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
+ || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
+ return;
}
event.exportOldValue(this);
}
@@ -1086,7 +1063,7 @@ public abstract class DistributedCacheOperation {
protected LocalRegion getLocalRegionForProcessing(DistributionManager dm) {
Assert.assertTrue(this.regionPath != null, "regionPath was null");
- GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+ InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem());
return gfc.getRegionByPathForProcessing(this.regionPath);
}
@@ -1112,7 +1089,7 @@ public abstract class DistributedCacheOperation {
final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
sendReply = false;
basicProcess(dm, lclRgn);
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
this.closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Cancelled: nothing to do", this);
@@ -1203,7 +1180,7 @@ public abstract class DistributedCacheOperation {
// region
if (!rgn.isEventTrackerInitialized()
&& (rgn.getDataPolicy().withReplication() || rgn.getDataPolicy().withPreloaded())) {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
logger.trace(LogMarker.DM_BRIDGE_SERVER, "Ignoring possible duplicate event");
}
return;
@@ -1213,15 +1190,15 @@ public abstract class DistributedCacheOperation {
sendReply = operateOnRegion(event, dm) && sendReply;
} finally {
if (event instanceof EntryEventImpl) {
- ((EntryEventImpl) event).release();
+ ((Releasable) event).release();
}
}
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
this.closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Region destroyed: nothing to do", this);
}
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
this.closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Cancelled: nothing to do", this);
@@ -1231,7 +1208,7 @@ public abstract class DistributedCacheOperation {
if (!lclRgn.isDestroyed()) {
logger.error("Got disk access exception, expected region to be destroyed", e);
}
- } catch (EntryNotFoundException e) {
+ } catch (EntryNotFoundException ignore) {
this.appliedOperation = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Entry not found, nothing to do", this);
@@ -1275,8 +1252,7 @@ public abstract class DistributedCacheOperation {
if (pId == 0 && (dm instanceof DM) && !this.directAck) {// Fix for #41871
// distributed-no-ack message. Don't respond
} else {
- ReplyException exception = rex;
- ReplyMessage.send(recipient, pId, exception, dm, !this.appliedOperation, this.closed, false,
+ ReplyMessage.send(recipient, pId, rex, dm, !this.appliedOperation, this.closed, false,
isInternal());
}
}
@@ -1312,9 +1288,6 @@ public abstract class DistributedCacheOperation {
* When an event is discarded because of an attempt to overwrite a more recent change we still
* need to deliver that event to clients. Clients can then perform their own concurrency checks
* on the event.
- *
- * @param rgn
- * @param ev
*/
protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) {
if (logger.isDebugEnabled()) {
@@ -1325,11 +1298,6 @@ public abstract class DistributedCacheOperation {
rgn.notifyBridgeClients(ev);
}
- // protected LocalRegion getRegionFromPath(InternalDistributedSystem sys,
- // String path) {
- // return LocalRegion.getRegionFromPath(sys, path);
- // }
-
protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException;
@@ -1380,7 +1348,6 @@ public abstract class DistributedCacheOperation {
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- // super.fromData(in);
short bits = in.readShort();
short extBits = in.readShort();
this.flags = bits;
@@ -1424,8 +1391,6 @@ public abstract class DistributedCacheOperation {
@Override
public void toData(DataOutput out) throws IOException {
- // super.toData(out);
-
short bits = 0;
short extendedBits = 0;
bits = computeCompressedShort(bits);
@@ -1611,8 +1576,7 @@ public abstract class DistributedCacheOperation {
static class CacheOperationReplyProcessor extends DirectReplyProcessor {
public CacheOperationMessage msg;
- public CopyOnWriteHashSet<InternalDistributedMember> closedMembers =
- new CopyOnWriteHashSet<InternalDistributedMember>();
+ public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = new CopyOnWriteHashSet<>();
public CacheOperationReplyProcessor(InternalDistributedSystem system, Collection initMembers) {
super(system, initMembers);
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java
index dacf8f5..9d85008 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegionFunctionStreamingMessage.java
@@ -106,7 +106,7 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
if (this.txUniqId == TXManagerImpl.NOTX) {
return null;
} else {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache == null) {
// ignore and return, we are shutting down!
return null;
@@ -116,9 +116,9 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
}
}
- private void cleanupTransasction(TXStateProxy tx) {
+ private void cleanupTransaction(TXStateProxy tx) {
if (this.txUniqId != TXManagerImpl.NOTX) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache == null) {
// ignore and return, we are shutting down!
return;
@@ -130,7 +130,6 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
@Override
protected void process(final DistributionManager dm) {
-
Throwable thr = null;
boolean sendReply = true;
DistributedRegion dr = null;
@@ -202,7 +201,7 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
logger.trace(LogMarker.DM, "Exception caught while processing message", t);
}
} finally {
- cleanupTransasction(tx);
+ cleanupTransaction(tx);
if (sendReply && this.processorId != 0) {
ReplyException rex = null;
if (thr != null) {
@@ -275,9 +274,9 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
/**
* check to see if the cache is closing
*/
- final public boolean checkCacheClosing(DistributionManager dm) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- return (cache == null || cache.getCancelCriterion().isCancelInProgress());
+ private boolean checkCacheClosing(DistributionManager dm) {
+ InternalCache cache = GemFireCacheImpl.getInstance();
+ return cache == null || cache.getCancelCriterion().isCancelInProgress();
}
/**
@@ -285,7 +284,7 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
*
* @return true if the distributed system is closing
*/
- final public boolean checkDSClosing(DistributionManager dm) {
+ private boolean checkDSClosing(DistributionManager dm) {
InternalDistributedSystem ds = dm.getSystem();
return (ds == null || ds.isDisconnecting());
}
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java
index 130e2a8..81bb7fb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DynamicRegionFactoryImpl.java
@@ -28,15 +28,17 @@ public class DynamicRegionFactoryImpl extends DynamicRegionFactory {
* create an instance of the factory. This is normally only done by DynamicRegionFactory's static
* initialization
*/
- public DynamicRegionFactoryImpl() {}
+ public DynamicRegionFactoryImpl() {
+ // nothing
+ }
/** close the factory. Only do this if you're closing the cache, too */
public void close() {
- _close();
+ doClose();
}
/** initialize the factory for use with a new cache */
- public void internalInit(GemFireCacheImpl c) throws CacheException {
- _internalInit(c);
+ void internalInit(InternalCache cache) throws CacheException {
+ doInternalInit(cache);
}
}