You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/09/09 17:32:38 UTC
[nifi] branch master updated: NIFI-6636: Fixed ListGCSBucket file duplication error
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 21a27c8 NIFI-6636: Fixed ListGCSBucket file duplication error
21a27c8 is described below
commit 21a27c8bb00c3f41836c938c3bb59ff428a27ca4
Author: Peter Turcsanyi <tu...@cloudera.com>
AuthorDate: Fri Sep 6 13:49:39 2019 +0200
NIFI-6636: Fixed ListGCSBucket file duplication error
ListGCSBucket listed files twice if they did not arrive in alphabetical order.
The set storing the names of the latest blobs (those listed with the highest
timestamp during the previous run of the processor) was cleared too early, so a
blob from that set that appeared later in the listing was not recognised as
already listed.
Also changed the state persisting logic: the state is now saved only once, at
the end of onTrigger() (similar to ListS3). Previously, an inconsistent state
(blob names without the matching timestamp) could also be saved.
This closes #3702.
Signed-off-by: Mark Payne <ma...@hotmail.com>
---
.../nifi/processors/gcp/storage/ListGCSBucket.java | 41 ++-
.../processors/gcp/storage/ListGCSBucketTest.java | 386 ++++++++++++++++++---
2 files changed, 361 insertions(+), 66 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index 01293cf..f4cdb27 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -259,13 +259,16 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
final Storage storage = getCloudService();
- int listCount = 0;
+
long maxTimestamp = 0L;
+ Set<String> maxKeys = new HashSet<>();
- Page<Blob> blobPages = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[listOptions.size()]));
+ Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[listOptions.size()]));
do {
- for (Blob blob : blobPages.getValues()) {
+ int listCount = 0;
+
+ for (Blob blob : blobPage.getValues()) {
long lastModified = blob.getUpdateTime();
if (lastModified < currentTimestamp
|| lastModified == currentTimestamp && currentKeys.contains(blob.getName())) {
@@ -381,40 +384,36 @@ public class ListGCSBucket extends AbstractGCSProcessor {
// Update state
if (lastModified > maxTimestamp) {
maxTimestamp = lastModified;
- currentKeys.clear();
+ maxKeys.clear();
}
if (lastModified == maxTimestamp) {
- currentKeys.add(blob.getName());
+ maxKeys.add(blob.getName());
}
listCount++;
}
- blobPages = blobPages.getNextPage();
commit(context, session, listCount);
- listCount = 0;
- } while (blobPages != null);
-
- currentTimestamp = maxTimestamp;
- final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
+ blobPage = blobPage.getNextPage();
+ } while (blobPage != null);
- if (!commit(context, session, listCount)) {
- if (currentTimestamp > 0) {
- persistState(context);
- }
+ if (maxTimestamp != 0) {
+ currentTimestamp = maxTimestamp;
+ currentKeys = maxKeys;
+ persistState(context);
+ } else {
getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", new Object[]{bucket});
context.yield();
}
+
+ final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
}
- private boolean commit(final ProcessContext context, final ProcessSession session, int listCount) {
- boolean willCommit = listCount > 0;
- if (willCommit) {
+ private void commit(final ProcessContext context, final ProcessSession session, int listCount) {
+ if (listCount > 0) {
getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
session.commit();
- persistState(context);
}
- return willCommit;
}
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index e17cf4b..1a925d0 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -256,7 +256,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Mock
- Page<Blob> mockBlobPages;
+ Page<Blob> mockBlobPage;
private Blob buildMockBlob(String bucket, String key, long updateTime) {
final Blob blob = mock(Blob.class);
@@ -268,7 +268,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testSuccessfulList() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -279,13 +279,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -343,7 +343,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testOldValues() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -353,13 +353,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
buildMockBlob("blob-bucket-1", "blob-key-1", 2L)
);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.enqueue("test2");
@@ -384,7 +384,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testEmptyList() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -392,13 +392,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -413,8 +413,304 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
+ public void testListWithStateAndFilesComingInAlphabeticalOrder() throws Exception {
+ reset(storage, mockBlobPage);
+ final ListGCSBucket processor = getProcessor();
+ final TestRunner runner = buildNewRunner(processor);
+ addRequiredPropertiesToRunner(runner);
+ runner.assertValid();
+
+ final Map<String, String> state = ImmutableMap.of(
+ ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+ ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1"
+ );
+
+ runner.getStateManager().setState(state, Scope.CLUSTER);
+
+ final Iterable<Blob> mockList = ImmutableList.of(
+ buildMockBlob("blob-bucket-1", "blob-key-1", 1L),
+ buildMockBlob("blob-bucket-2", "blob-key-2", 2L)
+ );
+
+ when(mockBlobPage.getValues())
+ .thenReturn(mockList);
+
+ when(mockBlobPage.getNextPage()).thenReturn(null);
+
+ when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+ .thenReturn(mockBlobPage);
+
+ runner.enqueue("test");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+ runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
+
+ final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+ MockFlowFile flowFile = successes.get(0);
+ assertEquals(
+ "blob-bucket-2",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-2",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "2",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ assertEquals(
+ 2L,
+ processor.currentTimestamp
+ );
+
+ assertEquals(
+ ImmutableSet.of(
+ "blob-key-2"
+ ),
+ processor.currentKeys
+ );
+ }
+
+ @Test
+ public void testListWithStateAndFilesComingNotInAlphabeticalOrder() throws Exception {
+ reset(storage, mockBlobPage);
+ final ListGCSBucket processor = getProcessor();
+ final TestRunner runner = buildNewRunner(processor);
+ addRequiredPropertiesToRunner(runner);
+ runner.assertValid();
+
+ final Map<String, String> state = ImmutableMap.of(
+ ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+ ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
+ );
+
+ runner.getStateManager().setState(state, Scope.CLUSTER);
+
+ final Iterable<Blob> mockList = ImmutableList.of(
+ buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
+ buildMockBlob("blob-bucket-2", "blob-key-2", 1L)
+ );
+
+ when(mockBlobPage.getValues())
+ .thenReturn(mockList);
+
+ when(mockBlobPage.getNextPage()).thenReturn(null);
+
+ when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+ .thenReturn(mockBlobPage);
+
+ runner.enqueue("test");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+ runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
+
+ final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+ MockFlowFile flowFile = successes.get(0);
+ assertEquals(
+ "blob-bucket-1",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-1",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "2",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ assertEquals(
+ 2L,
+ processor.currentTimestamp
+ );
+
+ assertEquals(
+ ImmutableSet.of(
+ "blob-key-1"
+ ),
+ processor.currentKeys
+ );
+ }
+
+ @Test
+ public void testListWithStateAndNewFilesComingWithTheSameTimestamp() throws Exception {
+ reset(storage, mockBlobPage);
+ final ListGCSBucket processor = getProcessor();
+ final TestRunner runner = buildNewRunner(processor);
+ addRequiredPropertiesToRunner(runner);
+ runner.assertValid();
+
+ final Map<String, String> state = ImmutableMap.of(
+ ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+ ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
+ );
+
+ runner.getStateManager().setState(state, Scope.CLUSTER);
+
+ final Iterable<Blob> mockList = ImmutableList.of(
+ buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
+ buildMockBlob("blob-bucket-2", "blob-key-2", 1L),
+ buildMockBlob("blob-bucket-3", "blob-key-3", 2L)
+ );
+
+ when(mockBlobPage.getValues())
+ .thenReturn(mockList);
+
+ when(mockBlobPage.getNextPage()).thenReturn(null);
+
+ when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+ .thenReturn(mockBlobPage);
+
+ runner.enqueue("test");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+ runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+
+ final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+ MockFlowFile flowFile = successes.get(0);
+ assertEquals(
+ "blob-bucket-1",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-1",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "2",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ flowFile = successes.get(1);
+ assertEquals(
+ "blob-bucket-3",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-3",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "2",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ assertEquals(
+ 2L,
+ processor.currentTimestamp
+ );
+
+ assertEquals(
+ ImmutableSet.of(
+ "blob-key-1",
+ "blob-key-3"
+ ),
+ processor.currentKeys
+ );
+ }
+
+ @Test
+ public void testListWithStateAndNewFilesComingWithTheCurrentTimestamp() throws Exception {
+ reset(storage, mockBlobPage);
+ final ListGCSBucket processor = getProcessor();
+ final TestRunner runner = buildNewRunner(processor);
+ addRequiredPropertiesToRunner(runner);
+ runner.assertValid();
+
+ final Map<String, String> state = ImmutableMap.of(
+ ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+ ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
+ );
+
+ runner.getStateManager().setState(state, Scope.CLUSTER);
+
+ final Iterable<Blob> mockList = ImmutableList.of(
+ buildMockBlob("blob-bucket-1", "blob-key-1", 1L),
+ buildMockBlob("blob-bucket-2", "blob-key-2", 1L),
+ buildMockBlob("blob-bucket-3", "blob-key-3", 1L)
+ );
+
+ when(mockBlobPage.getValues())
+ .thenReturn(mockList);
+
+ when(mockBlobPage.getNextPage()).thenReturn(null);
+
+ when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+ .thenReturn(mockBlobPage);
+
+ runner.enqueue("test");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+ runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+
+ final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+ MockFlowFile flowFile = successes.get(0);
+ assertEquals(
+ "blob-bucket-1",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-1",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "1",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ flowFile = successes.get(1);
+ assertEquals(
+ "blob-bucket-3",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-3",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "1",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ assertEquals(
+ 1L,
+ processor.currentTimestamp
+ );
+
+ assertEquals(
+ ImmutableSet.of(
+ "blob-key-1",
+ "blob-key-3"
+ ),
+ processor.currentKeys
+ );
+ }
+
+ @Test
public void testAttributesSet() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -447,13 +743,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -555,7 +851,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerUser() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -568,13 +864,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -598,7 +894,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerGroup() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -611,13 +907,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -642,7 +938,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerDomain() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -655,13 +951,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -686,7 +982,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerProject() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -699,13 +995,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -729,7 +1025,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testYieldOnBadStateRestore() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -737,13 +1033,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
runner.enqueue("test");
@@ -758,7 +1054,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListOptionsPrefix() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -772,13 +1068,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), argumentCaptor.capture()))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -793,7 +1089,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListOptionsVersions() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -806,13 +1102,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), argumentCaptor.capture()))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();