You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2016/03/10 20:50:03 UTC
incubator-geode git commit: GEODE-1055 Remove unused/dead code from
PartitionedRegionQueryEvaluator
Repository: incubator-geode
Updated Branches:
refs/heads/develop b8d4db2af -> f486b700c
GEODE-1055 Remove unused/dead code from PartitionedRegionQueryEvaluator
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f486b700
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f486b700
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f486b700
Branch: refs/heads/develop
Commit: f486b700c489dd933b98be4be398143ebed794c1
Parents: b8d4db2
Author: Jason Huynh <hu...@gmail.com>
Authored: Thu Jan 21 16:35:15 2016 -0800
Committer: Jason Huynh <hu...@gmail.com>
Committed: Thu Mar 10 11:55:22 2016 -0800
----------------------------------------------------------------------
.../cache/PartitionedRegionQueryEvaluator.java | 391 +------------------
1 file changed, 6 insertions(+), 385 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f486b700/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
index db40af3..1d6cf0e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -26,7 +26,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -39,7 +38,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CopyHelper;
-import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.query.QueryException;
import com.gemstone.gemfire.cache.query.QueryExecutionLowMemoryException;
@@ -47,12 +45,6 @@ import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
import com.gemstone.gemfire.cache.query.SelectResults;
import com.gemstone.gemfire.cache.query.Struct;
import com.gemstone.gemfire.cache.query.internal.CompiledGroupBySelect;
-import com.gemstone.gemfire.cache.query.internal.CompiledID;
-import com.gemstone.gemfire.cache.query.internal.CompiledIndexOperation;
-import com.gemstone.gemfire.cache.query.internal.CompiledIteratorDef;
-import com.gemstone.gemfire.cache.query.internal.CompiledLiteral;
-import com.gemstone.gemfire.cache.query.internal.CompiledOperation;
-import com.gemstone.gemfire.cache.query.internal.CompiledPath;
import com.gemstone.gemfire.cache.query.internal.CompiledSelect;
import com.gemstone.gemfire.cache.query.internal.CompiledSortCriterion;
import com.gemstone.gemfire.cache.query.internal.CompiledValue;
@@ -60,23 +52,16 @@ import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults;
import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
import com.gemstone.gemfire.cache.query.internal.ExecutionContext;
-import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults.Metadata;
import com.gemstone.gemfire.cache.query.internal.IndexTrackingQueryObserver.IndexInfo;
import com.gemstone.gemfire.cache.query.internal.NWayMergeResults;
import com.gemstone.gemfire.cache.query.internal.OrderByComparator;
import com.gemstone.gemfire.cache.query.internal.PRQueryTraceInfo;
import com.gemstone.gemfire.cache.query.internal.QueryExecutionContext;
import com.gemstone.gemfire.cache.query.internal.QueryMonitor;
-import com.gemstone.gemfire.cache.query.internal.ResultsBag;
import com.gemstone.gemfire.cache.query.internal.ResultsSet;
-import com.gemstone.gemfire.cache.query.internal.RuntimeIterator;
import com.gemstone.gemfire.cache.query.internal.SortedResultsBag;
import com.gemstone.gemfire.cache.query.internal.SortedStructBag;
-import com.gemstone.gemfire.cache.query.internal.StructBag;
-import com.gemstone.gemfire.cache.query.internal.StructImpl;
import com.gemstone.gemfire.cache.query.internal.StructSet;
-import com.gemstone.gemfire.cache.query.internal.parse.OQLLexerTokenTypes;
-import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
import com.gemstone.gemfire.cache.query.internal.utils.PDXUtils;
import com.gemstone.gemfire.cache.query.types.ObjectType;
import com.gemstone.gemfire.cache.query.types.StructType;
@@ -93,8 +78,6 @@ import com.gemstone.gemfire.internal.cache.partitioned.QueryMessage;
import com.gemstone.gemfire.internal.cache.partitioned.StreamingPartitionOperation;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.pdx.PdxInstance;
-import com.gemstone.gemfire.pdx.internal.PdxString;
/**
* This class sends the query on various <code>PartitionedRegion</code> data
@@ -109,41 +92,8 @@ import com.gemstone.gemfire.pdx.internal.PdxString;
public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
{
private static final Logger logger = LogService.getLogger();
-
- /**
- * @author Mitch Thomas
- * An ArraList which can be tainted
- * @since 6.0
- */
- public static class TaintableArrayList extends ArrayList {
- private boolean isPoison = false;
- public synchronized void taint() {
- this.isPoison = true;
- super.clear();
- }
- public boolean add(Object arg0) {
- synchronized(this) {
- if (this.isPoison) {
- return false;
- } else {
- return super.add(arg0);
- }
- }
- }
- public synchronized boolean isConsumable() {
- return !this.isPoison && size() > 0;
- }
- public synchronized boolean isTainted() {
- return this.isPoison;
- }
-
- public synchronized void untaint() {
- this.isPoison = false;
- }
- }
-
/**
- * An ArraList which might be unconsumable.
+ * An ArrayList which might be unconsumable.
* @since 6.6.2
* @author shobhit
*/
@@ -221,22 +171,6 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
protected DistributionMessage createRequestMessage(InternalDistributedMember recipient, ReplyProcessor21 processor, List bucketIds) {
return new QueryMessage(recipient, this.pr.getPRId(), processor, this.query, this.parameters, bucketIds);
}
-
-
- @Override
- public Set<InternalDistributedMember> getPartitionedDataFrom(Set recipients)
- throws com.gemstone.gemfire.cache.TimeoutException, InterruptedException, QueryException, ForceReattemptException {
- if (Thread.interrupted()) throw new InterruptedException();
- if (recipients.isEmpty())
- return Collections.emptySet();
-
- StreamingQueryPartitionResponse processor = new StreamingQueryPartitionResponse(this.sys, recipients);
- DistributionMessage m = createRequestMessage(recipients, processor);
- this.sys.getDistributionManager().putOutgoing(m);
- // should we allow this to timeout?
- Set<InternalDistributedMember> failedMembers = processor.waitForCacheOrQueryException();
- return failedMembers;
- }
/**
@@ -311,23 +245,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
((MemberResultsList) results).setLastChunkReceived(true);
}
}
-
- //this.resultsPerMember.putIfAbsent(sender, objects);
- /*
- boolean toContinue = true;
- for (Iterator itr = objects.iterator(); itr.hasNext();) {
- final Object o = itr.next();
- if (o instanceof PRQueryProcessor.EndOfBucket) {
- int bucketId = ((PRQueryProcessor.EndOfBucket)o).getBucketId();
- synchronized (this.successfulBuckets) {
- this.successfulBuckets.add(bucketId);
- }
- }
- else {
- saveDataForMember(o, sender);
- }
- }
- */
+
return true;
}
@@ -559,7 +477,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
}
break;
}
- Map b2n = buildNodeToBucketMapForBuckets(caclulateRetryBuckets());
+ Map b2n = buildNodeToBucketMapForBuckets(calculateRetryBuckets());
if (th != null) {
th.hook(2);
}
@@ -576,39 +494,15 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
th.hook(3);
}
}
- // the failed buckets are those in this.bucketsToQuery that are
- // not present in this.successfulBuckets
- /*
- synchronized (this.successfulBuckets) {
- this.bucketsToQuery.removeAll(this.successfulBuckets.toArray());
- this.successfulBuckets.clear();
- }
-
- */
+
if (needsRetry) {
String msg = "Failed to query all the partitioned region " +
"dataset (buckets) after " + retry + " attempts.";
if (isDebugEnabled) {
- logger.debug("{} Unable to query some of the buckets from the set :{}", msg, this.caclulateRetryBuckets());
+ logger.debug("{} Unable to query some of the buckets from the set :{}", msg, this.calculateRetryBuckets());
}
throw new QueryException(msg);
-
- /*
- if (anyOfTheseBucketsHasStorage(this.bucketsToQuery)) {
- if (retry >= MAX_PR_QUERY_RETRIES) {
- String msg = "Query failed to get all results after " + retry + " attempts";
- throw new QueryException(msg);
- } else {
- failMissingBuckets();
- }
- } else {
- String msg = "Data loss detected during query "
- + this.query.getQueryString()
- + " subsequent query results should be suspect.";
- throw new QueryException(msg);
- }
- */
}
return addResultsToResultSet();
@@ -629,44 +523,8 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
}
}
}
-
- private boolean anyOfTheseBucketsHasStorage(Set<Integer> failedBuckets) {
- boolean haveStorage = false;
- for (Integer bid : failedBuckets) {
- if (this.pr.getRegionAdvisor().isStorageAssignedForBucket(bid)) {
- Set ownrs = this.pr.getRegionAdvisor().getBucketOwners(bid);
- for (Iterator boi = ownrs.iterator(); boi.hasNext(); ) {
- InternalDistributedMember mem = (InternalDistributedMember)boi.next();
- TaintableArrayList tal = (TaintableArrayList)this.resultsPerMember.get(mem);
- if (tal == null || !tal.isTainted()) {
- haveStorage = true;
- }
- }
- }
- }
- return haveStorage;
-
- /*
- boolean haveStorage = false;
- for (Iterator i = failedBuckets.iterator(); i.hasNext(); ) {
- final Integer bid = i.next();
- if (this.pr.getRegionAdvisor().isStorageAssignedForBucket(bid)) {
- Set ownrs = this.pr.getRegionAdvisor().getBucketOwners(bid);
- for (Iterator boi = ownrs.iterator(); boi.hasNext(); ) {
- InternalDistributedMember mem = (InternalDistributedMember)boi.next();
- TaintableArrayList tal = (TaintableArrayList)this.resultsPerMember.get(mem);
- if (tal == null || !tal.isTainted()) {
- haveStorage = true;
- }
- }
- }
- }
- return haveStorage;
- */
- }
-
- private Set<Integer> caclulateRetryBuckets() {
+ private Set<Integer> calculateRetryBuckets() {
Iterator<Map.Entry<InternalDistributedMember,List<Integer>>> memberToBucketList = node2bucketIds.entrySet().iterator();
final HashSet<Integer> retryBuckets = new HashSet<Integer>();
while (memberToBucketList.hasNext()) {
@@ -955,74 +813,6 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
}
- //returns attribute with escape quotes #51085 and #51886
- private String checkReservedKeyword(String attr) {
- if (attr != null && attr.length() > 0 && attr.contains(".")) {
- String[] splits = attr.split("[.]");
- StringBuffer sb = new StringBuffer();
- for (int i = 0; i < splits.length; i++) {
- sb.append(checkReservedKeyword(splits[i]) + ".");
- }
-
- if (sb.length() <= 1) {
- attr = sb.toString();
- }
- else {
- attr = sb.substring(0, sb.length() - 1);
- }
- }
- else if(DefaultQuery.reservedKeywords.contains(attr.toLowerCase())) {
- attr = "\"" + attr + "\"";
- }
- return attr;
- }
-
- /**
- * This returns the query clause as represented in the application query.
- * E.g.: returns p.status, p.getStatus() as represented by passed compiledValue.
- */
- private String getQueryAttributes(CompiledValue cv, StringBuffer fromPath) throws QueryException {
- // field with multiple level like p.pos.secId
- String clause = "";
- if (cv.getType() == OQLLexerTokenTypes.Identifier) {
- // It will be p.pos.secId
- clause = ((CompiledID)cv).getId() + clause;
- } else {
- do {
- if (cv.getType() == CompiledPath.PATH || cv.getType() == OQLLexerTokenTypes.TOK_LBRACK) {
- if (cv.getType() == OQLLexerTokenTypes.TOK_LBRACK) {
- CompiledIndexOperation cio = (CompiledIndexOperation)cv;
- CompiledLiteral cl = (CompiledLiteral)cio.getExpression();
- StringBuffer sb = new StringBuffer();
- cl.generateCanonicalizedExpression(sb, null);
- cv = ((CompiledIndexOperation)cv).getReceiver();
- if (sb.length() > 0) {
- clause = "[" + sb.toString() + "]" + clause;
- }
- }
- clause = ("." + ((CompiledPath)cv).getTailID() + clause);
- } else if (cv.getType() == OQLLexerTokenTypes.METHOD_INV) {
- // Function call.
- clause = "." + ((CompiledOperation)cv).getMethodName() + "()" + clause;
- } else {
- throw new QueryException("Failed to evaluate order by attributes, found unsupported type " + cv.getType() +
- " Unable to apply order-by on the partition region cumulative results.");
- }
-
- cv = cv.getReceiver();
- } while (!(cv.getType() == OQLLexerTokenTypes.Identifier));
-
- if (cv.getType() == OQLLexerTokenTypes.Identifier) {
- clause = ((CompiledID)cv).getId() + clause;
- // Append region iterator alias. p
- if (fromPath != null) {
- fromPath.append(((CompiledID)cv).getId());
- }
- }
- }
- return clause;
- }
-
/**
* Generates a map with key as PR node and value as the list as a subset of
* the bucketIds hosted by the node.
@@ -1081,20 +871,12 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
//Put the failed members on the end of the list.
if(failedMembers != null && !failedMembers.isEmpty()) {
allNodes.removeAll(failedMembers);
- //Collections.shuffle(allNodes, PartitionedRegion.rand);
allNodes.addAll(failedMembers);
}
for (Iterator dsItr = allNodes.iterator(); dsItr.hasNext() && (bucketIds.size() < totalBucketsToQuery); ) {
InternalDistributedMember nd = (InternalDistributedMember)dsItr.next();
- /*
- if(taintedMembers.contains(nd)) {
- //clear the tainted state
- resultsPerMember.get(nd).untaint();
- }
- */
-
final List<Integer> buckets = new ArrayList<Integer>();
for (Integer bid : bucketIdsToConsider) {
if (!bucketIds.contains(bid)) {
@@ -1141,14 +923,8 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
if (this.pr.getDataStore() != null) {
this.pr.getDataStore().invokeBucketReadHook();
final InternalDistributedMember me = this.pr.getMyId();
- //Create PRQueryResultCollector here.
- //RQueryResultCollector resultCollector = new PRQueryResultCollector();
List<Integer> bucketList = this.node2bucketIds.get(me);
- //try {
-
- //this.pr.getDataStore().queryLocalNode(this.query, this.parameters,
- // bucketList, resultCollector);
try {
PRQueryProcessor qp = new PRQueryProcessor(this.pr, query, parameters, bucketList);
MemberResultsList resultCollector = new MemberResultsList();
@@ -1208,167 +984,12 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
}
return true;
}
- /*
- ExecutionContext context = new ExecutionContext(parameters, this.pr.getCache());
- context.setBucketList(bucketList);
- try {
- SelectResults results = (SelectResults)this.query.executeUsingContext(context);
- addToResultCollector(results, resultCollector, me);
- } catch (BucketMovedException bme) {
- return true;
- }
-
- //this.successfulBuckets.addAll(context.getSuccessfulBuckets());
- for (Object o: context.getBucketList()) {
- Integer bId = (Integer)o;
- this.successfulBuckets.add(bId.intValue());
- }
-
- /*
- } catch (ForceReattemptException retryRequired) {
- return true;
- }
- */
- /*
- int tokenCount = 0;
- final int numBuckets = bucketList.size();
- // finished when we get the nth END_OF_STREAM token, where n is the number of buckets
- boolean toContinue = true;
- Object o = null;
- while (tokenCount < numBuckets) {
- o = resultCollector.get();
- if (o instanceof PRQueryProcessor.EndOfBucket) {
- int bucketId = ((PRQueryProcessor.EndOfBucket)o).getBucketId();
- synchronized (this.successfulBuckets) {
- this.successfulBuckets.add(bucketId);
- }
- tokenCount++;
- } else {
- if (o == DefaultQuery.NULL_RESULT) {
- o = null;
- }
- saveDataForMember(o, me);
- }
- }
- Assert.assertTrue(resultCollector.isEmpty());
- */
}
return false;
}
-
- /*
- private void saveDataForMember(final Object data, final InternalDistributedMember member) {
- TaintableArrayList existing = this.resultsPerMember.get(member);
- if (existing == null) {
- synchronized (member) {
- existing = new TaintableArrayList();
- this.resultsPerMember.putIfAbsent(member, existing);
- }
- }
- existing.add(data);
- }
- */
protected void memberStreamCorrupted(InternalDistributedMember sender) {
this.resultsPerMember.remove(sender);
- /*
- final TaintableArrayList tainted = new TaintableArrayList();
- tainted.taint();
- TaintableArrayList existing =
- (TaintableArrayList)this.resultsPerMember.putIfAbsent(sender, tainted);
- if (existing != null) {
- existing.taint();
- }
-
- ArrayList bucketIds = (ArrayList)this.node2bucketIds.get(sender);
- if (bucketIds != null) {
- ArrayList removedBucketIds = null;
- for (Iterator i = bucketIds.iterator(); i.hasNext(); ) {
- Integer bid = (Integer)i.next();
- synchronized(this.successfulBuckets) {
- if (this.successfulBuckets.remove(bid.intValue())) {
- if (removedBucketIds == null) {
- removedBucketIds = new ArrayList();
- }
- removedBucketIds.add(bid);
- }
- }
- }
-
- }
- */
- }
-
- // @todo need to throw a better exception than QueryException
- /**
- * Fail due to not getting all the data back for all the buckets,
- * reporting which buckets failed on which nodes.
- *
- * @throws QueryException always throws
- * since QueryException should be abstract
- */
- private void failMissingBuckets() throws QueryException {
- // convert to Map of nodes to bucket ids for error message
- Map n2b = new HashMap();
- for (Integer bId : this.bucketsToQuery) {
- InternalDistributedMember node = findNodeForBucket(bId);
- List listOfInts = (List)n2b.get(node);
- if (listOfInts == null) {
- listOfInts = new ArrayList<Integer>();
- n2b.put(node, listOfInts);
- }
- listOfInts.add(bId);
- }
-
- /*
- Iterator = this.bucketsToQuery.iterator();
- int sz = intArray.length;
- Map n2b = new HashMap();
- for (int i = 0; i < sz; i++) {
- Integer bucketId = Integer.valueOf(intArray[i]);
- InternalDistributedMember node = findNodeForBucket(bucketId);
- List listOfInts = (List)n2b.get(node);
- if (listOfInts == null) {
- listOfInts = new ArrayList();
- n2b.put(node, listOfInts);
- }
- listOfInts.add(bucketId);
- }
- */
-
- // One last check, after all else is said and done:
- // if the system is closing, don't fail the query, but
- // generate a much more serious error...
- this.pr.getCancelCriterion().checkCancelInProgress(null);
-
- // the failure
- String msg = "Query failed; unable to get results from the following node/buckets: "
- + n2b;
-
- logger.fatal(msg);
- throw new QueryException( // @todo what is a better exception to throw here?
- msg);
-
- /* alternative strategy: re-query
- queryBuckets();
- */
-
- }
-
- private InternalDistributedMember findNodeForBucket(Integer bucketId) {
- for (Iterator<Map.Entry<InternalDistributedMember,List<Integer>>> itr = this.node2bucketIds.entrySet().iterator(); itr.hasNext(); ) {
- Map.Entry<InternalDistributedMember,List<Integer>> entry = itr.next();
- List<Integer> blist = entry.getValue();
- for (Iterator<Integer> itr2 = blist.iterator(); itr2.hasNext(); ) {
- Integer bid = itr2.next();
- if (bid.equals(bucketId)) {
- return (InternalDistributedMember)entry.getKey();
- }
- }
- }
- String msg = "Unable to get node for bucket id " + bucketId + " node to bucket map is " + this.node2bucketIds;
- logger.fatal(msg);
- throw new InternalGemFireError(msg);
}
/**