Posted to commits@geode.apache.org by up...@apache.org on 2016/02/20 00:59:33 UTC

[15/51] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
index aeb4343,0000000..0227836
mode 100644,000000..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
@@@ -1,580 -1,0 +1,576 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.query.dunit;
 +
 +import java.util.Properties;
 +
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheException;
 +import com.gemstone.gemfire.cache.CacheFactory;
 +import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 +import com.gemstone.gemfire.cache.PartitionResolver;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.RegionShortcut;
 +import com.gemstone.gemfire.cache.client.ClientCache;
 +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
 +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
 +import com.gemstone.gemfire.cache.query.Index;
 +import com.gemstone.gemfire.cache.query.IndexType;
 +import com.gemstone.gemfire.cache.query.QueryService;
 +import com.gemstone.gemfire.cache.query.SelectResults;
 +import com.gemstone.gemfire.cache.query.data.Portfolio;
 +import com.gemstone.gemfire.cache.query.data.Position;
 +import com.gemstone.gemfire.cache.query.internal.QueryObserverHolder;
 +import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
 +import com.gemstone.gemfire.cache.query.partitioned.PRQueryDUnitHelper;
 +import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
 +import com.gemstone.gemfire.cache30.CacheTestCase;
 +import com.gemstone.gemfire.internal.cache.execute.PRClientServerTestBase;
 +import com.gemstone.gemfire.test.dunit.Assert;
 +import com.gemstone.gemfire.test.dunit.AsyncInvocation;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.Invoke;
 +import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 +import com.gemstone.gemfire.test.dunit.SerializableRunnable;
 +import com.gemstone.gemfire.test.dunit.ThreadUtils;
 +import com.gemstone.gemfire.test.dunit.VM;
 +import com.gemstone.gemfire.test.dunit.Wait;
 +
 +/**
 + * Tests for data inconsistency when a region entry is updated while the same
 + * (unlocked) index is being queried concurrently.
 + *
 + * @author shobhit
 + */
 +public class QueryDataInconsistencyDUnitTest extends CacheTestCase {
 +
 +  private static final int cnt = 0;
 +
 +  private static final int cntDest = 10;
 +
 +  static VM server = null;
 +
 +  static VM client = null;
 +
 +  static Cache cache = null;
 +
 +  static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default name
 +  static String repRegionName = "TestRepRegion"; // default name
 +  
 +  static Integer serverPort1 = null;
 +
 +  public static int numOfBuckets = 20;
 +
 +  public static String[] queries = new String[] {
 +      "select * from /" + PartitionedRegionName1 + " where ID=1",
 +      };
 +
 +  public static String[] queriesForRR = new String[] { "<trace> select * from /"
 +      + repRegionName + " where ID=1" };
 +
 +  private static PRQueryDUnitHelper PRQHelp = new PRQueryDUnitHelper("");
 +
 +  public static volatile boolean hooked = false;
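 +
 +  // Handshake between the updating thread and the querying thread: the
 +  // IndexManagerTestHook (installed as IndexManager.testHook) sets hooked to
 +  // true when the put reaches an instrumented spot and then spins until the
 +  // querying thread, which itself spins on !hooked before running its query,
 +  // clears the flag to let the put proceed.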
 +  /**
 +   * @param name the test case name
 +   */
 +  public QueryDataInconsistencyDUnitTest(String name) {
 +    super(name);
 +  }
 +
 +  @Override
 +  protected final void preTearDownCacheTestCase() throws Exception {
 +    Invoke.invokeInEveryVM(CacheTestCase.class, "disconnectFromDS");
 +  }
 +  
 +  @Override
 +  protected final void postTearDownCacheTestCase() throws Exception {
 +    Invoke.invokeInEveryVM(QueryObserverHolder.class, "reset");
 +  }
 +
 +  @Override
 +  public void setUp() throws Exception {
 +    super.setUp();
 +    Host host = Host.getHost(0);
 +    server = host.getVM(0);
 +  }
 +
 +  public void testCompactRangeIndex() {
 +    // Create caches
 +    Properties props = new Properties();
-     server.invoke(PRClientServerTestBase.class, "createCacheInVm",
-         new Object[] { props });
++    server.invoke(() -> PRClientServerTestBase.createCacheInVm( props ));
 +
 +    server.invoke(new CacheSerializableRunnable("create indexes") {
 +
 +      @Override
 +      public void run2() throws CacheException {
 +        cache = CacheFactory.getAnyInstance();
 +        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
 +        
 +        // Create common Portfolios and NewPortfolios
 +        for (int j = cnt; j < cntDest; j++) {
 +          region.put(new Integer(j), new Portfolio(j));
 +        }
 +        
 +        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +        try {
 +          Index index = qs.createIndex("idIndex", "ID", "/"+repRegionName);
 +          assertEquals(10, index.getStatistics().getNumberOfKeys());
 +        } catch (Exception e) {
 +          fail("Index creation failed");
 +        }
 +      }
 +    });
 +    // Invoke the update asynchronously on the server and stop it in updateIndex
 +    // twice: first before the RegionEntry is updated and again after it has
 +    // been updated.
 +    AsyncInvocation putThread = server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
 +        IndexManager.testHook = new IndexManagerTestHook();
 +        repRegion.put(new Integer("1"), new Portfolio(cntDest+1));
 +        //above call must be hooked in BEFORE_UPDATE_OP call.
 +      }
 +    });
 +    server.invoke(new CacheSerializableRunnable("query on server") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +        while (!hooked){Wait.pause(100);}
 +        Object rs = null;
 +        try {
 +          rs = qs.newQuery("<trace> select * from /"+repRegionName+" where ID = 1").execute();          
 +        } catch (Exception e) {
 +          e.printStackTrace();
 +          // Clear the hook before failing; fail() throws, so any statement
 +          // placed after it would be unreachable.
 +          IndexManager.testHook = null;
 +          fail("Query execution failed on server.");
 +        }
 +        assertTrue(rs instanceof SelectResults);
 +        assertEquals(1, ((SelectResults)rs).size());
 +        Portfolio p1 = (Portfolio) ((SelectResults)rs).asList().get(0);
 +        if (p1.getID() != 1) {
 +          IndexManager.testHook = null;
 +          fail("Query thread did not verify index results even when RE is under update");
 +        }
 +        hooked = false;//Let client put go further.
 +      }
 +    });
 +
 +    //Client put is again hooked in AFTER_UPDATE_OP call in updateIndex.
 +    server.invoke(new CacheSerializableRunnable("query on server") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +        while (!hooked){Wait.pause(100);}
 +        Object rs = null;
 +        try {
 +          rs = qs.newQuery("<trace> select * from /"+repRegionName+" where ID = 1").execute();          
 +        } catch (Exception e) {
 +          e.printStackTrace();
 +          fail("Query execution failed on server."+e.getMessage());
 +        } finally {
 +          IndexManager.testHook = null;
 +        }
 +        assertTrue(rs instanceof SelectResults);
 +        if (((SelectResults)rs).size() > 0) {
 +          Portfolio p1 = (Portfolio) ((SelectResults)rs).iterator().next();
 +          if (p1.getID() != 1) {
 +            // The finally block above has already cleared IndexManager.testHook.
 +            fail("Query thread did not verify index results even when RE is under update and " +
 +                      "RegionEntry value has been modified before releasing the lock");
 +          }
 +        }
 +        hooked = false;//Let client put go further.
 +      }
 +    });
 +    ThreadUtils.join(putThread, 200);
 +  }
 +
 +  public void testRangeIndex() {
 +    // Create caches
 +    Properties props = new Properties();
-     server.invoke(PRClientServerTestBase.class, "createCacheInVm",
-         new Object[] { props });
++    server.invoke(() -> PRClientServerTestBase.createCacheInVm( props ));
 +
 +    server.invoke(new CacheSerializableRunnable("create indexes") {
 +
 +      @Override
 +      public void run2() throws CacheException {
 +        cache = CacheFactory.getAnyInstance();
 +        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
 +        IndexManager.testHook = null;
 +        // Create common Portfolios and NewPortfolios
 +        Position.cnt = 0;
 +        for (int j = cnt; j < cntDest; j++) {
 +          Portfolio p = new Portfolio(j);
 +          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: portfolio "+j+ " : "+p);
 +          region.put(new Integer(j), p);
 +        }
 +        
 +        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +        try {
 +          Index index = qs.createIndex("posIndex", "pos.secId", "/"+repRegionName+" p, p.positions.values pos");
 +          assertEquals(12, index.getStatistics().getNumberOfKeys());
 +        } catch (Exception e) {
 +          fail("Index creation failed");
 +        }
 +      }
 +    });
 +    // Invoke the update asynchronously on the server and stop it in updateIndex
 +    // twice: first before the RegionEntry is updated and again after it has
 +    // been updated.
 +    AsyncInvocation putThread = server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
 +        IndexManager.testHook = new IndexManagerTestHook();
 +        Portfolio newPort = new Portfolio(cntDest+1);
 +        CacheFactory.getAnyInstance().getLogger().fine("Shobhit: New Portfolio"+newPort);
 +        repRegion.put(new Integer("1"), newPort);
 +        //above call must be hooked in BEFORE_UPDATE_OP call.
 +      }
 +    });
 +
 +    server.invoke(new CacheSerializableRunnable("query on server") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +        Position pos1 = null;
 +        while (!hooked){Wait.pause(100);}
 +        try {
 +          Object rs = qs.newQuery("<trace> select pos from /"+repRegionName+" p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
 +          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: "+rs);
 +          assertTrue(rs instanceof SelectResults);
 +          pos1 = (Position) ((SelectResults)rs).iterator().next();
 +          if (!pos1.secId.equals("APPL")) {
 +            IndexManager.testHook = null;
 +            fail("Query thread did not verify index results even when RE is under update");
 +          }
 +        } catch (Exception e) {
 +          e.printStackTrace();
 +          // Clear the hook before failing; Assert.fail() throws.
 +          IndexManager.testHook = null;
 +          Assert.fail("Query execution failed on server.", e);
 +        } finally {
 +          hooked = false;//Let client put go further.
 +        }
 +        while (!hooked) {
 +          Wait.pause(100);
 +        }
 +        try {
 +          Object rs = qs.newQuery("<trace> select pos from /"+repRegionName+" p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
 +          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: "+rs);
 +          assertTrue(rs instanceof SelectResults);
 +          if (((SelectResults)rs).size() > 0) {
 +            Position pos2 = (Position) ((SelectResults)rs).iterator().next();
 +            if (pos2.equals(pos1)) {
 +              fail("Query thread did not verify index results even when RE is under update and " +
 +              "RegionEntry value has been modified before releasing the lock");
 +            }
 +          }
 +        } catch (Exception e) {
 +          e.printStackTrace();
 +          fail("Query execution failed on server.");
 +        } finally {
 +          hooked = false;//Let client put go further.
 +          IndexManager.testHook = null;
 +        }
 +      }
 +    });
 +    ThreadUtils.join(putThread, 200);
 +  }
 +  
 +  public void testRangeIndexWithIndexAndQueryFromClauseMisMatch() {
 +    // Create caches
 +    Properties props = new Properties();
-     server.invoke(PRClientServerTestBase.class, "createCacheInVm",
-         new Object[] { props });
++    server.invoke(() -> PRClientServerTestBase.createCacheInVm( props ));
 +
 +    server.invoke(new CacheSerializableRunnable("create indexes") {
 +
 +      @Override
 +      public void run2() throws CacheException {
 +        cache = CacheFactory.getAnyInstance();
 +        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
 +        IndexManager.testHook = null;
 +        // Create common Portfolios and NewPortfolios
 +        Position.cnt = 0;
 +        for (int j = cnt; j < cntDest; j++) {
 +          region.put(new Integer(j), new Portfolio(j));
 +        }
 +        
 +        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +        try {
 +          Index index = qs.createIndex("posIndex", "pos.secId", "/"+repRegionName+" p, p.collectionHolderMap.values coll, p.positions.values pos");
 +          assertEquals(12, index.getStatistics().getNumberOfKeys());
 +        } catch (Exception e) {
 +          fail("Index creation failed");
 +        }
 +      }
 +    });
 +    // Invoke the update asynchronously on the server and stop it in updateIndex
 +    // twice: first before the RegionEntry is updated and again after it has
 +    // been updated.
 +    AsyncInvocation putThread = server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
 +        IndexManager.testHook = new IndexManagerTestHook();
 +        // This portfolio with same ID must have different positions.
 +        repRegion.put(new Integer("1"), new Portfolio(1));
 +        //above call must be hooked in BEFORE_UPDATE_OP call.
 +      }
 +    });
 +
 +    server.invoke(new CacheSerializableRunnable("query on server") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +        Position pos1 = null;
 +        while (!hooked){Wait.pause(100);}
 +        try {
 +          Object rs = qs.newQuery("<trace> select pos from /"+repRegionName+" p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
 +          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: "+rs);
 +          assertTrue(rs instanceof SelectResults);
 +          pos1 = (Position) ((SelectResults)rs).iterator().next();
 +          if (!pos1.secId.equals("APPL")) {
 +            IndexManager.testHook = null;
 +            fail("Query thread did not verify index results even when RE is under update");
 +          }
 +        } catch (Exception e) {
 +          e.printStackTrace();
 +          // Clear the hook before failing; Assert.fail() throws.
 +          IndexManager.testHook = null;
 +          Assert.fail("Query execution failed on server.", e);
 +        } finally {
 +          hooked = false;//Let client put go further.
 +        }
 +        while (!hooked) {
 +          Wait.pause(100);
 +        }
 +        try {
 +          Object rs = qs.newQuery("select pos from /"+repRegionName+" p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
 +          assertTrue(rs instanceof SelectResults);
 +          if (((SelectResults)rs).size() > 0) {
 +            Position pos2 = (Position) ((SelectResults)rs).iterator().next();
 +            if (pos2.equals(pos1)) {
 +              fail("Query thread did not verify index results even when RE is under update and " +
 +              "RegionEntry value has been modified before releasing the lock");
 +            }
 +          }
 +        } catch (Exception e) {
 +          e.printStackTrace();
 +          fail("Query execution failed on server.");
 +        } finally {
 +          hooked = false;//Let client put go further.
 +          IndexManager.testHook = null;
 +        }
 +      }
 +    });
 +    ThreadUtils.join(putThread, 200);
 +  }
 +
 +  public void testRangeIndexWithIndexAndQueryFromClauseMisMatch2() {
 +    // Create caches
 +    Properties props = new Properties();
-     server.invoke(PRClientServerTestBase.class, "createCacheInVm",
-         new Object[] { props });
++    server.invoke(() -> PRClientServerTestBase.createCacheInVm( props ));
 +
 +    server.invoke(new CacheSerializableRunnable("create indexes") {
 +
 +      @Override
 +      public void run2() throws CacheException {
 +        cache = CacheFactory.getAnyInstance();
 +        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
 +        IndexManager.testHook = null;
 +        // Create common Portfolios and NewPortfolios
 +        Position.cnt = 0;
 +        for (int j = cnt; j < cntDest; j++) {
 +          region.put(new Integer(j), new Portfolio(j));
 +        }
 +        
 +        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +        try {
 +          Index index = qs.createIndex("posIndex", "pos.secId", "/"+repRegionName+" p, p.positions.values pos");
 +          assertEquals(12, index.getStatistics().getNumberOfKeys());
 +        } catch (Exception e) {
 +          fail("Index creation failed");
 +        }
 +      }
 +    });
 +    // Invoke the update asynchronously on the server and stop it in updateIndex
 +    // twice: first before the RegionEntry is updated and again after it has
 +    // been updated.
 +    AsyncInvocation putThread = server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
 +        IndexManager.testHook = new IndexManagerTestHook();
 +        // This portfolio with same ID must have different positions.
 +        repRegion.put(new Integer("1"), new Portfolio(1));
 +        //above call must be hooked in BEFORE_UPDATE_OP call.
 +      }
 +    });
 +
 +    server.invoke(new CacheSerializableRunnable("query on server") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +        Position pos1 = null;
 +        while (!hooked){Wait.pause(100);}
 +        try {
 +          Object rs = qs.newQuery("<trace> select pos from /"+repRegionName+" p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
 +          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: "+rs);
 +          assertTrue(rs instanceof SelectResults);
 +          pos1 = (Position) ((SelectResults)rs).iterator().next();
 +          if (!pos1.secId.equals("APPL")) {
 +            IndexManager.testHook = null;
 +            fail("Query thread did not verify index results even when RE is under update");
 +          }
 +        } catch (Exception e) {
 +          e.printStackTrace();
 +          // Clear the hook before failing; Assert.fail() throws.
 +          IndexManager.testHook = null;
 +          Assert.fail("Query execution failed on server.", e);
 +        } finally {
 +          hooked = false;//Let client put go further.
 +        }
 +        while (!hooked) {
 +          Wait.pause(100);
 +        }
 +        try {
 +          Object rs = qs.newQuery("select pos from /"+repRegionName+" p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
 +          assertTrue(rs instanceof SelectResults);
 +          if (((SelectResults)rs).size() > 0) {
 +            Position pos2 = (Position) ((SelectResults)rs).iterator().next();
 +            if (pos2.equals(pos1)) {
 +              fail("Query thread did not verify index results even when RE is under update and " +
 +              "RegionEntry value has been modified before releasing the lock");
 +            }
 +          }
 +        } catch (Exception e) {
 +          e.printStackTrace();
 +          fail("Query execution failed on server.");
 +        } finally {
 +          IndexManager.testHook = null;
 +          hooked = false;//Let client put go further.
 +        }
 +      }
 +    });
 +    ThreadUtils.join(putThread, 200);
 +  }
 +  
 +  public static void createProxyRegions() {
 +    new QueryDataInconsistencyDUnitTest("temp").createProxyRegs();
 +  }
 +
 +  private void createProxyRegs() {
 +    ClientCache cache = (ClientCache) CacheFactory.getAnyInstance();
 +    cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(
 +        repRegionName);
 +
 +    /*cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(
 +        PartitionedRegionName1);*/
 +  }
 +
 +  public static void createNewPR() {
 +    new QueryDataInconsistencyDUnitTest("temp").createPR();
 +  }
 +  public void createPR() {
 +    PartitionResolver testKeyBasedResolver = new QueryAPITestPartitionResolver();
 +    cache = CacheFactory.getAnyInstance();
 +    cache
 +        .createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
 +        .setPartitionAttributes(
 +            new PartitionAttributesFactory()
 +                .setTotalNumBuckets(numOfBuckets)
 +                .setPartitionResolver(testKeyBasedResolver)
 +                .create())
 +        .create(PartitionedRegionName1);
 +  }
 +
 +  public static void createCacheClientWithoutRegion(String host, Integer port1) {
 +    new QueryDataInconsistencyDUnitTest("temp").createCacheClientWithoutReg(
 +        host, port1);
 +  }
 +
 +  private void createCacheClientWithoutReg(String host, Integer port1) {
 +    this.disconnectFromDS();
 +    ClientCache cache = new ClientCacheFactory().addPoolServer(host, port1)
 +        .create();
 +  }
 +
 +  /**
 +   * Puts portfolio objects into the given region (partitioned or local).
 +   *
 +   * @param regionName name of the region to populate
 +   * @param portfolio array of portfolio objects to put
 +   * @param from starting key (inclusive)
 +   * @param to ending key (exclusive)
 +   * @return a CacheSerializableRunnable that performs the puts
 +   */
 +  public CacheSerializableRunnable getCacheSerializableRunnableForPRPuts(
 +      final String regionName, final Object[] portfolio, final int from,
 +      final int to) {
 +    SerializableRunnable puts = new CacheSerializableRunnable("Region Puts") {
 +      @Override
 +      public void run2() throws CacheException {
 +        Cache cache = CacheFactory.getAnyInstance();
 +        Region region = cache.getRegion(regionName); // use the region that was passed in
 +        for (int j = from; j < to; j++) {
 +          region.put(new Integer(j), portfolio[j]);
 +        }
 +        LogWriterUtils.getLogWriter().info(
 +            "PRQueryDUnitHelper#getCacheSerializableRunnableForPRPuts: Inserted Portfolio data on Region "
 +                + regionName);
 +      }
 +    };
 +    return (CacheSerializableRunnable) puts;
 +  }
 +
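 +  // IndexManager invokes testHook.hook(spot) at instrumented points during an
 +  // index update. Spot 9 fires before the index is updated (after the region
 +  // entry is locked); spot 10 fires before the region is updated (after the
 +  // index remove call). At each spot this hook parks the updating thread until
 +  // the querying thread releases it by clearing the hooked flag.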
 +  public class IndexManagerTestHook implements com.gemstone.gemfire.cache.query.internal.index.IndexManager.TestHook{
 +    public void hook(final int spot) throws RuntimeException {
 +      switch (spot) {
 +      case 9: //Before Index update and after region entry lock.
 +        hooked  = true;
 +        LogWriterUtils.getLogWriter().info("QueryDataInconsistency.IndexManagerTestHook is hooked in Update Index Entry.");
 +        while(hooked) {
 +          Wait.pause(100);
 +        }
 +        assertEquals(false, hooked); // expected value comes first
 +        break;
 +      case 10: //Before Region update and after Index Remove call.
 +        hooked  = true;
 +        LogWriterUtils.getLogWriter().info("QueryDataInconsistency.IndexManagerTestHook is hooked in Remove Index Entry.");
 +        while(hooked) {
 +          Wait.pause(100);
 +        }
 +        assertEquals(false, hooked); // expected value comes first
 +        break;
 +      default:
 +        break;
 +      }
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java
index fcc96dc,0000000..1e3e973
mode 100644,000000..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryUsingFunctionContextDUnitTest.java
@@@ -1,1048 -1,0 +1,1041 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache.query.dunit;
 +
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Properties;
 +import java.util.Set;
 +
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheException;
 +import com.gemstone.gemfire.cache.CacheFactory;
 +import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 +import com.gemstone.gemfire.cache.PartitionResolver;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.RegionShortcut;
 +import com.gemstone.gemfire.cache.client.ClientCache;
 +import com.gemstone.gemfire.cache.client.ClientCacheFactory;
 +import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
 +import com.gemstone.gemfire.cache.client.ServerConnectivityException;
 +import com.gemstone.gemfire.cache.execute.Function;
 +import com.gemstone.gemfire.cache.execute.FunctionAdapter;
 +import com.gemstone.gemfire.cache.execute.FunctionContext;
 +import com.gemstone.gemfire.cache.execute.FunctionException;
 +import com.gemstone.gemfire.cache.execute.FunctionService;
 +import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
 +import com.gemstone.gemfire.cache.execute.ResultCollector;
 +import com.gemstone.gemfire.cache.query.Query;
 +import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
 +import com.gemstone.gemfire.cache.query.QueryService;
 +import com.gemstone.gemfire.cache.query.SelectResults;
 +import com.gemstone.gemfire.cache.query.data.Portfolio;
 +import com.gemstone.gemfire.cache.query.functional.StructSetOrResultsSet;
 +import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 +import com.gemstone.gemfire.cache.query.internal.IndexTrackingQueryObserver;
 +import com.gemstone.gemfire.cache.query.internal.QueryObserverHolder;
 +import com.gemstone.gemfire.cache.query.partitioned.PRQueryDUnitHelper;
 +import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
 +import com.gemstone.gemfire.cache30.CacheTestCase;
 +import com.gemstone.gemfire.internal.cache.LocalDataSet;
 +import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 +import com.gemstone.gemfire.internal.cache.execute.PRClientServerTestBase;
 +import com.gemstone.gemfire.test.dunit.Assert;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.IgnoredException;
 +import com.gemstone.gemfire.test.dunit.Invoke;
 +import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 +import com.gemstone.gemfire.test.dunit.NetworkUtils;
 +import com.gemstone.gemfire.test.dunit.SerializableRunnable;
 +import com.gemstone.gemfire.test.dunit.VM;
 +
 +/**
 + * Tests querying using a RegionFunctionContext, which provides a filter
 + * (routing keys) so that the query runs "locally" on only the subset of
 + * buckets to which the filter routes.
 + *
 + * @author shobhit
 + */
 +public class QueryUsingFunctionContextDUnitTest extends CacheTestCase {
 +
 +  private static final int cnt = 0;
 +
 +  private static final int cntDest = 100;
 +
-   static VM server1 = null;
++  VM server1 = null;
 +
 +  static VM server2 = null;
 +
 +  static VM server3 = null;
 +
 +  static VM client = null;
 +
 +  static Cache cache = null;
 +
 +  static Function function = null;
 +  // PR 2 is co-located with 1 and 3 is co-located with 2
 +  // PR 5 is co-located with 4
 +  static String PartitionedRegionName1 = "TestPartitionedRegion1"; //default name
 +  static String PartitionedRegionName2 = "TestPartitionedRegion2"; //default name
 +  static String PartitionedRegionName3 = "TestPartitionedRegion3"; //default name
 +  static String PartitionedRegionName4 = "TestPartitionedRegion4"; //default name
 +  static String PartitionedRegionName5 = "TestPartitionedRegion5"; //default name
 +
 +  static String repRegionName = "TestRepRegion"; //default name
 +  static String localRegionName = "TestLocalRegion"; //default name
 +
 +  static Integer serverPort1 = null;
 +
 +  static Integer serverPort2 = null;
 +
 +  static Integer serverPort3 = null;
 +
 +  public static int numOfBuckets = 20;
 +
 +  public static String[] queries = new String[] {
 +      "select * from /" + PartitionedRegionName1 + " where ID>=0",
 +      "Select * from /" + PartitionedRegionName1 + " r1, /" + PartitionedRegionName2 + " r2 where r1.ID = r2.ID",
 +      "Select * from /" + PartitionedRegionName1 + " r1, /" + PartitionedRegionName2 + " r2 where r1.ID = r2.ID AND r1.status = r2.status",
 +      "Select * from /" + PartitionedRegionName1 + " r1, /" + PartitionedRegionName2 + " r2, /" + PartitionedRegionName3 + " r3 where r1.ID = r2.ID and r2.ID = r3.ID",
 +      "Select * from /" + PartitionedRegionName1 + " r1, /" + PartitionedRegionName2 + " r2, /" + PartitionedRegionName3
 +          + " r3  , /" + repRegionName + " r4 where r1.ID = r2.ID and r2.ID = r3.ID and r3.ID = r4.ID",
 +     // "Select * from /" + PartitionedRegionName4 + " r4 , /" + PartitionedRegionName5 + " r5 where r4.ID = r5.ID"
 +      };
 +
 +  public static String[] nonColocatedQueries = new String[]{
 +    "Select * from /" + PartitionedRegionName1 + " r1, /" + PartitionedRegionName4 + " r4 where r1.ID = r4.ID",
 +    "Select * from /" + PartitionedRegionName1 + " r1, /" + PartitionedRegionName4 + " r4 , /" + PartitionedRegionName5 + " r5 where r1.ID = r4.ID and r4.ID = r5.ID"
 +  };
 +  
 +  public static String[] queriesForRR = new String[]{"<trace> select * from /"+repRegionName+" where ID>=0"};
 +
 +  private static PRQueryDUnitHelper PRQHelp = new PRQueryDUnitHelper("");
 +  /**
 +   * @param name the test case name
 +   */
 +  public QueryUsingFunctionContextDUnitTest(String name) {
 +    super(name);
 +
 +  }
 +
 +  @Override
 +  protected final void preTearDownCacheTestCase() throws Exception {
 +    Invoke.invokeInEveryVM(CacheTestCase.class, "disconnectFromDS");
 +  }
 +  
 +  @Override
 +  protected final void postTearDownCacheTestCase() throws Exception {
 +    Invoke.invokeInEveryVM(QueryObserverHolder.class, "reset");
 +    cache = null;
 +    Invoke.invokeInEveryVM(new SerializableRunnable() { public void run() { cache = null; } });
 +  }
 +
 +  @Override
 +  public void setUp() throws Exception {
 +    super.setUp();
 +    Host host = Host.getHost(0);
 +    server1 = host.getVM(0);
 +    server2 = host.getVM(1);
 +    server3 = host.getVM(2);
 +    client = host.getVM(3);
 +    createServersWithRegions();
 +    fillValuesInRegions();
 +    registerFunctionOnServers();
 +  }
 +
 +
 +  /**
 +   * Test on Replicated Region.
 +   */
 +  public void testQueriesWithFilterKeysOnReplicatedRegion() {
 +    IgnoredException.addIgnoredException("IllegalArgumentException");
 +
 +    Object[][] r = new Object[queriesForRR.length][2];
 +
 +    client.invoke(new CacheSerializableRunnable("Run function on RR") {
 +      @Override
 +      public void run2() throws CacheException {
 +
 +        ResultCollector rcollector = null;
 +
 +        for (int i = 0; i < queriesForRR.length; i++) {
 +
 +          try {
 +            function = new TestQueryFunction("queryFunctionOnRR");
 +
 +            rcollector = FunctionService
 +                .onRegion(
 +                    CacheFactory.getAnyInstance().getRegion(repRegionName))
 +                .withArgs(queriesForRR[i]).execute(function);
 +
 +            // Should not reach here; an exception is expected from the function call above.
 +            fail("Function call did not fail for query with function context");
 +          } catch (FunctionException ex) {
 +            if (!(ex.getCause() instanceof IllegalArgumentException)) {
 +              fail("Should have received an IllegalArgumentException");
 +            }
 +          }
 +        }//For loop ends here.
 +      }
 +    });
 +  }
 +
 +  /**
 +   * Test on PR on one server only using filter.
 +   */
 +  public void testQueriesWithFilterKeysOnPRLocal() {
 +    
 +    client.invoke(new CacheSerializableRunnable("Test query on client and server") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        Set filter =  new HashSet();
 +        filter.add(0);
 +
 +        for (int i=0; i< queries.length; i++) {
 +          Object[][] r = new Object[1][2];
 +
 +          TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1");
 +          function = new TestQueryFunction("queryFunction-1");
 +          QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest("test");
 +          ArrayList queryResults2 = test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, queries[i]);  
 +          if (queryResults2 == null)
 +            fail(queries[i] + " result is null from client function");
 +          
 +          ArrayList queryResults1 = test.runLDSQueryOnClientUsingFunc(func, filter, queries[i]);
 +          if (queryResults1 == null)
 +            fail(queries[i] + " result is null from LDS function");
 +          
 +          r[0][0] = queryResults1;
 +          r[0][1] = queryResults2;
 +          StructSetOrResultsSet ssORrs = new  StructSetOrResultsSet();
 +          ssORrs.CompareQueryResultsAsListWithoutAndWithIndexes(r, 1,false, new String[]{queries[i]});
 +        }   
 +      }
 +    });
 +    
 +  }
 +
 +  public void testInvalidQueries() {
 +    
 +    IgnoredException.addIgnoredException("Syntax error");
 +    client.invoke(new CacheSerializableRunnable("Test query on client and server") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        Set filter =  new HashSet();
 +        filter.add(0);
 +        String query = "select * from / " + repRegionName + " where ID>=0"; // the space after '/' makes this query invalid
 +        TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1");
 +        function = new TestQueryFunction("queryFunction-1");
 +        QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest("test");
 +        try {
 +          test.runQueryOnClientUsingFunc(function, repRegionName, filter, query);  
 +          fail("Query execution should have failed.");
 +        } catch (FunctionException ex) {
 +          assertTrue("The exception message should mention QueryInvalidException. ", ex.getLocalizedMessage().contains("QueryInvalidException"));
 +        }
 +        
 +        query = "select * from / " + PartitionedRegionName1 + " where ID>=0"; // again intentionally invalid
 +        func = new TestServerQueryFunction("LDS Server function-1");
 +        function = new TestQueryFunction("queryFunction-1");
 +        test = new QueryUsingFunctionContextDUnitTest("test");
 +        try {
 +          test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, query);  
 +          fail("Query execution should have failed.");
 +        } catch (FunctionException ex) {
 +          assertTrue("The exception message should mention QueryInvalidException. ", ex.getLocalizedMessage().contains("QueryInvalidException"));
 +        }
 +      }
 +    });
 +    
 +  }
 +  
 +  /**
 +   * Test on PR with filter keys that route both locally and to remote members.
 +   */
 +  public void testQueriesWithFilterKeysOnPRLocalAndRemote() {
 +    
 +    client.invoke(new CacheSerializableRunnable("Test query on client and server") {
 +      
 +      @Override
 +      public void run2() throws CacheException {
 +        Set filter = getFilter(0, 1);
 +
 +        TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-1");
 +        function = new TestQueryFunction("queryFunction-2");
 +
 +        for (int i=0; i< queries.length; i++) {
 +          Object[][] r = new Object[1][2];
 +
 +          QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest("test");
 +          ArrayList queryResults2 = test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, queries[i]);
 +          if (queryResults2 == null)
 +            fail(queries[i] + " result is null from client function");
 +          ArrayList queryResults1 = test.runLDSQueryOnClientUsingFunc(func, filter, queries[i]);
 +          if (queryResults1 == null)
 +            fail(queries[i] + " result is null from LDS function");
 +          
 +
 +          r[0][0] = queryResults1;
 +          r[0][1] = queryResults2;
 +          StructSetOrResultsSet ssORrs = new  StructSetOrResultsSet();
 +          ssORrs.CompareQueryResultsAsListWithoutAndWithIndexes(r, 1,false, new String[]{queries[i]});
 +        }    
 +      }
 +    });
 +  }
 +
 +  /**
 +   * Test on PR with local and remote filter keys while buckets are destroyed
 +   * during the first query.
 +   */
 + public void testQueriesWithFilterKeysOnPRLocalAndRemoteWithBucketDestroy() {
 +   
 +   // Set Query Observer in cache on server1
 +   server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") {
 +     @Override
 +     public void run2() throws CacheException {
 +
 +       class MyQueryObserver extends IndexTrackingQueryObserver {
 +
 +         @Override
 +         public void startQuery(Query query) {
 +           //Destroy only for first query.
 +           if (query.getQueryString().contains("ID>=0")){
 +             Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1);
 +             Region KeyRegion = null;
 +             for (int i=3; i<7; i++) {
 +               KeyRegion = ((PartitionedRegion)pr).getBucketRegion(i/*key*/);
 +               if (KeyRegion != null) {
 +                 KeyRegion.destroyRegion();
 +               }
 +             }
 +           }
 +         }
 +       };
 +
 +       QueryObserverHolder.setInstance(new MyQueryObserver());
 +     }
 +   });
 +   
 +   client.invoke(new CacheSerializableRunnable("Test query on client and server") {
 +     
 +     @Override
 +     public void run2() throws CacheException {
 +       Set filter = getFilter(0, 2);
 +       
 +       TestServerQueryFunction func = new TestServerQueryFunction("LDS Server function-2");
 +       function = new TestQueryFunction("queryFunction");
 +
 +       for (int i=0; i< queries.length; i++) {
 +         
 +         QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest("test");
 +         ArrayList queryResults2 = test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, queries[i]);  
 +         
 +         // The partitioned region has 20 buckets holding 100 values, and key i
 +         // goes to bucket i%20, so each bucket holds 5 keys; the 3 filtered
 +         // buckets should therefore yield 3*5 = 15 results.
 +         if (i==0 && queryResults2.size() != 3*5) {
 +           fail("Result size should have been 15 but was "+queryResults2.size());
 +         }
 +       }    
 +     }
 +   });
 +   // Reset Query Observer in cache on server1
 +   server1.invoke(new CacheSerializableRunnable("Reset Query Observer on server1") {
 +     @Override
 +     public void run2() throws CacheException {
 +       QueryObserverHolder.reset();
 +     }
 +   });
 +
 + }
 +
 +  /**
 +   * Test on PR with filter keys when the targeted buckets are destroyed as the
 +   * query starts.
 +   */
 +  public void testQueriesWithFilterKeysOnPRWithBucketDestroy() {
 +    IgnoredException.addIgnoredException("QueryInvocationTargetException");
 +    Object[][] r = new Object[queries.length][2];
 +    Set filter =  new HashSet();
 +
 +    // Set a Query Observer in the cache on server1
 +    server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") {
 +      @Override
 +      public void run2() throws CacheException {
 +
 +        class MyQueryObserver extends IndexTrackingQueryObserver {
 +
 +          @Override
 +          public void startQuery(Query query) {
 +            Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1);
 +            Region KeyRegion = null;
 +            for (int i=0; i<7; i++) {
 +              KeyRegion = ((PartitionedRegion)pr).getBucketRegion(i/*key*/);
 +              if (KeyRegion != null) {
 +                KeyRegion.destroyRegion();
 +              }
 +            }
 +          }
 +        };
 +
 +        QueryObserverHolder.setInstance(new MyQueryObserver());
 +      }
 +    });
 +
 +    client.invoke(new CacheSerializableRunnable("Run function on PR") {
 +      @Override
 +      public void run2() throws CacheException {
 +        Set filter =  new HashSet();
 +        ResultCollector rcollector = null;
 +        filter.addAll(getFilter(0, 19));
 +
 +        for (int i = 0; i < queries.length; i++) {
 +
 +          try {
 +            function = new TestQueryFunction("queryFunctionBucketDestroy");
 +
 +            rcollector = FunctionService
 +                .onRegion(
 +                    CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1))
 +                .withArgs(queries[i])
 +                .withFilter(filter)
 +                .execute(function);
 +
 +            // Should not reach here; an exception is expected from the function call above.
 +            fail("Function call did not fail for query with function context");
 +          } catch (FunctionException ex) {
 +            //ex.printStackTrace();
 +            if (!(ex.getCause() instanceof QueryInvocationTargetException)) {
 +              fail("Should have received a QueryInvocationTargetException but received "+ex.getMessage());
 +            }
 +          }
 +        }//For loop ends here.
 +      }
 +    });
 +
 +    // Reset the Query Observer on server1
 +    server1.invoke(new CacheSerializableRunnable("Reset Query Observer on server1") {
 +      @Override
 +      public void run2() throws CacheException {
 +        QueryObserverHolder.reset();
 +      }
 +    });
 +
 +  }
 +
 +  /**
 +   * Test on PR with filter keys while targeted buckets are destroyed mid-query
 +   * (as happens during rebalancing).
 +   */
 + public void testQueriesWithFilterKeysOnPRWithRebalancing() {
 +   IgnoredException.addIgnoredException("QueryInvocationTargetException");
 +   IgnoredException.addIgnoredException("java.net.SocketException");
 +   IgnoredException.addIgnoredException("ServerConnectivityException");
 +   IgnoredException.addIgnoredException("FunctionException");
 +   IgnoredException.addIgnoredException("IOException");
 +
 +   // Set a Query Observer in the cache on server1
 +   server1.invoke(new CacheSerializableRunnable("Set QueryObserver in cache on server1") {
 +     @Override
 +     public void run2() throws CacheException {
 +
 +       class MyQueryObserver extends IndexTrackingQueryObserver {
 +
 +         @Override
 +         public void startQuery(Query query) {
 +           Region pr = CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1);
 +           Region KeyRegion = null;
 +           for (int i=6; i<9; i++) {
 +             KeyRegion = ((PartitionedRegion)pr).getBucketRegion(i/*key*/);
 +             if (KeyRegion != null) {
 +               KeyRegion.destroyRegion();
 +             }
 +           }
 +         }
 +       };
 +
 +       QueryObserverHolder.setInstance(new MyQueryObserver());
 +     }
 +   });
 +
 +   client.invoke(new CacheSerializableRunnable("Run function on PR") {
 +     @Override
 +     public void run2() throws CacheException {
 +       Set filter =  new HashSet();
 +       ResultCollector rcollector = null;
 +       filter.addAll(getFilter(6, 9));
 +
 +       for (int i = 0; i < queries.length; i++) {
 +
 +         try {
 +           function = new TestQueryFunction("queryFunction");
 +
 +           rcollector = FunctionService
 +               .onRegion(
 +                   CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1))
 +               .withArgs(queries[i])
 +               .withFilter(filter)
 +               .execute(function);
 +
 +           // Should not reach here; an exception is expected from the function call above.
 +           fail("Function call did not fail for query with function context");
 +         } catch (FunctionException ex) {
 +           if (!((ex.getCause() instanceof QueryInvocationTargetException) || (ex.getCause() instanceof ServerConnectivityException))) {
 +             if (ex.getCause() instanceof FunctionException) {
 +               FunctionException fe = (FunctionException)ex.getCause();
 +               if (!fe.getMessage().startsWith("IOException")) {
 +                 fail("Should have received a QueryInvocationTargetException but received "+ex.getMessage());
 +               }
 +             }
 +             else {
 +               fail("Should have received a QueryInvocationTargetException but received "+ex.getMessage());
 +             }
 +           }
 +         }
 +       }//For loop ends here.
 +     }
 +   });
 +
 +   // Reset the Query Observer on server1
 +   server1.invoke(new CacheSerializableRunnable("Reset Query Observer on server1") {
 +     @Override
 +     public void run2() throws CacheException {
 +       QueryObserverHolder.reset();
 +     }
 +   });
 +
 + }
 +
 + 
 + public void testNonColocatedRegionQueries() {
 +   IgnoredException.addIgnoredException("UnsupportedOperationException");
 +   client.invoke(new CacheSerializableRunnable("Test query on non-colocated regions on server") {
 +     @Override
 +     public void run2() throws CacheException {
 +       Set filter =  new HashSet();
 +       filter.add(0);
 +
 +       for (int i=0; i< nonColocatedQueries.length; i++) {
 +         function = new TestQueryFunction("queryFunction-1");
 +         QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest("test");
 +         try {
 +          ArrayList queryResults2 = test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, nonColocatedQueries[i]);
 +          fail("Function call did not fail for query with function context");
 +         } catch (FunctionException e) {
 +          if (!(e.getCause() instanceof UnsupportedOperationException)) {
 +            Assert.fail("Should have received an UnsupportedOperationException but received", e);
 +          }
 +         }  
 +        }   
 +     }
 +   });
 +   
 + }
 + 
 + public void testJoinQueryPRWithMultipleIndexes(){
 +   
 +   server1.invoke(new CacheSerializableRunnable("Test query with indexes") {
 +     
 +     @Override
 +     public void run2() throws CacheException {
 +       Set filter = getFilter(0, 1);
 +       function = new TestQueryFunction("queryFunction-2");
 +       Object[][] r = new Object[2][2];
 +       QueryUsingFunctionContextDUnitTest test = new QueryUsingFunctionContextDUnitTest("test");
 +       int j = 0;
 +       for (int i=3; i< 5; i++) {
 +         ArrayList queryResults2 = test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, queries[i]);
 +         if (queryResults2 == null)
 +           fail(queries[i] + " result is null from client function");
 +         r[j++][1] = queryResults2;
 +       }
 +       createIndex();
 +       j = 0;
 +       for (int i=3; i< 5; i++) {
 +         ArrayList queryResults1 = test.runQueryOnClientUsingFunc(function, PartitionedRegionName1, filter, queries[i]);
 +         if (queryResults1 == null)
 +           fail(queries[i] + " result is null from client function");
 +         r[j++][0] = queryResults1;
 +       }
 +       
 +       StructSetOrResultsSet ssORrs = new  StructSetOrResultsSet();
 +       ssORrs.CompareQueryResultsAsListWithoutAndWithIndexes(r, 2,false, queries);
 +
 +     }
 +   });
 + }
 + 
 +
 +  // Helper classes and functions
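 +
 +  // TestQueryFunction executes the query string passed as its argument against
 +  // the RegionFunctionContext it runs in. isHA() returns false, so a failed
 +  // execution surfaces to the caller as a FunctionException instead of being
 +  // retried on another member, which is what several tests above assert on.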
 +  public class TestQueryFunction extends FunctionAdapter {
 +
 +    @Override
 +    public boolean hasResult() {
 +      return true;
 +    }
 +
 +    @Override
 +    public boolean isHA() {
 +      return false;
 +    }
 +
 +    private final String id;
 +
 +
 +    public TestQueryFunction(String id) {
 +      super();
 +      this.id = id;
 +    }
 +
 +    @Override
 +    public void execute(FunctionContext context) {
 +      Cache cache = CacheFactory.getAnyInstance();
 +      QueryService queryService = cache.getQueryService();
 +      ArrayList allQueryResults = new ArrayList();
 +      String qstr = (String) context.getArguments();
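 +      // Executing the query with the RegionFunctionContext restricts it to the
 +      // local data of the buckets the function's filter routed to this member.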
 +      try {
 +        Query query = queryService.newQuery(qstr);
 +        context.getResultSender().lastResult((ArrayList) ((SelectResults) query
 +                .execute((RegionFunctionContext) context)).asList());
 +      } catch (Exception e) {
 +        throw new FunctionException(e);
 +      }
 +    }
 +
 +    @Override
 +    public String getId() {
 +      return this.id;
 +    }
 +  }
 +  
 +  public class TestServerQueryFunction extends FunctionAdapter {
 +
 +    @Override
 +    public boolean hasResult() {
 +      return true;
 +    }
 +
 +    @Override
 +    public boolean isHA() {
 +      return false;
 +    }
 +
 +    private final String id;
 +
 +
 +    public TestServerQueryFunction(String id) {
 +      super();
 +      this.id = id;
 +    }
 +
 +    @Override
 +    public void execute(FunctionContext context) {
 +      Cache cache = CacheFactory.getAnyInstance();
 +      QueryService queryService = cache.getQueryService();
 +      ArrayList allQueryResults = new ArrayList();
 +      Object[] args = (Object[])context.getArguments();
 +      Set buckets = getBucketsForFilter((Set)args[1]);
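 +      // Wrap the partitioned region in a LocalDataSet limited to the computed
 +      // bucket ids; querying it directly mirrors what the
 +      // RegionFunctionContext-based execution in TestQueryFunction does.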
 +      Region localDataSet = new LocalDataSet((PartitionedRegion)CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1), buckets);
 +
 +      try {
 +        Query query = queryService.newQuery((String)args[0]);
 +        context.getResultSender().lastResult((ArrayList) ((SelectResults) ((LocalDataSet) localDataSet).executeQuery(
 +            (DefaultQuery) query, null, buckets)).asList());
 +      } catch (Exception e) {
 +        throw new FunctionException(e);
 +      }
 +    }
 +
 +    @Override
 +    public String getId() {
 +      return this.id;
 +    }
 +    
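 +    // Maps each routing key to its bucket id via key % numOfBuckets, matching
 +    // the key-to-bucket routing these tests rely on.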
 +    private Set getBucketsForFilter(Set filter) {
 +      Set bucketids = new HashSet();
 +      for (Object key: filter) {
 +        int intKey = ((Integer)key).intValue();
 +        bucketids.add(intKey%numOfBuckets);
 +      }
 +      return bucketids;
 +    }
 +  }
 +
 +  public void fillValuesInRegions() {
 +    // Create common Portfolios and NewPortfolios
 +    final Portfolio[] portfolio = PRQHelp.createPortfoliosAndPositions(cntDest);
 +
 +    //Fill local region
 +    server1.invoke(getCacheSerializableRunnableForPRPuts(localRegionName,
 +        portfolio, cnt, cntDest));
 +
 +    //Fill replicated region
 +    server1.invoke(getCacheSerializableRunnableForPRPuts(repRegionName,
 +        portfolio, cnt, cntDest));
 +
 +    //Fill Partition Region
 +    server1.invoke(getCacheSerializableRunnableForPRPuts(PartitionedRegionName1,
 +        portfolio, cnt, cntDest));
 +    server1.invoke(getCacheSerializableRunnableForPRPuts(PartitionedRegionName2,
 +        portfolio, cnt, cntDest));
 +    server1.invoke(getCacheSerializableRunnableForPRPuts(PartitionedRegionName3,
 +        portfolio, cnt, cntDest));
 +    server1.invoke(getCacheSerializableRunnableForPRPuts(PartitionedRegionName4,
 +        portfolio, cnt, cntDest));
 +    server1.invoke(getCacheSerializableRunnableForPRPuts(PartitionedRegionName5,
 +        portfolio, cnt, cntDest));
 +  }
 +
 +  private void registerFunctionOnServers() {
 +    function = new TestQueryFunction("queryFunction");
 +    server1.invoke(PRClientServerTestBase.class,
 +        "registerFunction", new Object []{function});
 +
 +    server2.invoke(PRClientServerTestBase.class,
 +        "registerFunction", new Object []{function});
 +
 +    server3.invoke(PRClientServerTestBase.class,
 +        "registerFunction", new Object []{function});
 +  }
 +
 +  private void createServersWithRegions() {
 +    //Create caches
 +    Properties props = new Properties();
-     server1.invoke(PRClientServerTestBase.class, "createCacheInVm",
-         new Object[] { props });
-     server2.invoke(PRClientServerTestBase.class, "createCacheInVm",
-         new Object[] { props });
-     server3.invoke(PRClientServerTestBase.class, "createCacheInVm",
-         new Object[] { props });
++    server1.invoke(() -> PRClientServerTestBase.createCacheInVm( props ));
++    server2.invoke(() -> PRClientServerTestBase.createCacheInVm( props ));
++    server3.invoke(() -> PRClientServerTestBase.createCacheInVm( props ));
 +
 +    //Create Cache Servers
-     Integer port1 = (Integer)server1.invoke(PRClientServerTestBase.class,
-     "createCacheServer");
-     Integer port2 = (Integer)server2.invoke(PRClientServerTestBase.class,
-         "createCacheServer");
-     Integer port3 = (Integer)server3.invoke(PRClientServerTestBase.class,
-         "createCacheServer");
++    Integer port1 = (Integer)server1.invoke(() -> PRClientServerTestBase.createCacheServer());
++    Integer port2 = (Integer)server2.invoke(() -> PRClientServerTestBase.createCacheServer());
++    Integer port3 = (Integer)server3.invoke(() -> PRClientServerTestBase.createCacheServer());
 +    serverPort1 = port1;
 +    serverPort2 = port2;
 +    serverPort3 = port3;
 +
 +    //Create client cache without regions
-     client.invoke(QueryUsingFunctionContextDUnitTest.class, "createCacheClientWithoutRegion",
-         new Object[] { NetworkUtils.getServerHostName(server1.getHost()), port1, port2,
-             port3 });
++    client.invoke(() -> QueryUsingFunctionContextDUnitTest.createCacheClientWithoutRegion( NetworkUtils.getServerHostName(server1.getHost()), port1, port2,
++            port3 ));
 +
 +    //Create proxy regions on client.
-     client.invoke(QueryUsingFunctionContextDUnitTest.class, "createProxyRegions");
++    client.invoke(() -> QueryUsingFunctionContextDUnitTest.createProxyRegions());
 +
 +    //Create local Region on servers
-     server1.invoke(QueryUsingFunctionContextDUnitTest.class, "createLocalRegion");
++    server1.invoke(() -> QueryUsingFunctionContextDUnitTest.createLocalRegion());
 +
 +    //Create ReplicatedRegion on servers
-     server1.invoke(QueryUsingFunctionContextDUnitTest.class, "createReplicatedRegion");
-     server2.invoke(QueryUsingFunctionContextDUnitTest.class, "createReplicatedRegion");
-     server3.invoke(QueryUsingFunctionContextDUnitTest.class, "createReplicatedRegion");
++    server1.invoke(() -> QueryUsingFunctionContextDUnitTest.createReplicatedRegion());
++    server2.invoke(() -> QueryUsingFunctionContextDUnitTest.createReplicatedRegion());
++    server3.invoke(() -> QueryUsingFunctionContextDUnitTest.createReplicatedRegion());
 +
 +    //Create colocated and non-colocated PartitionedRegions on servers.
-     server1.invoke(QueryUsingFunctionContextDUnitTest.class, "createColocatedPR");
-     server2.invoke(QueryUsingFunctionContextDUnitTest.class, "createColocatedPR");
-     server3.invoke(QueryUsingFunctionContextDUnitTest.class, "createColocatedPR");
++    server1.invoke(() -> QueryUsingFunctionContextDUnitTest.createColocatedPR());
++    server2.invoke(() -> QueryUsingFunctionContextDUnitTest.createColocatedPR());
++    server3.invoke(() -> QueryUsingFunctionContextDUnitTest.createColocatedPR());
 +
 +  }
 +
 +  public static void createProxyRegions() {
 +    new QueryUsingFunctionContextDUnitTest("temp").createProxyRegs();
 +  }
 +
 +  private void createProxyRegs() {
 +    ClientCache cache = (ClientCache)CacheFactory.getAnyInstance();
 +    cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(repRegionName);
 +
 +    cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(localRegionName);
 +
 +    cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
 +      .create(PartitionedRegionName1);
 +    cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
 +      .create(PartitionedRegionName2);
 +    cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
 +      .create(PartitionedRegionName3);
 +    cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
 +      .create(PartitionedRegionName4);
 +    cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
 +      .create(PartitionedRegionName5);
 +  }
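 +
 +  // A minimal equivalent sketch: the repeated factory calls above could also be
 +  // written as a loop over the region-name constants, e.g.:
 +  //   for (String name : new String[] { repRegionName, localRegionName,
 +  //       PartitionedRegionName1, PartitionedRegionName2, PartitionedRegionName3,
 +  //       PartitionedRegionName4, PartitionedRegionName5 }) {
 +  //     cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(name);
 +  //   }
 +  // PROXY regions hold no local state; every operation goes to the server pool.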
 +
 +  public static void createLocalRegion() {
 +    new QueryUsingFunctionContextDUnitTest("temp").createLocalReg();
 +  }
 +  public void createLocalReg() {
 +    cache = CacheFactory.getAnyInstance();
 +    cache.createRegionFactory(RegionShortcut.LOCAL).create(localRegionName);
 +  }
 +
 +  public static void createReplicatedRegion() {
 +    new QueryUsingFunctionContextDUnitTest("temp").createRR();
 +  }
 +  public void createRR() {
 +    cache = CacheFactory.getAnyInstance();
 +    cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
 +  }
 +
 +  public static void createColocatedPR() {
 +    new QueryUsingFunctionContextDUnitTest("temp").createColoPR();
 +  }
 +  public void createColoPR() {
 +    PartitionResolver testKeyBasedResolver = new QueryAPITestPartitionResolver();
 +    cache = CacheFactory.getAnyInstance();
 +    cache
 +        .createRegionFactory(RegionShortcut.PARTITION)
 +        .setPartitionAttributes(
 +            new PartitionAttributesFactory()
 +                .setTotalNumBuckets(numOfBuckets)
 +                .setPartitionResolver(testKeyBasedResolver)
 +                .create())
 +        .create(PartitionedRegionName1);
 +
 +    cache
 +        .createRegionFactory(RegionShortcut.PARTITION)
 +        .setPartitionAttributes(
 +            new PartitionAttributesFactory()
 +                .setTotalNumBuckets(numOfBuckets)
 +                .setPartitionResolver(testKeyBasedResolver)
 +                .setColocatedWith(PartitionedRegionName1)
 +                .create())
 +        .create(PartitionedRegionName2);
 +
 +    cache
 +        .createRegionFactory(RegionShortcut.PARTITION)
 +        .setPartitionAttributes(
 +            new PartitionAttributesFactory()
 +                .setTotalNumBuckets(numOfBuckets)
 +                .setPartitionResolver(testKeyBasedResolver)
 +                .setColocatedWith(PartitionedRegionName2)
 +                .create())
 +        .create(PartitionedRegionName3);
 +
 +    cache
 +        .createRegionFactory(RegionShortcut.PARTITION)
 +        .setPartitionAttributes(
 +            new PartitionAttributesFactory()
 +                .setTotalNumBuckets(numOfBuckets)
 +                .setPartitionResolver(testKeyBasedResolver)
 +                .create())
 +        .create(PartitionedRegionName4); // not colocated with the others
 +
 +    cache
 +        .createRegionFactory(RegionShortcut.PARTITION)
 +        .setPartitionAttributes(
 +            new PartitionAttributesFactory()
 +                .setTotalNumBuckets(numOfBuckets)
 +                .setPartitionResolver(testKeyBasedResolver)
 +                .create())
 +        .create(PartitionedRegionName5); // colocated with PartitionedRegionName4
 +  }
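 +
 +  // Note on the colocation chain above: colocated regions must share the same
 +  // total bucket count and partition resolver, which keeps entries with equal
 +  // routing keys on the same member. That is what makes an equi-join such as
 +  //   "select * from /" + PartitionedRegionName1 + " r1, /"
 +  //       + PartitionedRegionName2 + " r2 where r1.ID = r2.ID"
 +  // executable through a function on the colocated local data set.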
 +
 +  public static void createCacheClientWithoutRegion(String host, Integer port1, Integer port2, Integer port3) {
 +    new QueryUsingFunctionContextDUnitTest("temp").createCacheClientWithoutReg(host, port1, port2, port3);
 +  }
 +  private void createCacheClientWithoutReg(String host, Integer port1, Integer port2, Integer port3) {
 +    this.disconnectFromDS();
 +    new ClientCacheFactory()
 +      .addPoolServer(host, port1)
 +      .addPoolServer(host, port2)
 +      .addPoolServer(host, port3)
 +      .create();
 +  }
 +
 +  /**
 +   * Runs a query on the server using LocalDataSet.executeQuery() so the
 +   * results can be compared with those received via client function execution.
 +   */
 +  public static ArrayList runQueryOnServerLocalDataSet(String query, Set filter) {
 +    return new QueryUsingFunctionContextDUnitTest("temp").runQueryOnServerLDS(query, filter);
 +  }
 +
 +  protected ArrayList runQueryOnServerLDS(String queryStr, Set filter) {
 +
 +    Set buckets = getBucketsForFilter(filter);
 +    Region localDataSet = new LocalDataSet(
 +        (PartitionedRegion) CacheFactory.getAnyInstance().getRegion(PartitionedRegionName1),
 +        buckets);
 +
 +    QueryService qservice = CacheFactory.getAnyInstance().getQueryService();
 +
 +    Query query = qservice.newQuery(queryStr);
 +    SelectResults results;
 +    try {
 +      results = (SelectResults) ((LocalDataSet) localDataSet).executeQuery(
 +          (DefaultQuery) query, null, buckets);
 +
 +      return (ArrayList)results.asList();
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail("Query execution on LocalDataSet failed: " + e);
 +    }
 +    return null;
 +  }
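 +
 +  /**
 +   * A minimal sketch, not part of the original test (method name and usage are
 +   * illustrative), of how the two query paths can be compared: the function
 +   * path runs from the client VM, the LocalDataSet path in a server VM.
 +   */
 +  private void compareFunctionAndLocalDataSetResults(final Set filter, final String queryStr) {
 +    final Function func = function;
 +    ArrayList functionResults = (ArrayList) client.invoke(() ->
 +        runQueryOnClientUsingFunction(func, PartitionedRegionName1, filter, queryStr));
 +    ArrayList localDataSetResults = (ArrayList) server1.invoke(() ->
 +        runQueryOnServerLocalDataSet(queryStr, filter));
 +    // Sizes must match; a deeper check could compare the elements themselves.
 +    assertEquals(localDataSetResults.size(), functionResults.size());
 +  }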
 +
 +  /**
 +   * Run query on server to compare the results received from client function execution.
 +   */
 +  public static ArrayList runQueryOnServerRegion(String query) {
 +    return new QueryUsingFunctionContextDUnitTest("temp").runQueryOnServerReg(query);
 +  }
 +  protected ArrayList runQueryOnServerReg(String queryStr) {
 +
 +    QueryService qservice = CacheFactory.getAnyInstance().getQueryService();
 +
 +    Query query = qservice.newQuery(queryStr);
 +    SelectResults results = null;
 +    try {
 +      results = (SelectResults) query.execute();
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +      fail("Query execution on server region failed: " + e);
 +    }
 +    return results != null ? (ArrayList) results.asList() : null;
 +  }
 +
 +  /**
 +   * Runs a query by executing a function from the client on a server-side
 +   * region, optionally restricting it with a key filter.
 +   * @param function the query function to execute
 +   * @param regionName name of the region to run the function on
 +   * @param filter set of keys used to route the function, or null for none
 +   * @param query the OQL query string passed to the function
 +   * @return ArrayList of results
 +   */
 +  public static ArrayList runQueryOnClientUsingFunction(Function function, String regionName, Set filter, String query) {
 +    return new QueryUsingFunctionContextDUnitTest("temp").runQueryOnClientUsingFunc(function, regionName, filter, query);
 +  }
 +
 +  private ArrayList runQueryOnClientUsingFunc(Function func, String regionName, Set filter, String query) {
 +    ResultCollector rcollector = null;
 +
 +    // The filter cannot be null when withFilter() is called.
 +    if (filter != null) {
 +      rcollector = FunctionService.onRegion(CacheFactory.getAnyInstance().getRegion(regionName))
 +        .withArgs(query)
 +        .withFilter(filter)
 +        .execute(func);
 +    } else {
 +      rcollector = FunctionService.onRegion(CacheFactory.getAnyInstance().getRegion(regionName))
 +        .withArgs(query)
 +        .execute(func);
 +    }
 +    Object result = rcollector.getResult();
 +    assertTrue(result instanceof ArrayList);
 +
 +    //Results from multiple nodes.
 +    ArrayList resultList = (ArrayList)result;
 +    resultList.trimToSize();
 +    List queryResults = null;
 +
 +    if (resultList.size() != 0 && resultList.get(0) instanceof ArrayList) {
 +      queryResults = new ArrayList();
 +      for (Object obj: resultList) {
 +        if (obj != null) {
 +          queryResults.addAll((ArrayList)obj);
 +        }
 +      }
 +    }
 +
 +    return (ArrayList) queryResults;
 +  }
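 +
 +  /**
 +   * A minimal sketch of the server side of this pattern, assuming the usual
 +   * com.gemstone.gemfire.cache.execute imports. TestQueryFunction is defined
 +   * elsewhere in this file; this illustrative variant (the class name is an
 +   * assumption) shows the standard function-context query idiom.
 +   */
 +  public static class IllustrativeQueryFunction extends FunctionAdapter {
 +    @Override
 +    public void execute(FunctionContext context) {
 +      try {
 +        RegionFunctionContext rfc = (RegionFunctionContext) context;
 +        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +        Query query = qs.newQuery((String) rfc.getArguments());
 +        // Query.execute(RegionFunctionContext) restricts the query to the
 +        // buckets routed to this member by the caller's filter.
 +        SelectResults results = (SelectResults) query.execute(rfc);
 +        rfc.getResultSender().lastResult((ArrayList) results.asList());
 +      } catch (Exception e) {
 +        throw new FunctionException(e);
 +      }
 +    }
 +    @Override
 +    public String getId() {
 +      return "IllustrativeQueryFunction";
 +    }
 +  }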
 +
 +  /**
 +   * Runs a {@link LocalDataSet} query on a single server.
 +   * @param func the query function to execute
 +   * @param filter set of keys identifying the buckets to query
 +   * @param query the OQL query string
 +   * @return results in a List
 +   */
 +  private ArrayList runLDSQueryOnClientUsingFunc(Function func, Set filter, String query) {
 +    ResultCollector rcollector = null;
 +
 +    // onServer() execution has no withFilter(), so the filter is passed to the
 +    // function as an argument instead.
 +    rcollector = FunctionService
 +        .onServer(ClientCacheFactory.getAnyInstance())
 +        .withArgs(new Object[] { query, filter })
 +        .execute(func);
 +    Object result = rcollector.getResult();
 +    assertTrue(result instanceof ArrayList);
 +
 +    //Results from multiple nodes.
 +    ArrayList resultList = (ArrayList)result;
 +    resultList.trimToSize();
 +    List queryResults = new ArrayList();
 +    
 +    if (resultList.size() != 0 && resultList.get(0) instanceof ArrayList) {
 +      for (Object obj: resultList) {
 +        if (obj != null) {
 +          queryResults.addAll((ArrayList)obj);
 +        }
 +      }
 +    }
 +
 +    return (ArrayList) queryResults;
 +  }
 +
 +  private Set getFilter(int start, int end) {
 +    Set filter = new HashSet();
 +    for (int j = start; j <= end; j++) {
 +      filter.add(j);
 +    }
 +    return filter;
 +  }
 +
 +
 +  private Set getBucketsForFilter(Set filter) {
 +    Set bucketids = new HashSet();
 +    for (Object key : filter) {
 +      int intKey = ((Integer) key).intValue();
 +      bucketids.add(intKey % numOfBuckets);
 +    }
 +    return bucketids;
 +  }
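 +
 +  /**
 +   * A minimal sketch (class name is an assumption) of a resolver with the
 +   * routing behavior getBucketsForFilter() relies on: an Integer key k lands
 +   * in bucket (k % numOfBuckets). QueryAPITestPartitionResolver, used when the
 +   * partitioned regions are created, is the real implementation.
 +   */
 +  public static class SketchKeyPartitionResolver implements PartitionResolver {
 +    public Object getRoutingObject(EntryOperation opDetails) {
 +      // Integer.hashCode() is the key's own value, so the bucket id works out
 +      // to key % totalNumBuckets.
 +      return opDetails.getKey();
 +    }
 +    public String getName() {
 +      return "SketchKeyPartitionResolver";
 +    }
 +    public void close() {
 +    }
 +  }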
 +
 +  /**
 +   * Returns a runnable that puts the given portfolio objects into the named
 +   * region (partitioned or local).
 +   *
 +   * @param regionName name of the region to populate
 +   * @param portfolio array of portfolio objects to put
 +   * @param from index of the first entry to put (inclusive)
 +   * @param to index one past the last entry to put (exclusive)
 +   * @return cacheSerializable object
 +   */
 +  public CacheSerializableRunnable getCacheSerializableRunnableForPRPuts(
 +      final String regionName, final Object[] portfolio, final int from,
 +      final int to)
 +  {
 +    CacheSerializableRunnable puts = new CacheSerializableRunnable("Region Puts") {
 +      @Override
 +      public void run2() throws CacheException
 +      {
 +        Cache cache = CacheFactory.getAnyInstance();
 +        Region region = cache.getRegion(regionName);
 +        for (int j = from; j < to; j++) {
 +          region.put(Integer.valueOf(j), portfolio[j]);
 +        }
 +        LogWriterUtils.getLogWriter().info(
 +            "QueryUsingFunctionContextDUnitTest#getCacheSerializableRunnableForPRPuts: "
 +                + "Inserted Portfolio data on Region " + regionName);
 +      }
 +    };
 +    return puts;
 +  }
 +
 +  public void createIndex() {
 +    QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +    try {
 +      qs.createIndex("ID1", "ID", "/" + PartitionedRegionName1);
 +      qs.createIndex("ID2", "ID", "/" + PartitionedRegionName2);
 +      qs.createIndex("ID3", "ID", "/" + PartitionedRegionName3);
 +    } catch (Exception e) {
 +      fail("Index creation failed " + e);
 +    }
 +  }
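 +
 +  /**
 +   * A minimal sketch (method name and predicate are illustrative) of a query
 +   * whose WHERE clause can be serviced by the "ID" indexes created above
 +   * instead of iterating every region entry.
 +   */
 +  private SelectResults queryUsingIdIndex() throws Exception {
 +    QueryService qs = CacheFactory.getAnyInstance().getQueryService();
 +    return (SelectResults) qs.newQuery(
 +        "select * from /" + PartitionedRegionName1 + " p where p.ID > 10").execute();
 +  }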
 +
 +}