You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/09/09 17:32:38 UTC

[nifi] branch master updated: NIFI-6636: Fixed ListGCSBucket file duplication error

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 21a27c8  NIFI-6636: Fixed ListGCSBucket file duplication error
21a27c8 is described below

commit 21a27c8bb00c3f41836c938c3bb59ff428a27ca4
Author: Peter Turcsanyi <tu...@cloudera.com>
AuthorDate: Fri Sep 6 13:49:39 2019 +0200

    NIFI-6636: Fixed ListGCSBucket file duplication error
    
    ListGCSBucket listed files more than once if they did not arrive in
    alphabetical order. The set storing the names of the latest blobs (those loaded
    with the highest timestamp during the previous run of the processor) was
    cleared too early.
    
    Also changed the state persisting logic: the state is now saved only once, at
    the end of onTrigger() (similar to ListS3). Previously, some inconsistent state
    (only blob names without the timestamp) was also saved.
    
    This closes #3702.
    
    Signed-off-by: Mark Payne <ma...@hotmail.com>
---
 .../nifi/processors/gcp/storage/ListGCSBucket.java |  41 ++-
 .../processors/gcp/storage/ListGCSBucketTest.java  | 386 ++++++++++++++++++---
 2 files changed, 361 insertions(+), 66 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index 01293cf..f4cdb27 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -259,13 +259,16 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         }
 
         final Storage storage = getCloudService();
-        int listCount = 0;
+
         long maxTimestamp = 0L;
+        Set<String> maxKeys = new HashSet<>();
 
 
-        Page<Blob> blobPages = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[listOptions.size()]));
+        Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[listOptions.size()]));
         do {
-            for (Blob blob : blobPages.getValues()) {
+            int listCount = 0;
+
+            for (Blob blob : blobPage.getValues()) {
                 long lastModified = blob.getUpdateTime();
                 if (lastModified < currentTimestamp
                         || lastModified == currentTimestamp && currentKeys.contains(blob.getName())) {
@@ -381,40 +384,36 @@ public class ListGCSBucket extends AbstractGCSProcessor {
                 // Update state
                 if (lastModified > maxTimestamp) {
                     maxTimestamp = lastModified;
-                    currentKeys.clear();
+                    maxKeys.clear();
                 }
                 if (lastModified == maxTimestamp) {
-                    currentKeys.add(blob.getName());
+                    maxKeys.add(blob.getName());
                 }
                 listCount++;
             }
 
-            blobPages = blobPages.getNextPage();
             commit(context, session, listCount);
-            listCount = 0;
-        } while (blobPages != null);
-
-        currentTimestamp = maxTimestamp;
 
-        final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-        getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
+            blobPage = blobPage.getNextPage();
+        } while (blobPage != null);
 
-        if (!commit(context, session, listCount)) {
-            if (currentTimestamp > 0) {
-                persistState(context);
-            }
+        if (maxTimestamp != 0) {
+            currentTimestamp = maxTimestamp;
+            currentKeys = maxKeys;
+            persistState(context);
+        } else {
             getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", new Object[]{bucket});
             context.yield();
         }
+
+        final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
     }
 
-    private boolean commit(final ProcessContext context, final ProcessSession session, int listCount) {
-        boolean willCommit = listCount > 0;
-        if (willCommit) {
+    private void commit(final ProcessContext context, final ProcessSession session, int listCount) {
+        if (listCount > 0) {
             getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
             session.commit();
-            persistState(context);
         }
-        return willCommit;
     }
 }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index e17cf4b..1a925d0 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -256,7 +256,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Mock
-    Page<Blob> mockBlobPages;
+    Page<Blob> mockBlobPage;
 
     private Blob buildMockBlob(String bucket, String key, long updateTime) {
         final Blob blob = mock(Blob.class);
@@ -268,7 +268,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testSuccessfulList() throws Exception {
-        reset(storage, mockBlobPages);
+        reset(storage, mockBlobPage);
         final ListGCSBucket processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -279,13 +279,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
                 buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
         );
 
-        when(mockBlobPages.getValues())
+        when(mockBlobPage.getValues())
                 .thenReturn(mockList);
 
-        when(mockBlobPages.getNextPage()).thenReturn(null);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
 
         when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPages);
+                .thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
@@ -343,7 +343,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testOldValues() throws Exception {
-        reset(storage, mockBlobPages);
+        reset(storage, mockBlobPage);
         final ListGCSBucket processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -353,13 +353,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
                 buildMockBlob("blob-bucket-1", "blob-key-1", 2L)
         );
 
-        when(mockBlobPages.getValues())
+        when(mockBlobPage.getValues())
                 .thenReturn(mockList);
 
-        when(mockBlobPages.getNextPage()).thenReturn(null);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
 
         when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPages);
+                .thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.enqueue("test2");
@@ -384,7 +384,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testEmptyList() throws Exception {
-        reset(storage, mockBlobPages);
+        reset(storage, mockBlobPage);
         final ListGCSBucket processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -392,13 +392,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of();
 
-        when(mockBlobPages.getValues())
+        when(mockBlobPage.getValues())
                 .thenReturn(mockList);
 
-        when(mockBlobPages.getNextPage()).thenReturn(null);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
 
         when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPages);
+                .thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
@@ -413,8 +413,304 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Test
+    public void testListWithStateAndFilesComingInAlphabeticalOrder() throws Exception {
+        reset(storage, mockBlobPage);
+        final ListGCSBucket processor = getProcessor();
+        final TestRunner runner = buildNewRunner(processor);
+        addRequiredPropertiesToRunner(runner);
+        runner.assertValid();
+
+        final Map<String, String> state = ImmutableMap.of(
+                ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+                ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1"
+        );
+
+        runner.getStateManager().setState(state, Scope.CLUSTER);
+
+        final Iterable<Blob> mockList = ImmutableList.of(
+                buildMockBlob("blob-bucket-1", "blob-key-1", 1L),
+                buildMockBlob("blob-bucket-2", "blob-key-2", 2L)
+        );
+
+        when(mockBlobPage.getValues())
+                .thenReturn(mockList);
+
+        when(mockBlobPage.getNextPage()).thenReturn(null);
+
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+                .thenReturn(mockBlobPage);
+
+        runner.enqueue("test");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+        runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+        MockFlowFile flowFile = successes.get(0);
+        assertEquals(
+                "blob-bucket-2",
+                flowFile.getAttribute(BUCKET_ATTR)
+        );
+
+        assertEquals(
+                "blob-key-2",
+                flowFile.getAttribute(KEY_ATTR)
+        );
+
+        assertEquals(
+                "2",
+                flowFile.getAttribute(UPDATE_TIME_ATTR)
+        );
+
+        assertEquals(
+                2L,
+                processor.currentTimestamp
+        );
+
+        assertEquals(
+                ImmutableSet.of(
+                        "blob-key-2"
+                ),
+                processor.currentKeys
+        );
+    }
+
+    @Test
+    public void testListWithStateAndFilesComingNotInAlphabeticalOrder() throws Exception {
+        reset(storage, mockBlobPage);
+        final ListGCSBucket processor = getProcessor();
+        final TestRunner runner = buildNewRunner(processor);
+        addRequiredPropertiesToRunner(runner);
+        runner.assertValid();
+
+        final Map<String, String> state = ImmutableMap.of(
+                ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+                ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
+        );
+
+        runner.getStateManager().setState(state, Scope.CLUSTER);
+
+        final Iterable<Blob> mockList = ImmutableList.of(
+                buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
+                buildMockBlob("blob-bucket-2", "blob-key-2", 1L)
+        );
+
+        when(mockBlobPage.getValues())
+                .thenReturn(mockList);
+
+        when(mockBlobPage.getNextPage()).thenReturn(null);
+
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+                .thenReturn(mockBlobPage);
+
+        runner.enqueue("test");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+        runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+        MockFlowFile flowFile = successes.get(0);
+        assertEquals(
+                "blob-bucket-1",
+                flowFile.getAttribute(BUCKET_ATTR)
+        );
+
+        assertEquals(
+                "blob-key-1",
+                flowFile.getAttribute(KEY_ATTR)
+        );
+
+        assertEquals(
+                "2",
+                flowFile.getAttribute(UPDATE_TIME_ATTR)
+        );
+
+        assertEquals(
+                2L,
+                processor.currentTimestamp
+        );
+
+        assertEquals(
+                ImmutableSet.of(
+                        "blob-key-1"
+                ),
+                processor.currentKeys
+        );
+    }
+
+    @Test
+    public void testListWithStateAndNewFilesComingWithTheSameTimestamp() throws Exception {
+        reset(storage, mockBlobPage);
+        final ListGCSBucket processor = getProcessor();
+        final TestRunner runner = buildNewRunner(processor);
+        addRequiredPropertiesToRunner(runner);
+        runner.assertValid();
+
+        final Map<String, String> state = ImmutableMap.of(
+                ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+                ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
+        );
+
+        runner.getStateManager().setState(state, Scope.CLUSTER);
+
+        final Iterable<Blob> mockList = ImmutableList.of(
+                buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
+                buildMockBlob("blob-bucket-2", "blob-key-2", 1L),
+                buildMockBlob("blob-bucket-3", "blob-key-3", 2L)
+        );
+
+        when(mockBlobPage.getValues())
+                .thenReturn(mockList);
+
+        when(mockBlobPage.getNextPage()).thenReturn(null);
+
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+                .thenReturn(mockBlobPage);
+
+        runner.enqueue("test");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+        runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+
+        final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+        MockFlowFile flowFile = successes.get(0);
+        assertEquals(
+                "blob-bucket-1",
+                flowFile.getAttribute(BUCKET_ATTR)
+        );
+
+        assertEquals(
+                "blob-key-1",
+                flowFile.getAttribute(KEY_ATTR)
+        );
+
+        assertEquals(
+                "2",
+                flowFile.getAttribute(UPDATE_TIME_ATTR)
+        );
+
+        flowFile = successes.get(1);
+        assertEquals(
+                "blob-bucket-3",
+                flowFile.getAttribute(BUCKET_ATTR)
+        );
+
+        assertEquals(
+                "blob-key-3",
+                flowFile.getAttribute(KEY_ATTR)
+        );
+
+        assertEquals(
+                "2",
+                flowFile.getAttribute(UPDATE_TIME_ATTR)
+        );
+
+        assertEquals(
+                2L,
+                processor.currentTimestamp
+        );
+
+        assertEquals(
+                ImmutableSet.of(
+                        "blob-key-1",
+                        "blob-key-3"
+                ),
+                processor.currentKeys
+        );
+    }
+
+    @Test
+    public void testListWithStateAndNewFilesComingWithTheCurrentTimestamp() throws Exception {
+        reset(storage, mockBlobPage);
+        final ListGCSBucket processor = getProcessor();
+        final TestRunner runner = buildNewRunner(processor);
+        addRequiredPropertiesToRunner(runner);
+        runner.assertValid();
+
+        final Map<String, String> state = ImmutableMap.of(
+                ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+                ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
+        );
+
+        runner.getStateManager().setState(state, Scope.CLUSTER);
+
+        final Iterable<Blob> mockList = ImmutableList.of(
+                buildMockBlob("blob-bucket-1", "blob-key-1", 1L),
+                buildMockBlob("blob-bucket-2", "blob-key-2", 1L),
+                buildMockBlob("blob-bucket-3", "blob-key-3", 1L)
+        );
+
+        when(mockBlobPage.getValues())
+                .thenReturn(mockList);
+
+        when(mockBlobPage.getNextPage()).thenReturn(null);
+
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+                .thenReturn(mockBlobPage);
+
+        runner.enqueue("test");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+        runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+
+        final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+        MockFlowFile flowFile = successes.get(0);
+        assertEquals(
+                "blob-bucket-1",
+                flowFile.getAttribute(BUCKET_ATTR)
+        );
+
+        assertEquals(
+                "blob-key-1",
+                flowFile.getAttribute(KEY_ATTR)
+        );
+
+        assertEquals(
+                "1",
+                flowFile.getAttribute(UPDATE_TIME_ATTR)
+        );
+
+        flowFile = successes.get(1);
+        assertEquals(
+                "blob-bucket-3",
+                flowFile.getAttribute(BUCKET_ATTR)
+        );
+
+        assertEquals(
+                "blob-key-3",
+                flowFile.getAttribute(KEY_ATTR)
+        );
+
+        assertEquals(
+                "1",
+                flowFile.getAttribute(UPDATE_TIME_ATTR)
+        );
+
+        assertEquals(
+                1L,
+                processor.currentTimestamp
+        );
+
+        assertEquals(
+                ImmutableSet.of(
+                        "blob-key-1",
+                        "blob-key-3"
+                ),
+                processor.currentKeys
+        );
+    }
+
+    @Test
     public void testAttributesSet() throws Exception {
-        reset(storage, mockBlobPages);
+        reset(storage, mockBlobPage);
         final ListGCSBucket processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -447,13 +743,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of(blob);
 
-        when(mockBlobPages.getValues())
+        when(mockBlobPage.getValues())
                 .thenReturn(mockList);
 
-        when(mockBlobPages.getNextPage()).thenReturn(null);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
 
         when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPages);
+                .thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
@@ -555,7 +851,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testAclOwnerUser() throws Exception {
-        reset(storage, mockBlobPages);
+        reset(storage, mockBlobPage);
         final ListGCSBucket processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -568,13 +864,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of(blob);
 
-        when(mockBlobPages.getValues())
+        when(mockBlobPage.getValues())
                 .thenReturn(mockList);
 
-        when(mockBlobPages.getNextPage()).thenReturn(null);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
 
         when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPages);
+                .thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
@@ -598,7 +894,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testAclOwnerGroup() throws Exception {
-        reset(storage, mockBlobPages);
+        reset(storage, mockBlobPage);
         final ListGCSBucket processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -611,13 +907,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of(blob);
 
-        when(mockBlobPages.getValues())
+        when(mockBlobPage.getValues())
                 .thenReturn(mockList);
 
-        when(mockBlobPages.getNextPage()).thenReturn(null);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
 
         when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPages);
+                .thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
@@ -642,7 +938,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testAclOwnerDomain() throws Exception {
-        reset(storage, mockBlobPages);
+        reset(storage, mockBlobPage);
         final ListGCSBucket processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -655,13 +951,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of(blob);
 
-        when(mockBlobPages.getValues())
+        when(mockBlobPage.getValues())
                 .thenReturn(mockList);
 
-        when(mockBlobPages.getNextPage()).thenReturn(null);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
 
         when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPages);
+                .thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
@@ -686,7 +982,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testAclOwnerProject() throws Exception {
-        reset(storage, mockBlobPages);
+        reset(storage, mockBlobPage);
         final ListGCSBucket processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -699,13 +995,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of(blob);
 
-        when(mockBlobPages.getValues())
+        when(mockBlobPage.getValues())
                 .thenReturn(mockList);
 
-        when(mockBlobPages.getNextPage()).thenReturn(null);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
 
         when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPages);
+                .thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
@@ -729,7 +1025,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testYieldOnBadStateRestore() throws Exception {
-        reset(storage, mockBlobPages);
+        reset(storage, mockBlobPage);
         final ListGCSBucket processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -737,13 +1033,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of();
 
-        when(mockBlobPages.getValues())
+        when(mockBlobPage.getValues())
                 .thenReturn(mockList);
 
-        when(mockBlobPages.getNextPage()).thenReturn(null);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
 
         when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPages);
+                .thenReturn(mockBlobPage);
 
         runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
         runner.enqueue("test");
@@ -758,7 +1054,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testListOptionsPrefix() throws Exception {
-        reset(storage, mockBlobPages);
+        reset(storage, mockBlobPage);
         final ListGCSBucket processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -772,13 +1068,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of();
 
-        when(mockBlobPages.getValues())
+        when(mockBlobPage.getValues())
                 .thenReturn(mockList);
 
-        when(mockBlobPages.getNextPage()).thenReturn(null);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
 
         when(storage.list(anyString(), argumentCaptor.capture()))
-                .thenReturn(mockBlobPages);
+                .thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
@@ -793,7 +1089,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testListOptionsVersions() throws Exception {
-        reset(storage, mockBlobPages);
+        reset(storage, mockBlobPage);
         final ListGCSBucket processor = getProcessor();
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
@@ -806,13 +1102,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of();
 
-        when(mockBlobPages.getValues())
+        when(mockBlobPage.getValues())
                 .thenReturn(mockList);
 
-        when(mockBlobPages.getNextPage()).thenReturn(null);
+        when(mockBlobPage.getNextPage()).thenReturn(null);
 
         when(storage.list(anyString(), argumentCaptor.capture()))
-                .thenReturn(mockBlobPages);
+                .thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();