You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2017/03/07 19:21:45 UTC

[22/51] [abbrv] geode git commit: GEODE-2538: Don't invoke a cache loader when fetching values for a lucene query

GEODE-2538: Don't invoke a cache loader when fetching values for a lucene query

Instead of using getAll, fetch the values of a lucene query using a
function that calls getEntry. We can then avoid invoking the cache
loader.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/712d87f7
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/712d87f7
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/712d87f7

Branch: refs/heads/feature/GEM-1195
Commit: 712d87f791906cbd1c11dd0f4655032dabf57755
Parents: 11521a8
Author: Dan Smith <up...@apache.org>
Authored: Mon Feb 27 14:21:58 2017 -0800
Committer: Dan Smith <up...@apache.org>
Committed: Mon Feb 27 14:26:15 2017 -0800

----------------------------------------------------------------------
 .../cache/lucene/internal/LuceneQueryImpl.java  |  5 +
 .../lucene/internal/LuceneServiceImpl.java      |  2 +
 .../PageableLuceneQueryResultsImpl.java         | 23 ++++-
 .../internal/results/LuceneGetPageFunction.java | 96 ++++++++++++++++++++
 .../internal/results/MapResultCollector.java    | 55 +++++++++++
 .../lucene/LuceneQueriesIntegrationTest.java    | 41 +++++++++
 .../geode/cache/lucene/PaginationDUnitTest.java |  7 +-
 .../internal/LuceneQueryImplJUnitTest.java      | 54 +++++++----
 ...PageableLuceneQueryResultsImplJUnitTest.java | 39 +++++++-
 .../results/LuceneGetPageFunctionJUnitTest.java | 59 ++++++++++++
 10 files changed, 351 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/712d87f7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
index de622e0..1ece774 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
@@ -93,6 +93,11 @@ public class LuceneQueryImpl<K, V> implements LuceneQuery<K, V> {
 
   private PageableLuceneQueryResults<K, V> findPages(int pageSize) throws LuceneQueryException {
     TopEntries<K> entries = findTopEntries();
+    return newPageableResults(pageSize, entries);
+  }
+
+  protected PageableLuceneQueryResults<K, V> newPageableResults(final int pageSize,
+      final TopEntries<K> entries) {
     return new PageableLuceneQueryResultsImpl<K, V>(entries.getHits(), region, pageSize);
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/712d87f7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index a608dd9..dd32000 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction;
 import org.apache.geode.cache.lucene.internal.management.LuceneServiceMBean;
 import org.apache.geode.cache.lucene.internal.management.ManagementIndexListener;
+import org.apache.geode.cache.lucene.internal.results.LuceneGetPageFunction;
 import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.analysis.Analyzer;
@@ -89,6 +90,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
     this.cache = gfc;
 
     FunctionService.registerFunction(new LuceneQueryFunction());
+    FunctionService.registerFunction(new LuceneGetPageFunction());
     FunctionService.registerFunction(new WaitUntilFlushedFunction());
     FunctionService.registerFunction(new DumpDirectoryFiles());
     registerDataSerializables();

http://git-wip-us.apache.org/repos/asf/geode/blob/712d87f7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PageableLuceneQueryResultsImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PageableLuceneQueryResultsImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PageableLuceneQueryResultsImpl.java
index 5c5d825..8db98a5 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PageableLuceneQueryResultsImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PageableLuceneQueryResultsImpl.java
@@ -17,14 +17,21 @@ package org.apache.geode.cache.lucene.internal;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.Execution;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.lucene.PageableLuceneQueryResults;
 import org.apache.geode.cache.lucene.LuceneResultStruct;
 import org.apache.geode.cache.lucene.internal.distributed.EntryScore;
+import org.apache.geode.cache.lucene.internal.results.LuceneGetPageFunction;
+import org.apache.geode.cache.lucene.internal.results.MapResultCollector;
 
 /**
  * Implementation of PageableLuceneQueryResults that fetchs a page at a time from the server, given
@@ -74,15 +81,15 @@ public class PageableLuceneQueryResultsImpl<K, V> implements PageableLuceneQuery
 
   public List<LuceneResultStruct<K, V>> getHitEntries(int fromIndex, int toIndex) {
     List<EntryScore<K>> scores = hits.subList(fromIndex, toIndex);
-    ArrayList<K> keys = new ArrayList<K>(scores.size());
+    Set<K> keys = new HashSet<K>(scores.size());
     for (EntryScore<K> score : scores) {
       keys.add(score.getKey());
     }
 
-    Map<K, V> values = userRegion.getAll(keys);
+    Map<K, V> values = getValues(keys);
 
     ArrayList<LuceneResultStruct<K, V>> results =
-        new ArrayList<LuceneResultStruct<K, V>>(scores.size());
+        new ArrayList<LuceneResultStruct<K, V>>(values.size());
     for (EntryScore<K> score : scores) {
       V value = values.get(score.getKey());
       if (value != null)
@@ -91,6 +98,16 @@ public class PageableLuceneQueryResultsImpl<K, V> implements PageableLuceneQuery
     return results;
   }
 
+  protected Map<K, V> getValues(final Set<K> keys) {
+    ResultCollector resultCollector = onRegion().withFilter(keys)
+        .withCollector(new MapResultCollector()).execute(LuceneGetPageFunction.ID);
+    return (Map<K, V>) resultCollector.getResult();
+  }
+
+  protected Execution onRegion() {
+    return FunctionService.onRegion(userRegion);
+  }
+
   @Override
   public List<LuceneResultStruct<K, V>> next() {
     if (!hasNext()) {

http://git-wip-us.apache.org/repos/asf/geode/blob/712d87f7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/LuceneGetPageFunction.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/LuceneGetPageFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/LuceneGetPageFunction.java
new file mode 100644
index 0000000..f5c2b99
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/LuceneGetPageFunction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.lucene.internal.results;
+
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.Region.Entry;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.cache.lucene.LuceneQueryException;
+import org.apache.geode.cache.lucene.LuceneQueryProvider;
+import org.apache.geode.cache.lucene.LuceneService;
+import org.apache.geode.cache.lucene.LuceneServiceProvider;
+import org.apache.geode.cache.lucene.internal.LuceneIndexImpl;
+import org.apache.geode.cache.lucene.internal.LuceneIndexStats;
+import org.apache.geode.cache.lucene.internal.distributed.CollectorManager;
+import org.apache.geode.cache.lucene.internal.distributed.LuceneFunctionContext;
+import org.apache.geode.cache.lucene.internal.distributed.TopEntriesCollector;
+import org.apache.geode.cache.lucene.internal.distributed.TopEntriesCollectorManager;
+import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
+import org.apache.geode.cache.lucene.internal.repository.IndexResultCollector;
+import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.internal.InternalEntity;
+import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.search.Query;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link LuceneGetPageFunction} Returns the values of entries back to the user This behaves
+ * basically like a getAll, but it does not invoke a cache loader
+ */
+public class LuceneGetPageFunction implements Function, InternalEntity {
+  private static final long serialVersionUID = 1L;
+  public static final String ID = LuceneGetPageFunction.class.getName();
+
+  private static final Logger logger = LogService.getLogger();
+
+  @Override
+  public void execute(FunctionContext context) {
+    RegionFunctionContext ctx = (RegionFunctionContext) context;
+    Region region = PartitionRegionHelper.getLocalDataForContext(ctx);
+    Set<?> keys = ctx.getFilter();
+
+    Map<Object, Object> results = new HashMap<>(keys.size());
+
+    for (Object key : keys) {
+      final Entry entry = region.getEntry(key);
+      try {
+        Object value = entry == null ? null : entry.getValue();
+        if (value != null) {
+          results.put(key, value);
+        }
+      } catch (EntryDestroyedException e) {
+        // skip
+      }
+    }
+    ctx.getResultSender().lastResult(results);
+  }
+
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/712d87f7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/MapResultCollector.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/MapResultCollector.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/MapResultCollector.java
new file mode 100644
index 0000000..9264126
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/results/MapResultCollector.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal.results;
+
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class MapResultCollector implements ResultCollector<Map<?, ?>, Map<Object, Object>> {
+  private final Map<Object, Object> results = new HashMap<>();
+
+  @Override
+  public Map<Object, Object> getResult() throws FunctionException {
+    return results;
+  }
+
+  @Override
+  public Map<Object, Object> getResult(final long timeout, final TimeUnit unit)
+      throws FunctionException, InterruptedException {
+    return results;
+  }
+
+  @Override
+  public void addResult(final DistributedMember memberID, final Map<?, ?> resultOfSingleExecution) {
+    this.results.putAll(resultOfSingleExecution);
+
+  }
+
+  @Override
+  public void endResults() {
+
+  }
+
+  @Override
+  public void clearResults() {
+    results.clear();
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/712d87f7/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java
index 01bdca4..6420307 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java
@@ -29,6 +29,9 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.LoaderHelper;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.analysis.Tokenizer;
@@ -363,6 +366,44 @@ public class LuceneQueriesIntegrationTest extends LuceneIntegrationTest {
   }
 
   @Test
+  public void shouldReturnCorrectResultsOnDeletionAfterQueryExecutionWithLoader() throws Exception {
+    final int pageSize = 2;
+    final LuceneQuery<Object, Object> query = addValuesAndCreateQuery(pageSize);
+    region.getAttributesMutator().setCacheLoader(new CacheLoader() {
+      @Override
+      public Object load(final LoaderHelper helper) throws CacheLoaderException {
+        return new TestObject("should not", "load this");
+      }
+
+      @Override
+      public void close() {
+
+      }
+    });
+    final PageableLuceneQueryResults<Object, Object> pages = query.findPages();
+    List<LuceneResultStruct<Object, Object>> allEntries = new ArrayList<>();
+    assertTrue(pages.hasNext());
+    assertEquals(7, pages.size());
+    // Destroying an entry from the region after the query is executed.
+    region.destroy("C");
+    final List<LuceneResultStruct<Object, Object>> page1 = pages.next();
+    assertEquals(pageSize, page1.size());
+    final List<LuceneResultStruct<Object, Object>> page2 = pages.next();
+    assertEquals(pageSize, page2.size());
+    final List<LuceneResultStruct<Object, Object>> page3 = pages.next();
+    assertEquals(pageSize, page3.size());
+    assertFalse(pages.hasNext());
+
+    allEntries.addAll(page1);
+    allEntries.addAll(page2);
+    allEntries.addAll(page3);
+    assertEquals(region.keySet(),
+        allEntries.stream().map(entry -> entry.getKey()).collect(Collectors.toSet()));
+    assertEquals(region.values(),
+        allEntries.stream().map(entry -> entry.getValue()).collect(Collectors.toSet()));
+  }
+
+  @Test
   public void shouldReturnCorrectResultsOnMultipleDeletionsAfterQueryExecution() throws Exception {
     final LuceneQuery<Object, Object> query = addValuesAndCreateQuery(2);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/712d87f7/geode-lucene/src/test/java/org/apache/geode/cache/lucene/PaginationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/PaginationDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/PaginationDUnitTest.java
index de71c08..3ff86f7 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/PaginationDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/PaginationDUnitTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.PartitionedRegionStorageException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
@@ -59,7 +60,7 @@ public class PaginationDUnitTest extends LuceneQueriesAccessorBase {
 
   @Test
   @Parameters(method = "getListOfRegionTestTypes")
-  public void noSuchElementExceptionWhenAllDataStoreAreClosedWhilePagination(
+  public void partitionedRegionStorageExceptionWhenAllDataStoreAreClosedWhilePagination(
       RegionTestableType regionTestType) {
     SerializableRunnableIF createIndex = () -> {
       LuceneService luceneService = LuceneServiceProvider.get(getCache());
@@ -90,8 +91,8 @@ public class PaginationDUnitTest extends LuceneQueriesAccessorBase {
         fail();
       } catch (Exception e) {
         Assert.assertEquals(
-            "Expected Exception = java.util.NoSuchElementException but hit " + e.toString(), true,
-            e instanceof java.util.NoSuchElementException);
+            "Expected Exception = PartitionedRegionStorageException but hit " + e.toString(), true,
+            e instanceof PartitionedRegionStorageException);
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/712d87f7/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
index ca439fe..20f5d1d 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
@@ -16,35 +16,38 @@
 package org.apache.geode.cache.lucene.internal;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.ArgumentCaptor;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.lucene.LuceneQueryException;
 import org.apache.geode.cache.lucene.LuceneQueryProvider;
-import org.apache.geode.cache.lucene.PageableLuceneQueryResults;
 import org.apache.geode.cache.lucene.LuceneResultStruct;
+import org.apache.geode.cache.lucene.PageableLuceneQueryResults;
 import org.apache.geode.cache.lucene.internal.distributed.EntryScore;
 import org.apache.geode.cache.lucene.internal.distributed.LuceneFunctionContext;
+import org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntries;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntriesCollector;
 import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 @Category(UnitTest.class)
 public class LuceneQueryImplJUnitTest {
@@ -54,6 +57,7 @@ public class LuceneQueryImplJUnitTest {
   private LuceneQueryProvider provider;
   private ResultCollector<TopEntriesCollector, TopEntries> collector;
   private Region region;
+  private PageableLuceneQueryResults<Object, Object> results;
 
   @Before
   public void createMocks() {
@@ -65,12 +69,20 @@ public class LuceneQueryImplJUnitTest {
     when(execution.withArgs(any())).thenReturn(execution);
     when(execution.withCollector(any())).thenReturn(execution);
     when(execution.execute(anyString())).thenReturn((ResultCollector) collector);
+    results = mock(PageableLuceneQueryResults.class);
 
     query = new LuceneQueryImpl<Object, Object>("index", region, provider, LIMIT, 20) {
+
       @Override
       protected Execution onRegion() {
         return execution;
       }
+
+      @Override
+      protected PageableLuceneQueryResults<Object, Object> newPageableResults(final int pageSize,
+          final TopEntries<Object> entries) {
+        return results;
+      }
     };
   }
 
@@ -79,9 +91,12 @@ public class LuceneQueryImplJUnitTest {
     entries.addHit(new EntryScore("hi", 5));
     when(collector.getResult()).thenReturn(entries);
 
-    Map<String, String> getAllResult = new HashMap<String, String>();
-    getAllResult.put("hi", "value");
-    when(region.getAll(eq(Collections.singletonList("hi")))).thenReturn(getAllResult);
+    when(results.getMaxScore()).thenReturn(5f);
+    when(results.size()).thenReturn(1);
+    List<LuceneResultStruct<Object, Object>> page =
+        Collections.singletonList(new LuceneResultStructImpl<>("hi", "value", 5f));
+    when(results.next()).thenReturn(page);
+    when(results.hasNext()).thenReturn(true);
   }
 
   @Test
@@ -137,6 +152,7 @@ public class LuceneQueryImplJUnitTest {
     assertEquals("index", context.getIndexName());
 
     assertEquals(5, results.getMaxScore(), 0.01);
+    assertEquals(1, results.size());
     final List<LuceneResultStruct<Object, Object>> page = results.next();
     assertEquals(1, page.size());
     LuceneResultStruct element = page.iterator().next();

http://git-wip-us.apache.org/repos/asf/geode/blob/712d87f7/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PageableLuceneQueryResultsImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PageableLuceneQueryResultsImplJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PageableLuceneQueryResultsImplJUnitTest.java
index 35a4c91..bc38112 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PageableLuceneQueryResultsImplJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PageableLuceneQueryResultsImplJUnitTest.java
@@ -15,6 +15,12 @@
 package org.apache.geode.cache.lucene.internal;
 
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -23,12 +29,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
+import org.apache.geode.cache.execute.Execution;
+import org.apache.geode.cache.execute.ResultCollector;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -48,6 +58,7 @@ public class PageableLuceneQueryResultsImplJUnitTest {
   private List<EntryScore<String>> hits;
   private List<LuceneResultStruct> expected = new ArrayList<LuceneResultStruct>();
   private Region<String, String> userRegion;
+  private Execution execution;
 
   @Before
   public void setUp() {
@@ -58,13 +69,21 @@ public class PageableLuceneQueryResultsImplJUnitTest {
       expected.add(new LuceneResultStructImpl<String, String>("key_" + i, "value_" + i, i));
     }
 
-    userRegion = Mockito.mock(Region.class);
+    userRegion = mock(Region.class);
 
-    Mockito.when(userRegion.getAll(Mockito.anyCollection())).thenAnswer(new Answer() {
+    final ResultCollector collector = mock(ResultCollector.class);
+    execution = mock(Execution.class);
+    when(execution.withFilter(any())).thenReturn(execution);
+    when(execution.withCollector(any())).thenReturn(execution);
+    when(execution.execute(anyString())).thenReturn(collector);
+
+    when(collector.getResult()).then(new Answer() {
 
       @Override
       public Map answer(InvocationOnMock invocation) throws Throwable {
-        Collection<String> keys = invocation.getArgumentAt(0, Collection.class);
+        ArgumentCaptor<Set> captor = ArgumentCaptor.forClass(Set.class);
+        verify(execution, atLeast(1)).withFilter(captor.capture());
+        Collection<String> keys = captor.getValue();
         Map<String, String> results = new HashMap<String, String>();
         for (String key : keys) {
           results.put(key, key.replace("key_", "value_"));
@@ -88,7 +107,12 @@ public class PageableLuceneQueryResultsImplJUnitTest {
   @Test
   public void testPagination() {
     PageableLuceneQueryResultsImpl<String, String> results =
-        new PageableLuceneQueryResultsImpl<String, String>(hits, userRegion, 10);
+        new PageableLuceneQueryResultsImpl<String, String>(hits, userRegion, 10) {
+          @Override
+          protected Execution onRegion() {
+            return execution;
+          }
+        };
 
     assertEquals(23, results.size());
 
@@ -114,7 +138,12 @@ public class PageableLuceneQueryResultsImplJUnitTest {
   @Test
   public void testNoPagination() {
     PageableLuceneQueryResultsImpl<String, String> results =
-        new PageableLuceneQueryResultsImpl<String, String>(hits, userRegion, 0);
+        new PageableLuceneQueryResultsImpl<String, String>(hits, userRegion, 0) {
+          @Override
+          protected Execution onRegion() {
+            return execution;
+          }
+        };
 
     assertEquals(23, results.size());
 

http://git-wip-us.apache.org/repos/asf/geode/blob/712d87f7/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/LuceneGetPageFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/LuceneGetPageFunctionJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/LuceneGetPageFunctionJUnitTest.java
new file mode 100644
index 0000000..c62082c
--- /dev/null
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/LuceneGetPageFunctionJUnitTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal.results;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.Region.Entry;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Collections;
+import java.util.Set;
+
+@Category(UnitTest.class)
+public class LuceneGetPageFunctionJUnitTest {
+
+  @Test
+  public void shouldReturnMapWithKeyAndValue() {
+    PartitionedRegion region = mock(PartitionedRegion.class);
+    InternalRegionFunctionContext context = mock(InternalRegionFunctionContext.class);
+    when(context.getDataSet()).thenReturn(region);
+    ResultSender resultSender = mock(ResultSender.class);
+    when(context.getResultSender()).thenReturn(resultSender);
+    LuceneGetPageFunction function = new LuceneGetPageFunction();
+    when(context.getLocalDataSet(any())).thenReturn(region);
+    final Entry entry = mock(Entry.class);
+    when(region.getEntry(any())).thenReturn(entry);
+    when(entry.getValue()).thenReturn("value");
+    when(context.getFilter()).thenReturn((Set) Collections.singleton("key"));
+    function.execute(context);
+
+    verify(resultSender).lastResult(eq(Collections.singletonMap("key", "value")));
+  }
+
+}