You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/11 21:23:30 UTC
[24/50] [abbrv] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/153db276/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 7d0e7bb..de9e6cb 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -12,34 +12,141 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
+import static org.apache.geode.internal.lang.SystemUtils.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.*;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.CacheStatistics;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.CustomExpiry;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.cache.EntryExistsException;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.InterestRegistrationEvent;
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.cache.LowMemoryException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionResolver;
+import org.apache.geode.cache.PartitionedRegionDistributionException;
+import org.apache.geode.cache.PartitionedRegionStorageException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionExistsException;
+import org.apache.geode.cache.RegionMembershipListener;
import org.apache.geode.cache.TimeoutException;
+import org.apache.geode.cache.TransactionDataNotColocatedException;
+import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import org.apache.geode.cache.client.internal.*;
-import org.apache.geode.cache.execute.*;
+import org.apache.geode.cache.client.internal.ClientMetadataService;
+import org.apache.geode.cache.execute.EmptyRegionFunctionException;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.partition.PartitionListener;
import org.apache.geode.cache.partition.PartitionNotAvailableException;
import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.persistence.PersistentID;
-import org.apache.geode.cache.query.*;
-import org.apache.geode.cache.query.internal.*;
-import org.apache.geode.cache.query.internal.index.*;
+import org.apache.geode.cache.query.FunctionDomainException;
+import org.apache.geode.cache.query.Index;
+import org.apache.geode.cache.query.IndexCreationException;
+import org.apache.geode.cache.query.IndexExistsException;
+import org.apache.geode.cache.query.IndexInvalidException;
+import org.apache.geode.cache.query.IndexNameConflictException;
+import org.apache.geode.cache.query.IndexType;
+import org.apache.geode.cache.query.MultiIndexCreationException;
+import org.apache.geode.cache.query.NameResolutionException;
+import org.apache.geode.cache.query.QueryException;
+import org.apache.geode.cache.query.QueryInvocationTargetException;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.TypeMismatchException;
+import org.apache.geode.cache.query.internal.Bag;
+import org.apache.geode.cache.query.internal.CompiledSelect;
+import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
+import org.apache.geode.cache.query.internal.QCompiler;
+import org.apache.geode.cache.query.internal.QueryExecutor;
+import org.apache.geode.cache.query.internal.ResultsBag;
+import org.apache.geode.cache.query.internal.ResultsCollectionWrapper;
+import org.apache.geode.cache.query.internal.ResultsSet;
+import org.apache.geode.cache.query.internal.index.AbstractIndex;
+import org.apache.geode.cache.query.internal.index.IndexCreationData;
+import org.apache.geode.cache.query.internal.index.IndexManager;
+import org.apache.geode.cache.query.internal.index.IndexUtils;
+import org.apache.geode.cache.query.internal.index.PartitionedIndex;
import org.apache.geode.cache.query.internal.types.ObjectTypeImpl;
import org.apache.geode.cache.query.types.ObjectType;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.LockServiceDestroyedException;
-import org.apache.geode.distributed.internal.*;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionAdvisee;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalDistributedSystem.DisconnectListener;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.ProfileListener;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.locks.DLockRemoteToken;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -58,27 +165,64 @@ import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryEvent;
import org.apache.geode.internal.cache.control.MemoryThresholds;
-import org.apache.geode.internal.cache.execute.*;
+import org.apache.geode.internal.cache.execute.AbstractExecution;
+import org.apache.geode.internal.cache.execute.FunctionExecutionNodePruner;
+import org.apache.geode.internal.cache.execute.FunctionRemoteContext;
+import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
+import org.apache.geode.internal.cache.execute.LocalResultCollector;
+import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionExecutor;
+import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultSender;
+import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWaiter;
+import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
+import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.lru.HeapEvictor;
import org.apache.geode.internal.cache.lru.LRUStatistics;
-import org.apache.geode.internal.cache.partitioned.*;
+import org.apache.geode.internal.cache.lru.Sizeable;
+import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse;
+import org.apache.geode.internal.cache.partitioned.DestroyMessage;
import org.apache.geode.internal.cache.partitioned.DestroyMessage.DestroyResponse;
+import org.apache.geode.internal.cache.partitioned.DestroyRegionOnDataStoreMessage;
+import org.apache.geode.internal.cache.partitioned.DumpAllPRConfigMessage;
+import org.apache.geode.internal.cache.partitioned.DumpB2NRegion;
import org.apache.geode.internal.cache.partitioned.DumpB2NRegion.DumpB2NResponse;
+import org.apache.geode.internal.cache.partitioned.DumpBucketsMessage;
+import org.apache.geode.internal.cache.partitioned.FetchBulkEntriesMessage;
import org.apache.geode.internal.cache.partitioned.FetchBulkEntriesMessage.FetchBulkEntriesResponse;
+import org.apache.geode.internal.cache.partitioned.FetchEntriesMessage;
import org.apache.geode.internal.cache.partitioned.FetchEntriesMessage.FetchEntriesResponse;
+import org.apache.geode.internal.cache.partitioned.FetchEntryMessage;
import org.apache.geode.internal.cache.partitioned.FetchEntryMessage.FetchEntryResponse;
+import org.apache.geode.internal.cache.partitioned.FetchKeysMessage;
import org.apache.geode.internal.cache.partitioned.FetchKeysMessage.FetchKeysResponse;
+import org.apache.geode.internal.cache.partitioned.GetMessage;
import org.apache.geode.internal.cache.partitioned.GetMessage.GetResponse;
+import org.apache.geode.internal.cache.partitioned.IdentityRequestMessage;
import org.apache.geode.internal.cache.partitioned.IdentityRequestMessage.IdentityResponse;
+import org.apache.geode.internal.cache.partitioned.IdentityUpdateMessage;
import org.apache.geode.internal.cache.partitioned.IdentityUpdateMessage.IdentityUpdateResponse;
+import org.apache.geode.internal.cache.partitioned.IndexCreationMsg;
+import org.apache.geode.internal.cache.partitioned.InterestEventMessage;
import org.apache.geode.internal.cache.partitioned.InterestEventMessage.InterestEventResponse;
+import org.apache.geode.internal.cache.partitioned.InvalidateMessage;
import org.apache.geode.internal.cache.partitioned.InvalidateMessage.InvalidateResponse;
+import org.apache.geode.internal.cache.partitioned.PREntriesIterator;
+import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
+import org.apache.geode.internal.cache.partitioned.PRSanityCheckMessage;
+import org.apache.geode.internal.cache.partitioned.PRUpdateEntryVersionMessage;
import org.apache.geode.internal.cache.partitioned.PRUpdateEntryVersionMessage.UpdateEntryVersionResponse;
import org.apache.geode.internal.cache.partitioned.PartitionMessage.PartitionResponse;
+import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserver;
+import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder;
+import org.apache.geode.internal.cache.partitioned.PutAllPRMessage;
+import org.apache.geode.internal.cache.partitioned.PutMessage;
import org.apache.geode.internal.cache.partitioned.PutMessage.PutResult;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
+import org.apache.geode.internal.cache.partitioned.RemoveAllPRMessage;
+import org.apache.geode.internal.cache.partitioned.RemoveIndexesMessage;
+import org.apache.geode.internal.cache.partitioned.SizeMessage;
import org.apache.geode.internal.cache.partitioned.SizeMessage.SizeResponse;
import org.apache.geode.internal.cache.persistence.PRPersistentConfig;
import org.apache.geode.internal.cache.tier.InterestType;
@@ -107,30 +251,18 @@ import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.sequencelog.RegionLogger;
import org.apache.geode.internal.util.TransformUtils;
import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
/**
* A Region whose total storage is split into chunks of data (partitions) which are copied up to a
* configurable level (for high availability) and placed on multiple VMs for improved performance
* and increased storage capacity.
- *
*/
public class PartitionedRegion extends LocalRegion
implements CacheDistributionAdvisee, QueryExecutor {
- public static final Random rand =
+ public static final Random RANDOM =
new Random(Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "PartitionedRegionRandomSeed",
- NanoTimer.getTime()).longValue());
+ NanoTimer.getTime()));
private static final AtomicInteger SERIAL_NUMBER_GENERATOR = new AtomicInteger();
@@ -143,7 +275,7 @@ public class PartitionedRegion extends LocalRegion
* getNetworkHopType byte indicating this was not the bucket owner and a message had to be sent to
* a primary in the same server group
*/
- public static final int NETWORK_HOP_TO_SAME_GROUP = 1;
+ private static final int NETWORK_HOP_TO_SAME_GROUP = 1;
/**
* getNetworkHopType byte indicating this was not the bucket owner and a message had to be sent to
@@ -151,12 +283,12 @@ public class PartitionedRegion extends LocalRegion
*/
public static final int NETWORK_HOP_TO_DIFFERENT_GROUP = 2;
-
private final DiskRegionStats diskRegionStats;
+
/**
* Changes scope of replication to secondary bucket to SCOPE.DISTRIBUTED_NO_ACK
*/
- public static final boolean DISABLE_SECONDARY_BUCKET_ACK =
+ static final boolean DISABLE_SECONDARY_BUCKET_ACK =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disablePartitionedRegionBucketAck");
/**
@@ -170,11 +302,11 @@ public class PartitionedRegion extends LocalRegion
private static ThreadLocal threadRandom = new ThreadLocal() {
@Override
protected Object initialValue() {
- int i = rand.nextInt();
+ int i = RANDOM.nextInt();
if (i < 0) {
i = -1 * i;
}
- return Integer.valueOf(i);
+ return i;
}
};
@@ -203,7 +335,7 @@ public class PartitionedRegion extends LocalRegion
private boolean cleanPRRegistration = false;
/** Time to wait for acquiring distributed lock ownership */
- final static long VM_OWNERSHIP_WAIT_TIME = PRSystemPropertyGetter.parseLong(
+ private static final long VM_OWNERSHIP_WAIT_TIME = PRSystemPropertyGetter.parseLong(
System.getProperty(PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_PROPERTY),
PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_DEFAULT);
@@ -279,7 +411,7 @@ public class PartitionedRegion extends LocalRegion
*
* Concurrency: {@link #isLocallyDestroyed} is volatile
*/
- public Thread locallyDestroyingThread;
+ Thread locallyDestroyingThread;
// TODO someone please add a javadoc for this
private volatile boolean hasPartitionedIndex = false;
@@ -319,8 +451,7 @@ public class PartitionedRegion extends LocalRegion
private ScheduledExecutorService bucketSorter;
- private ConcurrentMap<String, Integer[]> partitionsMap =
- new ConcurrentHashMap<String, Integer[]>();
+ private final ConcurrentMap<String, Integer[]> partitionsMap = new ConcurrentHashMap<>();
public ConcurrentMap<String, Integer[]> getPartitionsMap() {
return this.partitionsMap;
@@ -337,34 +468,33 @@ public class PartitionedRegion extends LocalRegion
* Byte 0 = no NWHOP Byte 1 = NWHOP to servers in same server-grp Byte 2 = NWHOP tp servers in
* other server-grp
*/
- private final ThreadLocal<Byte> networkHopType = new ThreadLocal<Byte>() {
+ private static final ThreadLocal<Byte> networkHopType = new ThreadLocal<Byte>() {
@Override
protected Byte initialValue() {
- return Byte.valueOf((byte) NETWORK_HOP_NONE);
+ return (byte) NETWORK_HOP_NONE;
}
};
public void clearNetworkHopData() {
- this.networkHopType.remove();
- this.metadataVersion.remove();
+ networkHopType.remove();
+ metadataVersion.remove();
}
private void setNetworkHopType(Byte value) {
- this.networkHopType.set(value);
+ networkHopType.set(value);
}
/**
- * <p>
* If the last operation in the current thread required a one-hop to another server who held the
* primary bucket for the operation then this will return something other than NETWORK_HOP_NONE.
- * </p>
+ * <p>
* see NETWORK_HOP_NONE, NETWORK_HOP_TO_SAME_GROUP and NETWORK_HOP_TO_DIFFERENT_GROUP
*/
public byte getNetworkHopType() {
- return this.networkHopType.get().byteValue();
+ return networkHopType.get();
}
- private final ThreadLocal<Byte> metadataVersion = new ThreadLocal<Byte>() {
+ private static final ThreadLocal<Byte> metadataVersion = new ThreadLocal<Byte>() {
@Override
protected Byte initialValue() {
return ClientMetadataService.INITIAL_VERSION;
@@ -372,14 +502,13 @@ public class PartitionedRegion extends LocalRegion
};
private void setMetadataVersion(Byte value) {
- this.metadataVersion.set(value);
+ metadataVersion.set(value);
}
public byte getMetadataVersion() {
- return this.metadataVersion.get().byteValue();
+ return metadataVersion.get();
}
-
/**
* Returns the LRUStatistics for this PR. This is needed to find the single instance of
* LRUStatistics created early for a PR when it is recovered from disk. This fixes bug 41938
@@ -392,9 +521,6 @@ public class PartitionedRegion extends LocalRegion
return result;
}
-
- ////////////////// ConcurrentMap methods //////////////////
-
@Override
public boolean remove(Object key, Object value, Object callbackArg) {
final long startTime = PartitionedRegionStats.startTime();
@@ -405,11 +531,6 @@ public class PartitionedRegion extends LocalRegion
}
}
-
-
- ////////////////// End of ConcurrentMap methods //////////////////
-
-
public PartitionListener[] getPartitionListeners() {
return this.partitionListeners;
}
@@ -471,11 +592,11 @@ public class PartitionedRegion extends LocalRegion
public Object getRegion(Object key) throws PRLocallyDestroyedException {
if (cleared) {
- Cache c = GemFireCacheImpl.getInstance();
- if (c == null) {
+ Cache cache = GemFireCacheImpl.getInstance();
+ if (cache == null) {
throw new CacheClosedException();
} else {
- c.getCancelCriterion().checkCancelInProgress(null);
+ cache.getCancelCriterion().checkCancelInProgress(null);
}
}
Assert.assertTrue(key instanceof Integer);
@@ -527,12 +648,11 @@ public class PartitionedRegion extends LocalRegion
}
Assert.assertTrue(key instanceof Integer);
if (sendIdentityRequestMessage)
- IdentityRequestMessage.setLatestId(((Integer) key).intValue());
+ IdentityRequestMessage.setLatestId((Integer) key);
if ((super.get(key) == DESTROYED) && (value instanceof PartitionedRegion)) {
- PartitionedRegionException pre = new PartitionedRegionException(
+ throw new PartitionedRegionException(
LocalizedStrings.PartitionedRegion_CAN_NOT_REUSE_OLD_PARTITIONED_REGION_ID_0
.toLocalizedString(key));
- throw pre;
}
return super.put(key, value);
}
@@ -544,26 +664,24 @@ public class PartitionedRegion extends LocalRegion
}
public synchronized String dump() {
- StringBuffer b = new StringBuffer("prIdToPR Map@");
- b.append(System.identityHashCode(prIdToPR)).append(":\n");
- Map.Entry me;
- for (Iterator i = prIdToPR.entrySet().iterator(); i.hasNext();) {
- me = (Map.Entry) i.next();
- b.append(me.getKey()).append("=>").append(me.getValue());
- if (i.hasNext()) {
- b.append("\n");
+ StringBuilder sb = new StringBuilder("prIdToPR Map@");
+ sb.append(System.identityHashCode(prIdToPR)).append(':').append(getLineSeparator());
+ Map.Entry mapEntry;
+ for (Iterator iterator = prIdToPR.entrySet().iterator(); iterator.hasNext();) {
+ mapEntry = (Map.Entry) iterator.next();
+ sb.append(mapEntry.getKey()).append("=>").append(mapEntry.getValue());
+ if (iterator.hasNext()) {
+ sb.append(getLineSeparator());
}
}
- return b.toString();
+ return sb.toString();
}
}
private int partitionedRegionId = -3;
- // final private Scope userScope;
-
/** Node description */
- final private Node node;
+ private final Node node;
/** Helper Object for redundancy Management of PartitionedRegion */
private final PRHARedundancyProvider redundancyProvider;
@@ -578,15 +696,7 @@ public class PartitionedRegion extends LocalRegion
*/
private final StoppableCountDownLatch initializationLatchAfterBucketIntialization;
- /**
- * Constructor for a PartitionedRegion. This has an accessor (Region API) functionality and
- * contains a datastore for actual storage. An accessor can act as a local cache by having a local
- * storage enabled. A PartitionedRegion can be created by a factory method of RegionFactory.java
- * and also by invoking Cache.createRegion(). (Cache.xml etc to be added)
- *
- */
-
- static public final String RETRY_TIMEOUT_PROPERTY =
+ public static final String RETRY_TIMEOUT_PROPERTY =
DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout";
private final PartitionRegionConfigValidator validator;
@@ -604,16 +714,22 @@ public class PartitionedRegion extends LocalRegion
private AbstractGatewaySender parallelGatewaySender = null;
- public PartitionedRegion(String regionname, RegionAttributes ra, LocalRegion parentRegion,
- GemFireCacheImpl cache, InternalRegionArguments internalRegionArgs) {
- super(regionname, ra, parentRegion, cache, internalRegionArgs);
+ /**
+ * Constructor for a PartitionedRegion. This has an accessor (Region API) functionality and
+ * contains a datastore for actual storage. An accessor can act as a local cache by having a local
+ * storage enabled. A PartitionedRegion can be created by a factory method of RegionFactory.java
+ * and also by invoking Cache.createRegion(). (Cache.xml etc to be added)
+ */
+ public PartitionedRegion(String regionName, RegionAttributes regionAttributes,
+ LocalRegion parentRegion, InternalCache cache, InternalRegionArguments internalRegionArgs) {
+ super(regionName, regionAttributes, parentRegion, cache, internalRegionArgs);
this.node = initializeNode();
this.prStats = new PartitionedRegionStats(cache.getDistributedSystem(), getFullPath());
this.regionIdentifier = getFullPath().replace('/', '#');
if (logger.isDebugEnabled()) {
- logger.debug("Constructing Partitioned Region {}", regionname);
+ logger.debug("Constructing Partitioned Region {}", regionName);
}
// By adding this disconnect listener we ensure that the pridmap is cleaned
@@ -622,40 +738,37 @@ public class PartitionedRegion extends LocalRegion
// (which prevents pridmap cleanup).
cache.getInternalDistributedSystem().addDisconnectListener(dsPRIdCleanUpListener);
- // this.userScope = ra.getScope();
- this.partitionAttributes = ra.getPartitionAttributes();
+ this.partitionAttributes = regionAttributes.getPartitionAttributes();
this.localMaxMemory = this.partitionAttributes.getLocalMaxMemory();
this.retryTimeout = Integer.getInteger(RETRY_TIMEOUT_PROPERTY,
- PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION).intValue();
+ PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION);
this.totalNumberOfBuckets = this.partitionAttributes.getTotalNumBuckets();
this.prStats.incTotalNumBuckets(this.totalNumberOfBuckets);
- this.distAdvisor = RegionAdvisor.createRegionAdvisor(this); // Warning: potential early escape
- // of instance
- this.redundancyProvider = new PRHARedundancyProvider(this); // Warning:
- // potential
- // early escape
- // instance
+
+ // Warning: potential early escape of instance
+ this.distAdvisor = RegionAdvisor.createRegionAdvisor(this);
+ // Warning: potential early escape of instance
+ this.redundancyProvider = new PRHARedundancyProvider(this);
// localCacheEnabled = ra.getPartitionAttributes().isLocalCacheEnabled();
// This is to make sure that local-cache get and put works properly.
// getScope is overridden to return the correct scope.
// this.scope = Scope.LOCAL;
- this.redundantCopies = ra.getPartitionAttributes().getRedundantCopies();
- this.prStats.setConfiguredRedundantCopies(ra.getPartitionAttributes().getRedundantCopies());
- this.prStats.setLocalMaxMemory(ra.getPartitionAttributes().getLocalMaxMemory() * 1024L * 1024);
+ this.redundantCopies = regionAttributes.getPartitionAttributes().getRedundantCopies();
+ this.prStats.setConfiguredRedundantCopies(
+ regionAttributes.getPartitionAttributes().getRedundantCopies());
+ this.prStats.setLocalMaxMemory(
+ regionAttributes.getPartitionAttributes().getLocalMaxMemory() * 1024L * 1024);
// No redundancy required for writes
- this.minimumWriteRedundancy =
- Integer
- .getInteger(
- DistributionConfig.GEMFIRE_PREFIX + "mimimumPartitionedRegionWriteRedundancy", 0)
- .intValue();
+ this.minimumWriteRedundancy = Integer.getInteger(
+ DistributionConfig.GEMFIRE_PREFIX + "mimimumPartitionedRegionWriteRedundancy", 0);
+
// No redundancy required for reads
- this.minimumReadRedundancy = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "mimimumPartitionedRegionReadRedundancy", 0)
- .intValue();
+ this.minimumReadRedundancy = Integer.getInteger(
+ DistributionConfig.GEMFIRE_PREFIX + "mimimumPartitionedRegionReadRedundancy", 0);
- this.haveCacheLoader = ra.getCacheLoader() != null;
+ this.haveCacheLoader = regionAttributes.getCacheLoader() != null;
this.initializationLatchAfterBucketIntialization =
new StoppableCountDownLatch(this.getCancelCriterion(), 1);
@@ -680,7 +793,7 @@ public class PartitionedRegion extends LocalRegion
}
if (logger.isDebugEnabled()) {
- logger.debug("Partitioned Region {} constructed {}", regionname,
+ logger.debug("Partitioned Region {} constructed {}", regionName,
(this.haveCacheLoader ? "with a cache loader" : ""));
}
if (this.getEvictionAttributes() != null
@@ -757,7 +870,7 @@ public class PartitionedRegion extends LocalRegion
});
}
- public final boolean isShadowPR() {
+ public boolean isShadowPR() {
return isShadowPR;
}
@@ -768,7 +881,7 @@ public class PartitionedRegion extends LocalRegion
public Set<String> getParallelGatewaySenderIds() {
Set<String> regionGatewaySenderIds = this.getAllGatewaySenderIds();
if (regionGatewaySenderIds.isEmpty()) {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
Set<GatewaySender> cacheGatewaySenders = getCache().getAllGatewaySenders();
Set<String> parallelGatewaySenderIds = new HashSet<String>();
@@ -804,10 +917,9 @@ public class PartitionedRegion extends LocalRegion
if (config.getTotalNumBuckets() != this.getTotalNumberOfBuckets()) {
Object[] prms = new Object[] {this.getFullPath(), this.getTotalNumberOfBuckets(),
config.getTotalNumBuckets()};
- IllegalStateException ise = new IllegalStateException(
+ throw new IllegalStateException(
LocalizedStrings.PartitionedRegion_FOR_REGION_0_TotalBucketNum_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2
.toString(prms));
- throw ise;
}
// Make sure we don't change to be colocated with a different region
// We also can't change from colocated to not colocated without writing
@@ -820,10 +932,9 @@ public class PartitionedRegion extends LocalRegion
.toLocalizedString(this.getFullPath()),
null, dsi);
dsi.handleDiskAccessException(dae);
- IllegalStateException ise = new IllegalStateException(
+ throw new IllegalStateException(
LocalizedStrings.PartitionedRegion_FOR_REGION_0_ColocatedWith_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2
.toString(prms));
- throw ise;
}
} else {
@@ -865,8 +976,6 @@ public class PartitionedRegion extends LocalRegion
createAndValidatePersistentConfig();
initializePartitionedRegion();
- /* set the total number of buckets */
- // setTotalNumOfBuckets();
// If localMaxMemory is set to 0, do not initialize Data Store.
final boolean storesData = this.localMaxMemory > 0;
if (storesData) {
@@ -1020,7 +1129,7 @@ public class PartitionedRegion extends LocalRegion
if (!allGatewaySenderIds.isEmpty()) {
for (GatewaySender sender : cache.getAllGatewaySenders()) {
if (sender.isParallel() && allGatewaySenderIds.contains(sender.getId())) {
- /**
+ /*
* get the ParallelGatewaySender to create the colocated partitioned region for this
* region.
*/
@@ -1204,7 +1313,6 @@ public class PartitionedRegion extends LocalRegion
}
final RegionLock rl = getRegionLock();
try {
- // if (!rl.lock()) {
if (logger.isDebugEnabled()) {
logger.debug("registerPartitionedRegion: obtaining lock");
}
@@ -1223,8 +1331,7 @@ public class PartitionedRegion extends LocalRegion
this.getAllGatewaySenderIds());
logger.info(LocalizedMessage.create(
LocalizedStrings.PartitionedRegion_PARTITIONED_REGION_0_IS_BORN_WITH_PRID_1_IDENT_2,
- new Object[] {getFullPath(), Integer.valueOf(this.partitionedRegionId),
- getRegionIdentifier()}));
+ new Object[] {getFullPath(), this.partitionedRegionId, getRegionIdentifier()}));
PRSanityCheckMessage.schedule(this);
} else {
@@ -1238,11 +1345,11 @@ public class PartitionedRegion extends LocalRegion
this.partitionedRegionId = prConfig.getPRId();
logger.info(LocalizedMessage.create(
LocalizedStrings.PartitionedRegion_PARTITIONED_REGION_0_IS_CREATED_WITH_PRID_1,
- new Object[] {getFullPath(), Integer.valueOf(this.partitionedRegionId)}));
+ new Object[] {getFullPath(), this.partitionedRegionId}));
}
synchronized (prIdToPR) {
- prIdToPR.put(Integer.valueOf(this.partitionedRegionId), this); // last
+ prIdToPR.put(this.partitionedRegionId, this); // last
}
prConfig.addNode(this.node);
if (this.getFixedPartitionAttributesImpl() != null) {
@@ -1281,15 +1388,14 @@ public class PartitionedRegion extends LocalRegion
SystemFailure.checkFailure();
String registerErrMsg =
LocalizedStrings.PartitionedRegion_AN_EXCEPTION_WAS_CAUGHT_WHILE_REGISTERING_PARTITIONEDREGION_0_DUMPPRID_1
- .toLocalizedString(new Object[] {getFullPath(), prIdToPR.dump()});
+ .toLocalizedString(getFullPath(), prIdToPR.dump());
try {
synchronized (prIdToPR) {
- if (prIdToPR.containsKey(Integer.valueOf(this.partitionedRegionId))) {
- prIdToPR.put(Integer.valueOf(this.partitionedRegionId), PRIdMap.FAILED_REGISTRATION,
- false);
+ if (prIdToPR.containsKey(this.partitionedRegionId)) {
+ prIdToPR.put(this.partitionedRegionId, PRIdMap.FAILED_REGISTRATION, false);
logger.info(LocalizedMessage.create(
LocalizedStrings.PartitionedRegion_FAILED_REGISTRATION_PRID_0_NAMED_1,
- new Object[] {Integer.valueOf(this.partitionedRegionId), this.getName()}));
+ new Object[] {this.partitionedRegionId, this.getName()}));
}
}
} catch (VirtualMachineError err) {
@@ -1297,7 +1403,7 @@ public class PartitionedRegion extends LocalRegion
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
- } catch (Throwable ignore) {
+ } catch (Throwable e) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
@@ -1305,8 +1411,7 @@ public class PartitionedRegion extends LocalRegion
// is still usable:
SystemFailure.checkFailure();
if (logger.isDebugEnabled()) {
- logger.debug("Partitioned Region creation, could not clean up after caught exception",
- ignore);
+ logger.debug("Partitioned Region creation, could not clean up after caught exception", e);
}
}
throw new PartitionedRegionException(registerErrMsg, t);
@@ -1318,7 +1423,7 @@ public class PartitionedRegion extends LocalRegion
}
} catch (Exception es) {
if (logger.isDebugEnabled()) {
- logger.warn(es.getMessage(), es);
+ logger.debug(es.getMessage(), es);
}
}
}
@@ -1373,7 +1478,7 @@ public class PartitionedRegion extends LocalRegion
/**
* Get the Partitioned Region identifier used for DLocks (Bucket and Region)
*/
- final public String getRegionIdentifier() {
+ public String getRegionIdentifier() {
return this.regionIdentifier;
}
@@ -1384,8 +1489,6 @@ public class PartitionedRegion extends LocalRegion
/**
* Throw an exception if persistent data recovery from disk is not complete for this region.
- *
- * @throws PartitionOfflineException
*/
public void checkPROffline() throws PartitionOfflineException {
if (getDataPolicy().withPersistence() && !recoveredFromDisk) {
@@ -1398,7 +1501,7 @@ public class PartitionedRegion extends LocalRegion
}
}
- public final void updatePRConfig(PartitionRegionConfig prConfig, boolean putOnlyIfUpdated) {
+ public void updatePRConfig(PartitionRegionConfig prConfig, boolean putOnlyIfUpdated) {
final Set<Node> nodes = prConfig.getNodes();
final PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(this);
RegionLock colocatedLock = null;
@@ -1432,11 +1535,8 @@ public class PartitionedRegion extends LocalRegion
}
/**
- *
- * @param keyInfo
* @param access true if caller wants last accessed time updated
* @param allowTombstones - whether a tombstone can be returned
- * @return TODO
*/
@Override
protected Region.Entry<?, ?> nonTXGetEntry(KeyInfo keyInfo, boolean access,
@@ -1463,7 +1563,7 @@ public class PartitionedRegion extends LocalRegion
logger.trace("getEntryInBucket: " + "Key key={} ({}) from: {} bucketId={}", key,
key.hashCode(), targetNode, bucketStringForLogs(bucketId));
}
- Integer bucketIdInt = Integer.valueOf(bucketId);
+ Integer bucketIdInt = bucketId;
EntrySnapshot ret = null;
int count = 0;
RetryTimeKeeper retryTime = null;
@@ -1503,10 +1603,10 @@ public class PartitionedRegion extends LocalRegion
return ret;
} catch (PRLocallyDestroyedException pde) {
if (logger.isDebugEnabled()) {
- logger.debug("getEntryInBucket: Encountered PRLocallyDestroyedException ");
+ logger.debug("getEntryInBucket: Encountered PRLocallyDestroyedException", pde);
}
checkReadiness();
- } catch (EntryNotFoundException enfe) {
+ } catch (EntryNotFoundException ignore) {
return null;
} catch (ForceReattemptException prce) {
prce.checkKey(key);
@@ -1515,7 +1615,7 @@ public class PartitionedRegion extends LocalRegion
}
checkReadiness();
InternalDistributedMember lastNode = retryNode;
- retryNode = getOrCreateNodeForBucketRead(bucketIdInt.intValue());
+ retryNode = getOrCreateNodeForBucketRead(bucketIdInt);
if (lastNode.equals(retryNode)) {
if (retryTime == null) {
retryTime = new RetryTimeKeeper(this.retryTimeout);
@@ -1530,8 +1630,8 @@ public class PartitionedRegion extends LocalRegion
logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(),
retryNode);
}
- getRegionAdvisor().notPrimary(bucketIdInt.intValue(), retryNode);
- retryNode = getOrCreateNodeForBucketRead(bucketIdInt.intValue());
+ getRegionAdvisor().notPrimary(bucketIdInt, retryNode);
+ retryNode = getOrCreateNodeForBucketRead(bucketIdInt);
}
// It's possible this is a GemFire thread e.g. ServerConnection
@@ -1553,11 +1653,10 @@ public class PartitionedRegion extends LocalRegion
if (logger.isDebugEnabled()) {
e = new PartitionedRegionDistributionException(
LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS
- .toLocalizedString(Integer.valueOf(count)));
+ .toLocalizedString(count));
}
logger.warn(LocalizedMessage.create(
- LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS,
- Integer.valueOf(count)), e);
+ LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS, count), e);
return null;
}
@@ -1581,7 +1680,6 @@ public class PartitionedRegion extends LocalRegion
* @param allowTombstones whether tombstones should be returned
* @throws EntryNotFoundException if the entry doesn't exist
* @throws ForceReattemptException if the peer is no longer available
- * @throws PrimaryBucketException
* @return true if the passed key is contained remotely.
*/
public EntrySnapshot getEntryRemotely(InternalDistributedMember targetNode, Integer bucketId,
@@ -1616,7 +1714,7 @@ public class PartitionedRegion extends LocalRegion
* @throws UnsupportedOperationException OVERRIDES
*/
@Override
- final public Region createSubregion(String subregionName, RegionAttributes regionAttributes)
+ public Region createSubregion(String subregionName, RegionAttributes regionAttributes)
throws RegionExistsException, TimeoutException {
throw new UnsupportedOperationException();
}
@@ -1710,7 +1808,7 @@ public class PartitionedRegion extends LocalRegion
for (;;) {
try {
return doExecuteQuery(query, parameters, buckets);
- } catch (ForceReattemptException fre) {
+ } catch (ForceReattemptException ignore) {
// fall through and loop
}
}
@@ -1736,20 +1834,20 @@ public class PartitionedRegion extends LocalRegion
while (remoteIter.hasNext()) {
allBuckets.add((Integer) remoteIter.next());
}
- } catch (NoSuchElementException stop) {
+ } catch (NoSuchElementException ignore) {
}
} else { // local buckets
Iterator localIter = null;
if (this.dataStore != null) {
localIter = buckets.iterator();
} else {
- localIter = Collections.EMPTY_SET.iterator();
+ localIter = Collections.emptySet().iterator();
}
try {
while (localIter.hasNext()) {
allBuckets.add((Integer) localIter.next());
}
- } catch (NoSuchElementException stop) {
+ } catch (NoSuchElementException ignore) {
}
}
@@ -1782,7 +1880,7 @@ public class PartitionedRegion extends LocalRegion
try {
results = prqe.queryBuckets(null);
break;
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} catch (FunctionDomainException e) {
throw e;
@@ -1806,7 +1904,7 @@ public class PartitionedRegion extends LocalRegion
// Drop Duplicates if this is a DISTINCT query
boolean allowsDuplicates = results.getCollectionType().allowsDuplicates();
- // Asif: No need to apply the limit to the SelectResults.
+ // No need to apply the limit to the SelectResults.
// We know that even if we do not apply the limit,
// the results will satisfy the limit
// as it has been evaluated in the iteration of List to
@@ -1821,16 +1919,14 @@ public class PartitionedRegion extends LocalRegion
if (selectExpr.getOrderByAttrs() != null) {
// Set limit also, its not applied while building the final result set as order by is
// involved.
- // results = new ResultsCollectionWrapper(elementType, results.asSet(),
- // query.getLimit(parameters));
} else if (allowsDuplicates) {
results = new ResultsCollectionWrapper(elementType, results.asSet());
}
if (selectExpr.isCount() && (results.isEmpty() || selectExpr.isDistinct())) {
- SelectResults resultCount = new ResultsBag(getCachePerfStats());// Constructor with
- // elementType not visible.
+ // Constructor with elementType not visible.
+ SelectResults resultCount = new ResultsBag(getCachePerfStats());
resultCount.setElementType(new ObjectTypeImpl(Integer.class));
- ((ResultsBag) resultCount).addAndGetOccurence(results.size());
+ ((Bag) resultCount).addAndGetOccurence(results.size());
return resultCount;
}
}
@@ -1874,11 +1970,6 @@ public class PartitionedRegion extends LocalRegion
throw new UnsupportedOperationException();
}
- // /////////////////////////////////////////////////////////////////////
- // ////////////// Operation Supported for this release
- // //////////////////////////////
- // /////////////////////////////////////////////////////////////////////
-
@Override
boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld, Object expectedOldValue,
boolean requireOldValue, long lastModified, boolean overwriteDestroyed)
@@ -1895,7 +1986,7 @@ public class PartitionedRegion extends LocalRegion
final Integer bucketId = event.getKeyInfo().getBucketId();
assert bucketId != KeyInfo.UNKNOWN_BUCKET;
// check in bucket2Node region
- InternalDistributedMember targetNode = getNodeForBucketWrite(bucketId.intValue(), null);
+ InternalDistributedMember targetNode = getNodeForBucketWrite(bucketId, null);
// force all values to be serialized early to make size computation cheap
// and to optimize distribution.
if (logger.isDebugEnabled()) {
@@ -1905,7 +1996,7 @@ public class PartitionedRegion extends LocalRegion
if (targetNode == null) {
try {
bucketStorageAssigned = false;
- targetNode = createBucket(bucketId.intValue(), event.getNewValSizeForPR(), null);
+ targetNode = createBucket(bucketId, event.getNewValSizeForPR(), null);
} catch (PartitionedRegionStorageException e) {
// try not to throw a PRSE if the cache is closing or this region was
// destroyed during createBucket() (bug 36574)
@@ -1943,22 +2034,9 @@ public class PartitionedRegion extends LocalRegion
}
} catch (RegionDestroyedException rde) {
if (!rde.getRegionFullPath().equals(getFullPath())) {
- RegionDestroyedException rde2 = new RegionDestroyedException(toString(), getFullPath());
- rde2.initCause(rde);
- throw rde2;
- }
- }
- // catch (CacheWriterException cwe) {
- // throw cwe;
- // }
- // catch (TimeoutException te) {
- // throw te;
- // }
- // catch (RuntimeException re) {
- // throw re;
- // }
- finally {
- // event.setPutAllOperation(putAllOp_save); // Gester: temporary fix
+ throw new RegionDestroyedException(toString(), getFullPath(), rde);
+ }
+ } finally {
if (putAllOp_save == null) {
// only for normal put
if (ifNew) {
@@ -1970,8 +2048,8 @@ public class PartitionedRegion extends LocalRegion
}
if (!result) {
checkReadiness();
- if (!ifNew && !ifOld && !this.concurrencyChecksEnabled) { // may fail due to concurrency
- // conflict
+ if (!ifNew && !ifOld && !this.concurrencyChecksEnabled) {
+ // may fail due to concurrency conflict
// failed for unknown reason
// throw new PartitionedRegionStorageException("unable to execute operation");
logger.warn(
@@ -2000,16 +2078,10 @@ public class PartitionedRegion extends LocalRegion
getSharedDataView().destroyExistingEntry(event, true, null);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.LocalRegion#checkIfAboveThreshold(org.apache.geode.internal.
- * cache.EntryEventImpl)
- */
@Override
- public void checkIfAboveThreshold(EntryEventImpl evi) throws LowMemoryException {
- getRegionAdvisor().checkIfBucketSick(evi.getKeyInfo().getBucketId(), evi.getKey());
+ public void checkIfAboveThreshold(EntryEventImpl entryEvent) throws LowMemoryException {
+ getRegionAdvisor().checkIfBucketSick(entryEvent.getKeyInfo().getBucketId(),
+ entryEvent.getKey());
}
public boolean isFixedPartitionedRegion() {
@@ -2044,9 +2116,8 @@ public class PartitionedRegion extends LocalRegion
return 0;
}
-
@Override
- public void postPutAllFireEvents(DistributedPutAllOperation putallOp,
+ public void postPutAllFireEvents(DistributedPutAllOperation putAllOp,
VersionedObjectList successfulPuts) {
/*
* No op on pr, will happen in the buckets etc.
@@ -2054,22 +2125,21 @@ public class PartitionedRegion extends LocalRegion
}
@Override
- public void postRemoveAllFireEvents(DistributedRemoveAllOperation op,
+ public void postRemoveAllFireEvents(DistributedRemoveAllOperation removeAllOp,
VersionedObjectList successfulOps) {
/*
* No op on pr, will happen in the buckets etc.
*/
}
-
/**
* Create PutAllPRMsgs for each bucket, and send them.
*
- * @param putallO DistributedPutAllOperation object.
+ * @param putAllOp DistributedPutAllOperation object.
* @param successfulPuts not used in PartitionedRegion.
*/
@Override
- public long postPutAllSend(DistributedPutAllOperation putallO,
+ public long postPutAllSend(DistributedPutAllOperation putAllOp,
VersionedObjectList successfulPuts) {
final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -2077,94 +2147,85 @@ public class PartitionedRegion extends LocalRegion
throw new CacheClosedException("Cache is shutting down");
}
- try {
- final long startTime = PartitionedRegionStats.startTime();
- // build all the msgs by bucketid
- HashMap prMsgMap = putallO.createPRMessages();
- PutAllPartialResult partialKeys = new PutAllPartialResult(putallO.putAllDataSize);
-
- // clear the successfulPuts list since we're actually doing the puts here
- // and the basicPutAll work was just a way to build the DPAO object
- Map<Object, VersionTag> keyToVersionMap =
- new HashMap<Object, VersionTag>(successfulPuts.size());
- successfulPuts.clearVersions();
- Iterator itor = prMsgMap.entrySet().iterator();
- while (itor.hasNext()) {
- Map.Entry mapEntry = (Map.Entry) itor.next();
- Integer bucketId = (Integer) mapEntry.getKey();
- PutAllPRMessage prMsg = (PutAllPRMessage) mapEntry.getValue();
- checkReadiness();
- long then = 0;
- if (isDebugEnabled) {
- then = System.currentTimeMillis();
+ final long startTime = PartitionedRegionStats.startTime();
+ // build all the msgs by bucketid
+ HashMap prMsgMap = putAllOp.createPRMessages();
+ PutAllPartialResult partialKeys = new PutAllPartialResult(putAllOp.putAllDataSize);
+
+ // clear the successfulPuts list since we're actually doing the puts here
+ // and the basicPutAll work was just a way to build the DPAO object
+ Map<Object, VersionTag> keyToVersionMap =
+ new HashMap<Object, VersionTag>(successfulPuts.size());
+ successfulPuts.clearVersions();
+ Iterator itor = prMsgMap.entrySet().iterator();
+ while (itor.hasNext()) {
+ Map.Entry mapEntry = (Map.Entry) itor.next();
+ Integer bucketId = (Integer) mapEntry.getKey();
+ PutAllPRMessage prMsg = (PutAllPRMessage) mapEntry.getValue();
+ checkReadiness();
+ long then = 0;
+ if (isDebugEnabled) {
+ then = System.currentTimeMillis();
+ }
+ try {
+ VersionedObjectList versions = sendMsgByBucket(bucketId, prMsg);
+ if (versions.size() > 0) {
+ partialKeys.addKeysAndVersions(versions);
+ versions.saveVersions(keyToVersionMap);
+ } else if (!this.concurrencyChecksEnabled) { // no keys returned if not versioned
+ Set keys = prMsg.getKeys();
+ partialKeys.addKeys(keys);
}
- try {
- VersionedObjectList versions = sendMsgByBucket(bucketId, prMsg);
- if (versions.size() > 0) {
- partialKeys.addKeysAndVersions(versions);
- versions.saveVersions(keyToVersionMap);
- } else if (!this.concurrencyChecksEnabled) { // no keys returned if not versioned
- Set keys = prMsg.getKeys();
- partialKeys.addKeys(keys);
- }
- } catch (PutAllPartialResultException pre) {
- // sendMsgByBucket applied partial keys
- if (isDebugEnabled) {
- logger.debug("PR.postPutAll encountered PutAllPartialResultException, ", pre);
- }
- partialKeys.consolidate(pre.getResult());
- } catch (Exception ex) {
- // If failed at other exception
- if (isDebugEnabled) {
- logger.debug("PR.postPutAll encountered exception at sendMsgByBucket, ", ex);
- }
- @Released
- EntryEventImpl firstEvent = prMsg.getFirstEvent(this);
- try {
- partialKeys.saveFailedKey(firstEvent.getKey(), ex);
- } finally {
- firstEvent.release();
- }
+ } catch (PutAllPartialResultException pre) {
+ // sendMsgByBucket applied partial keys
+ if (isDebugEnabled) {
+ logger.debug("PR.postPutAll encountered PutAllPartialResultException, ", pre);
}
+ partialKeys.consolidate(pre.getResult());
+ } catch (Exception ex) {
+ // If failed at other exception
if (isDebugEnabled) {
- long now = System.currentTimeMillis();
- if ((now - then) >= 10000) {
- logger.debug("PR.sendMsgByBucket took " + (now - then) + " ms");
- }
+ logger.debug("PR.postPutAll encountered exception at sendMsgByBucket, ", ex);
+ }
+ @Released
+ EntryEventImpl firstEvent = prMsg.getFirstEvent(this);
+ try {
+ partialKeys.saveFailedKey(firstEvent.getKey(), ex);
+ } finally {
+ firstEvent.release();
}
}
- this.prStats.endPutAll(startTime);
- if (!keyToVersionMap.isEmpty()) {
- for (Iterator it = successfulPuts.getKeys().iterator(); it.hasNext();) {
- successfulPuts.addVersion(keyToVersionMap.get(it.next()));
+ if (isDebugEnabled) {
+ long now = System.currentTimeMillis();
+ if ((now - then) >= 10000) {
+ logger.debug("PR.sendMsgByBucket took " + (now - then) + " ms");
}
- keyToVersionMap.clear();
}
+ }
+ this.prStats.endPutAll(startTime);
+ if (!keyToVersionMap.isEmpty()) {
+ for (Iterator it = successfulPuts.getKeys().iterator(); it.hasNext();) {
+ successfulPuts.addVersion(keyToVersionMap.get(it.next()));
+ }
+ keyToVersionMap.clear();
+ }
- if (partialKeys.hasFailure()) {
- logger.info(LocalizedMessage.create(LocalizedStrings.Region_PutAll_Applied_PartialKeys_0_1,
- new Object[] {getFullPath(), partialKeys}));
- if (putallO.isBridgeOperation()) {
- if (partialKeys.getFailure() instanceof CancelException) {
- throw (CancelException) partialKeys.getFailure();
- } else {
- throw new PutAllPartialResultException(partialKeys);
- }
+ if (partialKeys.hasFailure()) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.Region_PutAll_Applied_PartialKeys_0_1,
+ new Object[] {getFullPath(), partialKeys}));
+ if (putAllOp.isBridgeOperation()) {
+ if (partialKeys.getFailure() instanceof CancelException) {
+ throw (CancelException) partialKeys.getFailure();
} else {
- if (partialKeys.getFailure() instanceof RuntimeException) {
- throw (RuntimeException) partialKeys.getFailure();
- } else {
- throw new RuntimeException(partialKeys.getFailure());
- }
+ throw new PutAllPartialResultException(partialKeys);
+ }
+ } else {
+ if (partialKeys.getFailure() instanceof RuntimeException) {
+ throw (RuntimeException) partialKeys.getFailure();
+ } else {
+ throw new RuntimeException(partialKeys.getFailure());
}
}
- } finally {
- /*
- * // TODO XD OFFHEAP MERGE: do we have any events that need freeOffHeapReferences for
- * (PutAllPRMessage.PutAllResponse resp : responses) { PutAllPRMessage.PRMsgResponseContext
- * ctx = resp.getContextObject(); if (ctx != null) { EntryEventImpl e = ctx.getEvent(); if (e
- * != null) { e.release(); } } }
- */
}
return -1;
}
@@ -2272,7 +2333,7 @@ public class PartitionedRegion extends LocalRegion
EntryEventImpl event = prMsg.getFirstEvent(this);
try {
RetryTimeKeeper retryTime = null;
- InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId.intValue(), null);
+ InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null);
if (isDebugEnabled) {
logger.debug("PR.sendMsgByBucket:bucket {}'s currentTarget is {}", bucketId, currentTarget);
}
@@ -2304,7 +2365,7 @@ public class PartitionedRegion extends LocalRegion
boolean interrupted = Thread.interrupted();
try {
Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -2343,9 +2404,9 @@ public class PartitionedRegion extends LocalRegion
if (retryTime == null) {
retryTime = new RetryTimeKeeper(this.retryTimeout);
}
- currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime);
+ currentTarget = getNodeForBucketWrite(bucketId, retryTime);
if (isDebugEnabled) {
- logger.debug("PR.sendMsgByBucket: Old target was {}, Retrying", lastTarget,
+ logger.debug("PR.sendMsgByBucket: Old target was {}, Retrying {}", lastTarget,
currentTarget);
}
if (lastTarget.equals(currentTarget)) {
@@ -2369,11 +2430,11 @@ public class PartitionedRegion extends LocalRegion
logger.debug("Bucket {} on Node {} not primnary", notPrimary.getLocalizedMessage(),
currentTarget);
}
- getRegionAdvisor().notPrimary(bucketId.intValue(), currentTarget);
+ getRegionAdvisor().notPrimary(bucketId, currentTarget);
if (retryTime == null) {
retryTime = new RetryTimeKeeper(this.retryTimeout);
}
- currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime);
+ currentTarget = getNodeForBucketWrite(bucketId, retryTime);
} catch (DataLocationException dle) {
if (isDebugEnabled) {
logger.debug("DataLocationException processing putAll", dle);
@@ -2413,7 +2474,7 @@ public class PartitionedRegion extends LocalRegion
EntryEventImpl event = prMsg.getFirstEvent(this);
try {
RetryTimeKeeper retryTime = null;
- InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId.intValue(), null);
+ InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null);
if (logger.isDebugEnabled()) {
logger.debug("PR.sendMsgByBucket:bucket {}'s currentTarget is {}", bucketId, currentTarget);
}
@@ -2445,7 +2506,7 @@ public class PartitionedRegion extends LocalRegion
boolean interrupted = Thread.interrupted();
try {
Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -2484,7 +2545,7 @@ public class PartitionedRegion extends LocalRegion
if (retryTime == null) {
retryTime = new RetryTimeKeeper(this.retryTimeout);
}
- currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime);
+ currentTarget = getNodeForBucketWrite(bucketId, retryTime);
if (logger.isTraceEnabled()) {
logger.trace("PR.sendMsgByBucket: Old target was {}, Retrying {}", lastTarget,
currentTarget);
@@ -2510,11 +2571,11 @@ public class PartitionedRegion extends LocalRegion
logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(),
currentTarget);
}
- getRegionAdvisor().notPrimary(bucketId.intValue(), currentTarget);
+ getRegionAdvisor().notPrimary(bucketId, currentTarget);
if (retryTime == null) {
retryTime = new RetryTimeKeeper(this.retryTimeout);
}
- currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime);
+ currentTarget = getNodeForBucketWrite(bucketId, retryTime);
} catch (DataLocationException dle) {
if (logger.isDebugEnabled()) {
logger.debug("DataLocationException processing putAll", dle);
@@ -2658,8 +2719,7 @@ public class PartitionedRegion extends LocalRegion
boolean requireOldValue, final long lastModified) {
if (logger.isDebugEnabled()) {
logger.debug("putInBucket: {} ({}) to {} to bucketId={} retry={} ms", event.getKey(),
- event.getKey().hashCode(), targetNode, bucketStringForLogs(bucketId.intValue()),
- retryTimeout);
+ event.getKey().hashCode(), targetNode, bucketStringForLogs(bucketId), retryTimeout);
}
// retry the put remotely until it finds the right node managing the bucket
@@ -2692,7 +2752,7 @@ public class PartitionedRegion extends LocalRegion
boolean interrupted = Thread.interrupted();
try {
Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -2726,11 +2786,6 @@ public class PartitionedRegion extends LocalRegion
}
checkIfAboveThreshold(event);
if (isLocal) {
- // final boolean cacheWrite = !event.isOriginRemote()
- // && !event.isNetSearch();
- // if (cacheWrite) {
- // doCacheWriteBeforePut(event, ifNew);
- // }
event.setInvokePRCallbacks(true);
long start = this.prStats.startPutLocal();
try {
@@ -2740,7 +2795,7 @@ public class PartitionedRegion extends LocalRegion
// given that most manipulation of values is remote (requiring serialization to send).
// But... function execution always implies local manipulation of
// values so keeping locally updated values in Object form should be more efficient.
- if (!DistributionManager.isFunctionExecutionThread.get().booleanValue()) {
+ if (!DistributionManager.isFunctionExecutionThread.get()) {
// TODO: this condition may not help since BucketRegion.virtualPut calls
// forceSerialized
br.forceSerialized(event);
@@ -2811,7 +2866,7 @@ public class PartitionedRegion extends LocalRegion
if (retryTime == null) {
retryTime = new RetryTimeKeeper(this.retryTimeout);
}
- currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime);
+ currentTarget = getNodeForBucketWrite(bucketId, retryTime);
if (lastTarget.equals(currentTarget)) {
if (retryTime.overMaximum()) {
PRHARedundancyProvider.timedOut(this, null, null, "update an entry", this.retryTimeout);
@@ -2825,11 +2880,11 @@ public class PartitionedRegion extends LocalRegion
logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(),
currentTarget);
}
- getRegionAdvisor().notPrimary(bucketId.intValue(), currentTarget);
+ getRegionAdvisor().notPrimary(bucketId, currentTarget);
if (retryTime == null) {
retryTime = new RetryTimeKeeper(this.retryTimeout);
}
- currentTarget = getNodeForBucketWrite(bucketId.intValue(), retryTime);
+ currentTarget = getNodeForBucketWrite(bucketId, retryTime);
}
// It's possible this is a GemFire thread e.g. ServerConnection
@@ -2857,7 +2912,7 @@ public class PartitionedRegion extends LocalRegion
if (logger.isDebugEnabled()) {
logger.debug(
"putInBucket for bucketId = {} failed (attempt # {} ({} ms left), retrying with node {}",
- bucketStringForLogs(bucketId.intValue()), count, (timeOut - System.currentTimeMillis()),
+ bucketStringForLogs(bucketId), count, (timeOut - System.currentTimeMillis()),
currentTarget);
}
} // for
@@ -2885,45 +2940,16 @@ public class PartitionedRegion extends LocalRegion
}
retryTime.waitForBucketsRecovery();
- newNode = getNodeForBucketWrite(bucketId.intValue(), retryTime);
+ newNode = getNodeForBucketWrite(bucketId, retryTime);
if (newNode == null) {
- newNode = createBucket(bucketId.intValue(), getEntrySize(event), retryTime);
+ newNode = createBucket(bucketId, getEntrySize(event), retryTime);
}
return newNode;
}
/**
- * Serialize the key and value early (prior to creating the message) to gather the size of the
- * entry Assumes the new value from the <code>EntryEventImpl</code> is not serialized
- *
- * @return sum of bytes as reported by {@link CachedDeserializable#getSizeInBytes()}
- */
- // private int serializeValue(EntryEventImpl event)
- // {
- // TODO serialize the key as well
- // this code used to make the following call:
- // Object val = event.getNewValue();
- // which deserializes the value and we don't want to do that.
- // int numBytes = 0;
- // Object val = event.getNewValue();
- // if (val == null) {
- // // event.setSerializedNewValue(new byte[] {DataSerializer.NULL});
- // return 0;
- // }
- // if (val instanceof byte[]) {
- // byte[] v = (byte[]) val;
- // numBytes = v.length;
- // } else {
- // if (event.getSerializedNewValue() == null) {
- // event.setSerializedNewValue(EntryEventImpl.serialize(event.getNewValue()));
- // }
- // numBytes = getEntrySize(event);
- // }
- // return numBytes;
- // }
- /**
- * Get the serialized size of an <code>EntryEventImpl</code>
+ * Get the serialized size of an {@code EntryEventImpl}
*
* @param eei the entry from whcih to fetch the size
* @return the size of the serialized entry
@@ -2932,28 +2958,11 @@ public class PartitionedRegion extends LocalRegion
@Unretained
final Object v = eei.getRawNewValue();
if (v instanceof CachedDeserializable) {
- return ((CachedDeserializable) v).getSizeInBytes();
+ return ((Sizeable) v).getSizeInBytes();
}
return 0;
}
- // /**
- // * Gets the Node that is managing a specific bucketId. This does consider
- // the
- // * failed nodes.
- // *
- // * @param bucketId
- // * identifier for bucket
- // * @param failedNodeList
- // * of all the failedNodes to avoid these failed nodes to be picked in
- // * the next node selection.
- // * @return the Node managing the bucket
- // */
- // private Node getNodeForBucketExcludeFailedNode(final Long bucketId,
- // final List failedNodeList) {
- // throw new IllegalStateException("bucket2node should not be used");
- // }
-
public InternalDistributedMember getOrCreateNodeForBucketWrite(int bucketId,
final RetryTimeKeeper snoozer) {
InternalDistributedMember targetNode = getNodeForBucketWrite(bucketId, snoozer);
@@ -3015,8 +3024,7 @@ public class PartitionedRegion extends LocalRegion
final TimeoutException noTime = new TimeoutException(
LocalizedStrings.PartitionedRegion_ATTEMPT_TO_ACQUIRE_PRIMARY_NODE_FOR_WRITE_ON_BUCKET_0_TIMED_OUT_IN_1_MS_CURRENT_REDUNDANCY_2_DOES_NOT_SATISFY_MINIMUM_3
.toLocalizedString(new Object[] {bucketStringForLogs(bucketId),
- Integer.valueOf(localSnoozer.getRetryTime()), Integer.valueOf(red),
- Integer.valueOf(this.minimumWriteRedundancy)}));
+ localSnoozer.getRetryTime(), red, this.minimumWriteRedundancy}));
checkReadiness();
throw noTime;
}
@@ -3028,8 +3036,6 @@ public class PartitionedRegion extends LocalRegion
return waitForNoStorageOrPrimary(bucketId, "write");
}
-
-
/**
* wait until there is a primary or there is no storage
*
@@ -3134,8 +3140,7 @@ public class PartitionedRegion extends LocalRegion
if (isTX()) {
return getNodeForBucketWrite(bucketId, null);
}
- InternalDistributedMember result = getRegionAdvisor().getPreferredNode(bucketId);
- return result;
+ return getRegionAdvisor().getPreferredNode(bucketId);
}
/**
@@ -3307,9 +3312,6 @@ public class PartitionedRegion extends LocalRegion
* can be executed on just one fabric node, executed in parallel on a subset of nodes in parallel
* across all the nodes.
*
- * @param function
- * @param execution
- * @param rc
* @since GemFire 6.0
*/
public ResultCollector executeFunction(final Function function,
@@ -3363,9 +3365,6 @@ public class PartitionedRegion extends LocalRegion
/**
* Executes function on multiple nodes
- *
- * @param function
- * @param execution
*/
private ResultCollector executeOnMultipleNodes(final Function function,
final PartitionedRegionFunctionExecutor execution, ResultCollector rc, boolean isPRSingleHop,
@@ -3412,8 +3411,7 @@ public class PartitionedRegion extends LocalRegion
boolean hasRemovedNode = false;
while (iterator.hasNext()) {
- if (execution.getFailedNodes()
- .contains(((InternalDistributedMember) iterator.next()).getId())) {
+ if (execution.getFailedNodes().contains(((DistributedMember) iterator.next()).getId())) {
hasRemovedNode = true;
}
}
@@ -3482,7 +3480,7 @@ public class PartitionedRegion extends LocalRegion
.constructAndGetAllColocatedLocalDataSet(PartitionedRegion.this, localBucketSet),
localBucketSet, resultSender, execution.isReExecute());
if (logger.isDebugEnabled()) {
- logger.debug("FunctionService: Executing on local node with keys.{}" + localKeys);
+ logger.debug("FunctionService: Executing on local node with keys.{}", localKeys);
}
execution.executeFunctionOnLocalPRNode(function, prContext, resultSender, dm, isTX());
}
@@ -3500,8 +3498,8 @@ public class PartitionedRegion extends LocalRegion
recipMap.put(recip, context);
}
if (logger.isDebugEnabled()) {
- logger.debug("FunctionService: Executing on remote nodes with member to keys map.{}"
- + memberToKeysMap);
+ logger.debug("FunctionService: Executing on remote nodes with member to keys map.{}",
+ memberToKeysMap);
}
PartitionedRegionFunctionResultWaiter resultReciever =
new PartitionedRegionFunctionResultWaiter(getSystem(), this.getPRId(),
@@ -3509,14 +3507,11 @@ public class PartitionedRegion extends LocalRegion
return resultReciever.getPartitionedDataFrom(recipMap, this, execution);
}
return localResultCollector;
-
}
/**
* Single key execution on single node
*
- * @param function
- * @param execution
* @since GemFire 6.0
*/
private ResultCollector executeOnSingleNode(final Function function,
@@ -3526,14 +3521,14 @@ public class PartitionedRegion extends LocalRegion
final Object key = routingKeys.iterator().next();
final Integer bucketId;
if (isBucketSetAsFilter) {
- bucketId = ((Integer) key).intValue();
+ bucketId = (Integer) key;
} else {
- bucketId = Integer.valueOf(
- PartitionedRegionHelper.getHashKey(this, Operation.FUNCTION_EXECUTION, key, null, null));
+ bucketId =
+ PartitionedRegionHelper.getHashKey(this, Operation.FUNCTION_EXECUTION, key, null, null);
}
InternalDistributedMember targetNode = null;
if (function.optimizeForWrite()) {
- targetNode = createBucket(bucketId.intValue(), 0, null /* retryTimeKeeper */);
+ targetNode = createBucket(bucketId, 0, null /* retryTimeKeeper */);
HeapMemoryMonitor hmm =
((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
if (hmm.isMemberHeapCritical(targetNode)
@@ -3541,11 +3536,11 @@ public class PartitionedRegion extends LocalRegion
Set<DistributedMember> sm = Collections.singleton((DistributedMember) targetNode);
throw new LowMemoryException(
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(new Object[] {function.getId(), sm}),
+ .toLocalizedString(function.getId(), sm),
sm);
}
} else {
- targetNode = getOrCreateNodeForBucketRead(bucketId.intValue());
+ targetNode = getOrCreateNodeForBucketRead(bucketId);
}
final DistributedMember localVm = getMyId();
if (targetNode != null && isPRSingleHop && !localVm.equals(targetNode)) {
@@ -3580,7 +3575,7 @@ public class PartitionedRegion extends LocalRegion
* if (retryTime.overMaximum()) { PRHARedundancyProvider.timedOut(this, null, null,
* "doing function execution", this.retryTimeout); // NOTREACHED }
*/
- // Asif: Fix for Bug # 40083
+ // Fix for Bug # 40083
targetNode = null;
while (targetNode == null) {
if (retryTime.overMaximum()) {
@@ -3590,9 +3585,9 @@ public class PartitionedRegion extends LocalRegion
}
retryTime.waitToRetryNode();
if (function.optimizeForWrite()) {
- targetNode = getOrCreateNodeForBucketWrite(bucketId.intValue(), retryTime);
+ targetNode = getOrCreateNodeForBucketWrite(bucketId, retryTime);
} else {
- targetNode = getOrCreateNodeForBucketRead(bucketId.intValue());
+ targetNode = getOrCreateNodeForBucketRead(bucketId);
}
}
if (targetNode == null) {
@@ -3636,7 +3631,8 @@ public class PartitionedRegion extends LocalRegion
Set<Integer> actualBucketSet = this.getRegionAdvisor().getBucketSet();
try {
bucketSet.retainAll(actualBucketSet);
- } catch (NoSuchElementException done) {
+ } catch (NoSuchElementException ignore) {
+ // done
}
HashMap<InternalDistributedMember, HashSet<Integer>> memberToBuckets =
FunctionExecutionNodePruner.groupByMemberToBuckets(this, bucketSet,
@@ -3692,8 +3688,7 @@ public class PartitionedRegion extends LocalRegion
boolean hasRemovedNode = false;
while (iterator.hasNext()) {
- if (execution.getFailedNodes()
- .contains(((InternalDistributedMember) iterator.next()).getId())) {
+ if (execution.getFailedNodes().contains(((DistributedMember) iterator.next()).getId())) {
hasRemovedNode = true;
}
}
@@ -3720,7 +3715,7 @@ public class PartitionedRegion extends LocalRegion
Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
throw new LowMemoryException(
LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1
- .toLocalizedString(new Object[] {function.getId(), sm}),
+ .toLocalizedString(function.getId(), sm),
sm);
}
@@ -3738,8 +3733,6 @@ public class PartitionedRegion extends LocalRegion
execution.isReExecute(), execution.isFnSerializationReqd());
recipMap.put(recip, context);
}
- // final LocalResultCollector localResultCollector = new LocalResultCollector(function, rc,
- // execution);
final LocalResultCollector<?, ?> localRC = execution.getLocalResultCollector(function, rc);
final DM dm = getDistributionManager();
@@ -3755,28 +3748,18 @@ public class PartitionedRegion extends LocalRegion
execution.getArgumentsForMember(getMyId().getId()), null, ColocationHelper
.constructAndGetAllColocatedLocalDataSet(PartitionedRegion.this, localBucketSet),
localBucketSet, resultSender, execution.isReExecute());
- // final RegionFunctionContextImpl prContext = new RegionFunctionContextImpl(
- // function.getId(), PartitionedRegion.this, execution
- // .getArgumentsForMember(getMyId().getId()), null, ColocationHelper
- // .constructAndGetAllColocatedLocalDataSet(PartitionedRegion.this,
- // localBucketSet), resultSender, execution.isReExecute());
execution.executeFunctionOnLocalNode(function, prContext, resultSender, dm, isTX());
}
PartitionedRegionFunctionResultWaiter resultReciever =
new PartitionedRegionFunctionResultWaiter(getSystem(), this.getPRId(), localRC, function,
resultSender);
- ResultCollector reply = resultReciever.getPartitionedDataFrom(recipMap, this, execution);
-
- return reply;
-
+ return resultReciever.getPartitionedDataFrom(recipMap, this, execution);
}
/**
* Executes function on all bucket nodes
*
- * @param function
- * @param execution
* @return ResultCollector
* @since GemFire 6.0
*/
@@ -3788,7 +3771,7 @@ public class PartitionedRegion extends LocalRegion
while (itr.hasNext()) {
try {
bucketSet.add(itr.next());
- } catch (NoSuchElementException ex) {
+ } catch (NoSuchElementException ignore) {
}
}
HashMap<InternalDistributedMember, HashSet<Integer>> memberToBuckets =
@@ -3810,8 +3793,7 @@ public class PartitionedRegion extends LocalRegion
boolean hasRemovedNode = false;
while (iterator.hasNext()) {
- if (execution.getFailedNodes()
- .contains(((InternalDistributedMember) iterator.next()).getId())) {
+ if (execution.getFailedNodes().contains(((DistributedMember) iterator.next()).getId())) {
hasRemovedNode = true;
}
}
@@ -3868,18 +3850,11 @@ public class PartitionedRegion extends LocalRegion
new PartitionedRegionFunctionResultWaiter(getSystem(), this.getPRId(), localResultCollector,
function, resultSender);
- ResultCollector reply = resultReciever.getPartitionedDataFrom(recipMap, this, execution);
-
- return reply;
+ return resultReciever.getPartitionedDataFrom(recipMap, this, execution);
}
/**
- * no docs
- *
- * @param preferCD
* @param requestingClient the client requesting the object, or null if not from a client
- * @param clientEvent TODO
- * @param returnTombstones TODO
* @param allowRetry if false then do not retry
*/
private Object getFromBucket(final InternalDistributedMember targetNode, int bucketId,
@@ -3929,13 +3904,10 @@ public class PartitionedRegion extends LocalRegion
}
}
- // Test hook
- if (((LocalRegion) this).isTest())
- ((LocalRegion) this).incCountNotFoundInLocal();
obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient,
clientEvent, returnTombstones);
- // TODO:Suranjan&Yogesh : there should be better way than this one
+ // TODO: there should be a better way than this one
String name = Thread.currentThread().getName();
if (name.startsWith("ServerConnection") && !getMyId().equals(retryNode)) {
setNetworkHopType(bucketId, (InternalDistributedMember) retryNode);
@@ -3986,11 +3958,10 @@ public class PartitionedRegion extends LocalRegion
} else {
// with transaction
if (prce instanceof BucketNotFoundException) {
- TransactionException ex = new TransactionDataRebalancedException(
+ throw new TransactionDataRebalancedException(
LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING
- .toLocalizedString(key));
- ex.initCause(prce);
- throw ex;
+ .toLocalizedString(key),
+ prce);
}
Throwable cause = prce.getCause();
if (cause instanceof PrimaryBucketException) {
@@ -3998,17 +3969,15 @@ public class PartitionedRegion extends LocalRegion
} else if (cause instanceof TransactionDataRebalancedException) {
throw (TransactionDataRebalancedException) cause;
} else if (cause instanceof RegionDestroyedException) {
- TransactionException ex = new TransactionDataRebalancedException(
+ throw new TransactionDataRebalancedException(
LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING
- .toLocalizedString(key));
- ex.initCause(cause);
- throw ex;
+ .toLocalizedString(key),
+ cause);
} else {
// Make transaction fail so client could retry
// instead of returning null if ForceReattemptException is thrown.
// Should not see it currently, added to be protected against future changes.
- TransactionException ex = new TransactionException("Failed to get key: " + key, prce);
- throw ex;
+ throw new TransactionException("Failed to get key: " + key, prce);
}
}
} catch (PrimaryBucketException notPrimary) {
@@ -4046,17 +4015,15 @@ public class PartitionedRegion extends LocalRegion
if (logger.isDebugEnabled()) {
e = new PartitionedRegionDistributionException(
LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GET_IN_0_ATTEMPTS
- .toLocalizedString(Integer.valueOf(count)));
+ .toLocalizedString(count));
}
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GET_IN_0_ATTEMPTS,
- Integer.valueOf(count)), e);
+ logger.warn(LocalizedMessage
+ .create(LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GET_IN_0_ATTEMPTS, count), e);
return null;
}
/**
* If a bucket is local, try to fetch the value from it
- *
*/
public Object getFromLocalBucket(int bucketId, final Object key, final Object aCallbackArgument,
boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
@@ -4079,14 +4046,11 @@ public class PartitionedRegion extends LocalRegion
return null;
}
-
/**
* This invokes a cache writer before a destroy operation. Although it has the same method
* signature as the method in LocalRegion, it is invoked in a different code path. LocalRegion
* invokes this method via its "entries" member, while PartitionedRegion invokes this method in
* its region operation methods and messages.
- *
- * @see LocalRegion#cacheWriteBeforeRegionDestroy(RegionEventImpl)
*/
@Override
boolean cacheWriteBeforeRegionDestroy(RegionEventImpl event)
@@ -4125,8 +4089,7 @@ public class PartitionedRegion extends LocalRegion
*/
public DistributedMember getMemberOwning(Object key) {
int bucketId = PartitionedRegionHelper.getHashKey(this, null, key, null, null);
- InternalDistributedMember targetNode = getNodeForBucketRead(bucketId);
- return targetNode;
+ return getNodeForBucketRead(bucketId);
}
/**
@@ -4150,7 +4113,6 @@ public class PartitionedRegion extends LocalRegion
public Object localCacheGet(Object key) {
RegionEntry re = getRegionMap().getEntry(key);
if (re == null || re.isDestroyedOrRemoved()) {
- // TODO:KIRK:OK if (re == null || Token.isRemoved(re.getValueInVM(this))) {
return null;
} else {
return re.getValue(this); // OFFHEAP: spin until we can copy into a heap cd?
@@ -4169,12 +4131,9 @@ public class PartitionedRegion extends LocalRegion
/**
* Test Method: Get a random set of keys from a randomly selected bucket using the provided
- * <code>Random</code> number generator.
+ * {@code Random} number generator.
*
- * @param rnd
* @return A set of keys from a randomly chosen bucket or {@link Collections#EMPTY_SET}
- * @throws IOException
- * @throws ClassNotFoundException
*/
public Set getSomeKeys(Random rnd) throws IOException, ClassNotFoundException {
InternalDistributedMember nod = null;
@@ -4196,7 +4155,7 @@ public class PartitionedRegion extends LocalRegion
}
buck = (Integer) buksA[ind];
- nod = getNodeForBucketRead(buck.intValue());
+ nod = getNodeForBucketRead(buck);
if (nod != null) {
logger.debug("getSomeKeys: iteration: {} for node {}", i, nod);
if (nod.equals(getMyId())) {
@@ -4217,7 +4176,7 @@ public class PartitionedRegion extends LocalRegion
"Test hook getSomeKeys caught a ForceReattemptException for bucketId={}{}{}. Moving on to another bucket",
getPRId(), BUCKET_ID_SEPARATOR, buck, movinOn);
continue;
- } catch (PRLocallyDestroyedException pde) {
+ } catch (PRLocallyDestroyedException ignore) {
logger.debug("getSomeKeys: Encountered PRLocallyDestroyedException");
checkReadiness();
continue;
@@ -4226,7 +4185,7 @@ public class PartitionedRegion extends LocalRegion
} // nod != null
} // for
logger.debug("getSomeKeys: no keys found returning empty set");
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
/**
@@ -4239,7 +4198,7 @@ public class PartitionedRegion extends LocalRegion
*/
public List<BucketDump> getAllBucketEntries(final int bucketId) throws ForceReattemptException {
if (bucketId >= getTotalNumberOfBuckets()) {
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
ArrayList<BucketDump> ret = new ArrayList<BucketDump>();
HashSet<InternalDistributedMember> collected = new HashSet<InternalDistributedMember>();
@@ -4263,6 +4222,7 @@ public class PartitionedRegion extends LocalRegion
if (owner.equals(getMyId())) {
BucketRegion br = this.dataStore.handleRemoteGetEntries(bucketId);
Map<Object, Object> m = new HashMap<Object, Object>() {
+ // TODO: clean this up -- outer class is not serializable
private static final long serialVersionUID = 0L;
@Override
@@ -4304,7 +4264,7 @@ public class PartitionedRegion extends LocalRegion
final FetchEntriesResponse r;
r = FetchEntriesMessage.send(owner, this, bucketId);
ret.add(r.waitForEntries());
- } catch (ForceReattemptException e) {
+ } catch (ForceReattemptException ignore) {
// node has departed? Ignore.
}
} // for
@@ -4312,11 +4272,9 @@ public class PartitionedRegion extends LocalRegion
return ret;
}
-
/**
* Fetch the keys for the given bucket identifier, if the bucket is local or remote.
*
- * @param bucketNum
* @return A set of keys from bucketNum or {@link Collections#EMPTY_SET}if no keys can be found.
*/
public Set getBucketKeys(int bucketNum) {
@@ -4327,12 +4285,11 @@ public class PartitionedRegion extends LocalRegion
* Fetch the keys for the given bucket identifier, if the bucket is local or remote. This version
* of the method allows you to retrieve Tombstone entries as well as undestroyed entries.
*
- * @param bucketNum
* @param allowTombstones whether to include destroyed entries in the result
* @return A set of keys from bucketNum or {@link Collections#EMPTY_SET}if no keys can be found.
*/
public Set getBucketKeys(int bucketNum, boolean allowTombstones) {
- Integer buck = Integer.valueOf(bucketNum);
+ Integer buck = bucketNum;
final int retryAttempts = calcRetry();
Set ret = null;
int count = 0;
@@ -4371,7 +4328,7 @@ public class PartitionedRegion extends LocalRegion
if (ret != null) {
return ret;
}
- } catch (PRLocallyDestroyedException pde) {
+ } catch (PRLocallyDestroyedException ignore) {
if (logger.isDebugEnabled()) {
logger.debug("getBucketKeys: Encountered PRLocallyDestroyedException");
}
@@ -4385,14 +4342,13 @@ public class PartitionedRegion extends LocalRegion
snoozer = new RetryTimeKeeper(this.retryTimeout);
}
InternalDistributedMember oldNode = nod;
- nod = getNodeForBucketRead(buck.intValue());
+ nod = getNodeForBucketRead(buck);
if (nod != null && nod.equals(oldNode)) {
if (snoozer.overMaximum()) {
checkReadiness();
throw new TimeoutException(
LocalizedStrings.PartitionedRegion_ATTEMPT_TO_ACQUIRE_PRIMARY_NODE_FOR_READ_ON_BUCKET_0_TIMED_OUT_IN_1_MS
- .toLocalizedString(new Object[] {getBucketName(buck.intValue()),
- Integer.valueOf(snoozer.getRetryTime())}));
+ .toLocalizedString(new Object[] {getBucketName(buck), snoozer.getRetryTime()}));
}
snoozer.waitToRetryNode();
}
@@ -4403,7 +4359,7 @@ public class PartitionedRegion extends LocalRegion
if (logger.isDebugEnabled()) {
logger.debug("getBucketKeys: no keys found returning empty set");
}
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
/**
@@ -4478,12 +4434,7 @@ public class PartitionedRegion extends LocalRegion
}
/**
- *
- * @param nodeToBuckets
- * @param values
- * @param servConn
* @return set of bucket-ids that could not be read from.
- * @throws IOException
*/
private Set<Integer> handleOldNodes(HashMap nodeToBuckets, VersionedObjectList values,
ServerConnection servConn) throws IOException {
@@ -4510,7 +4461,7 @@ public class PartitionedRegion extends LocalRegion
tr
<TRUNCATED>