You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/15 01:24:09 UTC

[26/44] incubator-geode git commit: GEODE-1055 Remove unused/dead code from PartitionedRegionQueryEvaluator

GEODE-1055 Remove unused/dead code from PartitionedRegionQueryEvaluator


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f486b700
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f486b700
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f486b700

Branch: refs/heads/feature/GEODE-949-2
Commit: f486b700c489dd933b98be4be398143ebed794c1
Parents: b8d4db2
Author: Jason Huynh <hu...@gmail.com>
Authored: Thu Jan 21 16:35:15 2016 -0800
Committer: Jason Huynh <hu...@gmail.com>
Committed: Thu Mar 10 11:55:22 2016 -0800

----------------------------------------------------------------------
 .../cache/PartitionedRegionQueryEvaluator.java  | 391 +------------------
 1 file changed, 6 insertions(+), 385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f486b700/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
index db40af3..1d6cf0e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -26,7 +26,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -39,7 +38,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.CopyHelper;
-import com.gemstone.gemfire.InternalGemFireError;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.query.QueryException;
 import com.gemstone.gemfire.cache.query.QueryExecutionLowMemoryException;
@@ -47,12 +45,6 @@ import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
 import com.gemstone.gemfire.cache.query.SelectResults;
 import com.gemstone.gemfire.cache.query.Struct;
 import com.gemstone.gemfire.cache.query.internal.CompiledGroupBySelect;
-import com.gemstone.gemfire.cache.query.internal.CompiledID;
-import com.gemstone.gemfire.cache.query.internal.CompiledIndexOperation;
-import com.gemstone.gemfire.cache.query.internal.CompiledIteratorDef;
-import com.gemstone.gemfire.cache.query.internal.CompiledLiteral;
-import com.gemstone.gemfire.cache.query.internal.CompiledOperation;
-import com.gemstone.gemfire.cache.query.internal.CompiledPath;
 import com.gemstone.gemfire.cache.query.internal.CompiledSelect;
 import com.gemstone.gemfire.cache.query.internal.CompiledSortCriterion;
 import com.gemstone.gemfire.cache.query.internal.CompiledValue;
@@ -60,23 +52,16 @@ import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
 import com.gemstone.gemfire.cache.query.internal.ExecutionContext;
-import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults.Metadata;
 import com.gemstone.gemfire.cache.query.internal.IndexTrackingQueryObserver.IndexInfo;
 import com.gemstone.gemfire.cache.query.internal.NWayMergeResults;
 import com.gemstone.gemfire.cache.query.internal.OrderByComparator;
 import com.gemstone.gemfire.cache.query.internal.PRQueryTraceInfo;
 import com.gemstone.gemfire.cache.query.internal.QueryExecutionContext;
 import com.gemstone.gemfire.cache.query.internal.QueryMonitor;
-import com.gemstone.gemfire.cache.query.internal.ResultsBag;
 import com.gemstone.gemfire.cache.query.internal.ResultsSet;
-import com.gemstone.gemfire.cache.query.internal.RuntimeIterator;
 import com.gemstone.gemfire.cache.query.internal.SortedResultsBag;
 import com.gemstone.gemfire.cache.query.internal.SortedStructBag;
-import com.gemstone.gemfire.cache.query.internal.StructBag;
-import com.gemstone.gemfire.cache.query.internal.StructImpl;
 import com.gemstone.gemfire.cache.query.internal.StructSet;
-import com.gemstone.gemfire.cache.query.internal.parse.OQLLexerTokenTypes;
-import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
 import com.gemstone.gemfire.cache.query.internal.utils.PDXUtils;
 import com.gemstone.gemfire.cache.query.types.ObjectType;
 import com.gemstone.gemfire.cache.query.types.StructType;
@@ -93,8 +78,6 @@ import com.gemstone.gemfire.internal.cache.partitioned.QueryMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.StreamingPartitionOperation;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.pdx.PdxInstance;
-import com.gemstone.gemfire.pdx.internal.PdxString;
 
 /**
  * This class sends the query on various <code>PartitionedRegion</code> data
@@ -109,41 +92,8 @@ import com.gemstone.gemfire.pdx.internal.PdxString;
 public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
 {
   private static final Logger logger = LogService.getLogger();
-  
-  /**
-   * @author Mitch Thomas
-   * An ArraList which can be tainted
-   * @since 6.0
-   */
-  public static class TaintableArrayList extends ArrayList {
-    private boolean isPoison = false;
-    public synchronized void taint() {
-      this.isPoison = true;
-      super.clear();
-    }
-    public boolean add(Object arg0) {
-      synchronized(this) {
-        if (this.isPoison) {
-          return false;
-        } else {
-          return super.add(arg0);
-        }
-      }
-    }
-    public synchronized boolean isConsumable() {
-      return !this.isPoison && size() > 0;
-    }
-    public synchronized boolean isTainted() {
-      return this.isPoison;
-    }
-    
-    public synchronized void untaint() {
-      this.isPoison = false;
-    }
-  }
-
   /**
-   * An ArraList which might be unconsumable.
+   * An ArrayList which might be unconsumable.
    * @since 6.6.2
    * @author shobhit
    */
@@ -221,22 +171,6 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
   protected DistributionMessage createRequestMessage(InternalDistributedMember recipient, ReplyProcessor21 processor, List bucketIds) {
     return new QueryMessage(recipient, this.pr.getPRId(), processor, this.query, this.parameters, bucketIds);
   }
-  
- 
-  @Override
-  public Set<InternalDistributedMember> getPartitionedDataFrom(Set recipients)
-   throws com.gemstone.gemfire.cache.TimeoutException, InterruptedException, QueryException, ForceReattemptException {
-     if (Thread.interrupted()) throw new InterruptedException();
-     if (recipients.isEmpty())
-       return Collections.emptySet();
-     
-     StreamingQueryPartitionResponse processor = new StreamingQueryPartitionResponse(this.sys, recipients);
-     DistributionMessage m = createRequestMessage(recipients, processor);
-     this.sys.getDistributionManager().putOutgoing(m);
-     // should we allow this to timeout?
-     Set<InternalDistributedMember> failedMembers = processor.waitForCacheOrQueryException();
-     return failedMembers;
-  }
 
   
   /**
@@ -311,23 +245,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
         ((MemberResultsList) results).setLastChunkReceived(true);
       }
     }
-    
-    //this.resultsPerMember.putIfAbsent(sender, objects);
-    /*
-    boolean toContinue = true;
-    for (Iterator itr = objects.iterator(); itr.hasNext();) {
-      final Object o = itr.next();
-      if (o instanceof PRQueryProcessor.EndOfBucket) {
-        int bucketId = ((PRQueryProcessor.EndOfBucket)o).getBucketId();
-        synchronized (this.successfulBuckets) {
-          this.successfulBuckets.add(bucketId);
-        }
-      }
-      else {
-        saveDataForMember(o, sender);
-      }
-    }
-    */
+
     return true;
   }
 
@@ -559,7 +477,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
           }
           break;
         }
-        Map b2n = buildNodeToBucketMapForBuckets(caclulateRetryBuckets());
+        Map b2n = buildNodeToBucketMapForBuckets(calculateRetryBuckets());
         if (th != null) {
           th.hook(2);
         }
@@ -576,39 +494,15 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
         th.hook(3);
       }
     }
-    // the failed buckets are those in this.bucketsToQuery that are
-    // not present in this.successfulBuckets
-    /*
-    synchronized (this.successfulBuckets) {
-      this.bucketsToQuery.removeAll(this.successfulBuckets.toArray());
-      this.successfulBuckets.clear();
-    }
-    
-    */
+
     if (needsRetry) {
       String msg = "Failed to query all the partitioned region " +
         "dataset (buckets) after " + retry + " attempts.";
       
       if (isDebugEnabled) {
-        logger.debug("{} Unable to query some of the buckets from the set :{}", msg, this.caclulateRetryBuckets());
+        logger.debug("{} Unable to query some of the buckets from the set :{}", msg, this.calculateRetryBuckets());
       }
       throw new QueryException(msg);
-
-      /*
-      if (anyOfTheseBucketsHasStorage(this.bucketsToQuery)) {
-        if (retry >= MAX_PR_QUERY_RETRIES) {
-          String msg = "Query failed to get all results after " + retry + " attempts";
-          throw new QueryException(msg);
-        } else {
-          failMissingBuckets();
-        }
-      } else {
-        String msg = "Data loss detected during query "
-          + this.query.getQueryString()
-          + " subsequent query results should be suspect.";
-        throw new QueryException(msg);
-      }
-      */
     }
 
     return addResultsToResultSet();
@@ -629,44 +523,8 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
       }
     }
   }
-
-  private boolean anyOfTheseBucketsHasStorage(Set<Integer> failedBuckets) {
-    boolean haveStorage = false;
-    for (Integer bid : failedBuckets) {
-      if (this.pr.getRegionAdvisor().isStorageAssignedForBucket(bid)) {
-        Set ownrs = this.pr.getRegionAdvisor().getBucketOwners(bid);
-        for (Iterator boi = ownrs.iterator(); boi.hasNext(); ) {
-          InternalDistributedMember mem = (InternalDistributedMember)boi.next();
-          TaintableArrayList tal = (TaintableArrayList)this.resultsPerMember.get(mem);
-          if (tal == null || !tal.isTainted()) {
-            haveStorage = true;
-          }
-        }
-      }
-    }
-    return haveStorage;
-    
-    /*
-    boolean haveStorage = false;
-    for (Iterator i = failedBuckets.iterator(); i.hasNext(); ) {
-      final Integer bid = i.next();
-      if (this.pr.getRegionAdvisor().isStorageAssignedForBucket(bid)) {
-        Set ownrs = this.pr.getRegionAdvisor().getBucketOwners(bid);
-        for (Iterator boi = ownrs.iterator(); boi.hasNext(); ) {
-          InternalDistributedMember mem = (InternalDistributedMember)boi.next();
-          TaintableArrayList tal = (TaintableArrayList)this.resultsPerMember.get(mem);
-          if (tal == null || !tal.isTainted()) {
-            haveStorage = true;
-          }
-        }
-      }
-    }
-    return haveStorage;
-    */
-  }
   
-  
-  private Set<Integer> caclulateRetryBuckets() {
+  private Set<Integer> calculateRetryBuckets() {
     Iterator<Map.Entry<InternalDistributedMember,List<Integer>>> memberToBucketList = node2bucketIds.entrySet().iterator();
     final HashSet<Integer> retryBuckets = new HashSet<Integer>();
     while (memberToBucketList.hasNext()) {
@@ -955,74 +813,6 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     
   }
 
-  //returns attribute with escape quotes #51085 and #51886
-  private String checkReservedKeyword(String attr) {
-    if (attr != null && attr.length() > 0 && attr.contains(".")) {
-      String[] splits = attr.split("[.]");
-      StringBuffer sb = new StringBuffer();
-      for (int i = 0; i < splits.length; i++) {
-        sb.append(checkReservedKeyword(splits[i]) + ".");
-      } 
-      
-      if (sb.length() <= 1) {
-        attr = sb.toString(); 
-      } 
-      else {
-        attr = sb.substring(0, sb.length() - 1);
-      }
-    }
-    else if(DefaultQuery.reservedKeywords.contains(attr.toLowerCase())) {
-      attr = "\"" + attr + "\"";
-    }
-    return attr;
-  }
-
-  /**
-   * This returns the query clause as represented in the application query.
-   * E.g.: returns p.status, p.getStatus() as represented by passed compiledValue.
-   */
-  private String getQueryAttributes(CompiledValue cv, StringBuffer fromPath) throws QueryException {
-    // field with multiple level like p.pos.secId
-    String clause = "";
-    if (cv.getType() == OQLLexerTokenTypes.Identifier)  {
-      // It will be p.pos.secId
-      clause = ((CompiledID)cv).getId() + clause;
-    } else {
-      do {
-        if (cv.getType() == CompiledPath.PATH || cv.getType() == OQLLexerTokenTypes.TOK_LBRACK) {
-          if (cv.getType() == OQLLexerTokenTypes.TOK_LBRACK) {
-            CompiledIndexOperation cio = (CompiledIndexOperation)cv;
-            CompiledLiteral cl = (CompiledLiteral)cio.getExpression();
-            StringBuffer sb = new StringBuffer();
-            cl.generateCanonicalizedExpression(sb, null);
-            cv = ((CompiledIndexOperation)cv).getReceiver();
-            if (sb.length() > 0) {
-              clause = "[" + sb.toString() + "]" + clause;
-            }
-          }
-          clause = ("." + ((CompiledPath)cv).getTailID() + clause);
-        } else if (cv.getType() == OQLLexerTokenTypes.METHOD_INV) {
-          // Function call.
-          clause = "." + ((CompiledOperation)cv).getMethodName() + "()" + clause;
-        } else {
-          throw new QueryException("Failed to evaluate order by attributes, found unsupported type  " + cv.getType() + 
-          " Unable to apply order-by on the partition region cumulative results.");
-        }
-
-        cv = cv.getReceiver();
-      } while (!(cv.getType() == OQLLexerTokenTypes.Identifier));
-
-      if (cv.getType() == OQLLexerTokenTypes.Identifier) {
-        clause = ((CompiledID)cv).getId() + clause;
-        // Append region iterator alias. p
-        if (fromPath != null) {
-          fromPath.append(((CompiledID)cv).getId());
-        }
-      }      
-    }
-    return clause;
-  }
-
   /**
    * Generates a map with key as PR node and value as the list as a subset of
    * the bucketIds hosted by the node.
@@ -1081,20 +871,12 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     //Put the failed members on the end of the list.
     if(failedMembers != null && !failedMembers.isEmpty()) {
       allNodes.removeAll(failedMembers);
-      //Collections.shuffle(allNodes, PartitionedRegion.rand);
       allNodes.addAll(failedMembers);
     }
     
     for (Iterator dsItr = allNodes.iterator(); dsItr.hasNext() && (bucketIds.size() < totalBucketsToQuery); ) {
       InternalDistributedMember nd = (InternalDistributedMember)dsItr.next();
       
-      /*
-      if(taintedMembers.contains(nd)) {
-        //clear the tainted state
-        resultsPerMember.get(nd).untaint();
-      }
-      */
-      
       final List<Integer> buckets = new ArrayList<Integer>();
       for (Integer bid : bucketIdsToConsider) {
         if (!bucketIds.contains(bid)) {
@@ -1141,14 +923,8 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     if (this.pr.getDataStore() != null) {
       this.pr.getDataStore().invokeBucketReadHook();
       final InternalDistributedMember me = this.pr.getMyId();
-      //Create PRQueryResultCollector here.
-      //RQueryResultCollector resultCollector = new PRQueryResultCollector();
 
       List<Integer> bucketList = this.node2bucketIds.get(me);
-      //try {
-        
-        //this.pr.getDataStore().queryLocalNode(this.query, this.parameters,
-        //    bucketList, resultCollector);
       try {
         PRQueryProcessor qp = new PRQueryProcessor(this.pr, query, parameters, bucketList);
         MemberResultsList resultCollector = new MemberResultsList();
@@ -1208,167 +984,12 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
         }
         return true;
       } 
-        /*
-        ExecutionContext context = new ExecutionContext(parameters, this.pr.getCache());
-        context.setBucketList(bucketList);
-        try {
-          SelectResults results = (SelectResults)this.query.executeUsingContext(context);
-          addToResultCollector(results, resultCollector, me);
-        } catch (BucketMovedException bme) {
-          return true;
-        }
-        
-        //this.successfulBuckets.addAll(context.getSuccessfulBuckets());
-        for (Object o: context.getBucketList()) {
-          Integer bId = (Integer)o;
-          this.successfulBuckets.add(bId.intValue());
-        }
-        
-      /*
-      } catch (ForceReattemptException retryRequired) {
-        return true;
-      }
-      */
-      /*
-      int tokenCount = 0;
-      final int numBuckets = bucketList.size();
-      // finished when we get the nth END_OF_STREAM token, where n is the number of buckets
-      boolean toContinue = true;
-      Object o = null;
-      while (tokenCount < numBuckets) {
-        o = resultCollector.get();
-        if (o instanceof PRQueryProcessor.EndOfBucket) {
-          int bucketId = ((PRQueryProcessor.EndOfBucket)o).getBucketId();
-          synchronized (this.successfulBuckets) {
-            this.successfulBuckets.add(bucketId);
-          }
-          tokenCount++;
-        } else {
-          if (o == DefaultQuery.NULL_RESULT) {
-            o = null;
-          }
-          saveDataForMember(o, me);
-        }
-      }
-      Assert.assertTrue(resultCollector.isEmpty());
-				*/
     }
     return false;
   }
-  
-		/*
-  private void saveDataForMember(final Object data, final InternalDistributedMember member) {
-    TaintableArrayList existing = this.resultsPerMember.get(member);
-    if (existing == null) {
-      synchronized (member) {
-        existing = new TaintableArrayList();
-        this.resultsPerMember.putIfAbsent(member, existing);
-      }
-    }
-    existing.add(data);
-  }
-  */
 
   protected void memberStreamCorrupted(InternalDistributedMember sender) {
     this.resultsPerMember.remove(sender);
-    /*
-    final TaintableArrayList tainted = new TaintableArrayList();
-    tainted.taint();
-    TaintableArrayList existing = 
-      (TaintableArrayList)this.resultsPerMember.putIfAbsent(sender, tainted);
-    if (existing != null) {
-      existing.taint();
-    }
-
-    ArrayList bucketIds = (ArrayList)this.node2bucketIds.get(sender);
-    if (bucketIds != null) {
-      ArrayList removedBucketIds = null;
-      for (Iterator i = bucketIds.iterator(); i.hasNext(); ) {
-        Integer bid = (Integer)i.next();
-        synchronized(this.successfulBuckets) {
-          if (this.successfulBuckets.remove(bid.intValue())) {
-            if (removedBucketIds == null) {
-              removedBucketIds = new ArrayList();
-            }
-            removedBucketIds.add(bid);
-          }
-        }
-      }
-
-    }
-    */
-  }
-
-  // @todo need to throw a better exception than QueryException
-  /**
-   * Fail due to not getting all the data back for all the buckets,
-   * reporting which buckets failed on which nodes.
-   *
-   * @throws QueryException always throws
-   * since QueryException should be abstract
-   */
-  private void failMissingBuckets() throws QueryException {
-    // convert to Map of nodes to bucket ids for error message
-    Map n2b = new HashMap();
-    for (Integer bId : this.bucketsToQuery) {
-      InternalDistributedMember node = findNodeForBucket(bId);
-      List listOfInts = (List)n2b.get(node);
-      if (listOfInts == null) {
-        listOfInts = new ArrayList<Integer>();
-        n2b.put(node, listOfInts);
-      }
-      listOfInts.add(bId);
-    }
-    
-    /*
-    Iterator = this.bucketsToQuery.iterator();
-    int sz = intArray.length;
-    Map n2b = new HashMap();
-    for (int i = 0; i < sz; i++) {
-      Integer bucketId = Integer.valueOf(intArray[i]);
-      InternalDistributedMember node = findNodeForBucket(bucketId);
-      List listOfInts = (List)n2b.get(node);
-      if (listOfInts == null) {
-        listOfInts = new ArrayList();
-        n2b.put(node, listOfInts);
-      }
-      listOfInts.add(bucketId);
-    }
-    */
-    
-    // One last check, after all else is said and done: 
-    // if the system is closing, don't fail the query, but
-    // generate a much more serious error...
-    this.pr.getCancelCriterion().checkCancelInProgress(null);
-    
-    // the failure
-    String msg = "Query failed; unable to get results from the following node/buckets: "
-    + n2b;
-
-    logger.fatal(msg);
-    throw new QueryException( // @todo what is a better exception to throw here?
-        msg);
-    
-    /* alternative strategy: re-query
-    queryBuckets();
-    */
-    
-  }
-  
-  private InternalDistributedMember findNodeForBucket(Integer bucketId) {
-    for (Iterator<Map.Entry<InternalDistributedMember,List<Integer>>> itr = this.node2bucketIds.entrySet().iterator(); itr.hasNext(); ) {
-      Map.Entry<InternalDistributedMember,List<Integer>> entry = itr.next();
-      List<Integer> blist = entry.getValue();
-      for (Iterator<Integer> itr2 = blist.iterator(); itr2.hasNext(); ) {
-        Integer bid = itr2.next();
-        if (bid.equals(bucketId)) {
-          return (InternalDistributedMember)entry.getKey();
-        }
-      }
-    }
-    String msg = "Unable to get node for bucket id " + bucketId + " node to bucket map is " + this.node2bucketIds;
-    logger.fatal(msg);
-    throw new InternalGemFireError(msg);
   }
 
   /**