You are viewing a plain-text version of this message; the canonical version is available in the commits@gobblin.apache.org mailing-list archive.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/19 23:56:12 UTC

incubator-gobblin git commit: [GOBBLIN-580] Fix bugs in Google Search Console connector

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 9acb2b257 -> 27655c41a


[GOBBLIN-580] Fix bugs in Google Search Console connector

Closes #2444 from enjoyear/gsc3


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/27655c41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/27655c41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/27655c41

Branch: refs/heads/master
Commit: 27655c41a68adc88ceb34c3b0a142c6e6e731750
Parents: 9acb2b2
Author: Chen Guo <al...@gmail.com>
Authored: Wed Sep 19 16:56:04 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Sep 19 16:56:04 2018 -0700

----------------------------------------------------------------------
 .../google/webmaster/GoogleWebMasterSource.java |  17 +-
 .../GoogleWebmasterDataFetcherImpl.java         | 163 ++++++++++---------
 .../GoogleWebmasterExtractorIterator.java       |   2 +-
 .../GoogleWebmasterDataFetcherImplTest.java     | 102 ++++++++++--
 4 files changed, 181 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/27655c41/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebMasterSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebMasterSource.java b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebMasterSource.java
index ce43c54..ca74b35 100644
--- a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebMasterSource.java
+++ b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebMasterSource.java
@@ -166,10 +166,15 @@ abstract class GoogleWebMasterSource extends QueryBasedSource<String, String[]>
    */
   public static final String KEY_PAGES_TUNING_MAX_RETRIES = PAGES_TUNING + "max_retries";
   /**
-   * Optional. Default to 2 minutes.
-   * Set the time out in minutes while getting all pages.
+   * Optional. Default to 30 seconds.
+   * Set the cooldown time in seconds while getting the page count.
    */
-  public static final String KEY_PAGES_TUNING_TIME_OUT = PAGES_TUNING + "time_out";
+  public static final String KEY_PAGES_COUNT_TUNING_COOLDOWN_TIME = PAGES_TUNING + "size.cooldown";
+  /**
+   * Optional. Default to 5 seconds.
+   * Set the cooldown time in seconds while getting all pages.
+   */
+  public static final String KEY_PAGES_GET_TUNING_COOLDOWN_TIME = PAGES_TUNING + "get.cooldown";
   // =============================================
   // =========   GET PAGES TUNING END ============
   // =============================================
@@ -179,8 +184,7 @@ abstract class GoogleWebMasterSource extends QueryBasedSource<String, String[]>
   public static final String DEFAULT_SOURCE_PROPERTY_COLUMN_NAME = "Source";
 
   @Override
-  public Extractor<String, String[]> getExtractor(WorkUnitState state)
-      throws IOException {
+  public Extractor<String, String[]> getExtractor(WorkUnitState state) throws IOException {
     List<GoogleWebmasterFilter.Dimension> requestedDimensions = getRequestedDimensions(state);
     List<GoogleWebmasterDataFetcher.Metric> requestedMetrics = getRequestedMetrics(state);
 
@@ -207,8 +211,7 @@ abstract class GoogleWebMasterSource extends QueryBasedSource<String, String[]>
 
   abstract GoogleWebmasterExtractor createExtractor(WorkUnitState state, Map<String, Integer> columnPositionMap,
       List<GoogleWebmasterFilter.Dimension> requestedDimensions,
-      List<GoogleWebmasterDataFetcher.Metric> requestedMetrics, JsonArray schemaJson)
-      throws IOException;
+      List<GoogleWebmasterDataFetcher.Metric> requestedMetrics, JsonArray schemaJson) throws IOException;
 
   private void validateFilters(String filters) {
     String countryPrefix = "COUNTRY.";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/27655c41/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterDataFetcherImpl.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterDataFetcherImpl.java b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterDataFetcherImpl.java
index d5a00c3..7d01a1d 100644
--- a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterDataFetcherImpl.java
+++ b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterDataFetcherImpl.java
@@ -17,62 +17,56 @@
 
 package org.apache.gobblin.ingestion.google.webmaster;
 
+import com.google.api.client.googleapis.batch.BatchRequest;
+import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
+import com.google.api.client.repackaged.com.google.common.base.Preconditions;
+import com.google.api.services.webmasters.model.ApiDimensionFilter;
+import com.google.api.services.webmasters.model.SearchAnalyticsQueryResponse;
+import com.google.common.base.Optional;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import com.google.api.client.googleapis.batch.BatchRequest;
-import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
-import com.google.api.client.repackaged.com.google.common.base.Preconditions;
-import com.google.api.services.webmasters.model.ApiDimensionFilter;
-import com.google.api.services.webmasters.model.SearchAnalyticsQueryResponse;
-import com.google.common.base.Optional;
-
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.limiter.RateBasedLimiter;
 
-import static org.apache.gobblin.ingestion.google.webmaster.GoogleWebmasterFilter.Dimension;
-import static org.apache.gobblin.ingestion.google.webmaster.GoogleWebmasterFilter.FilterOperator;
-import static org.apache.gobblin.ingestion.google.webmaster.GoogleWebmasterFilter.countryFilterToString;
+import static org.apache.gobblin.ingestion.google.webmaster.GoogleWebmasterFilter.*;
 
 
 @Slf4j
 public class GoogleWebmasterDataFetcherImpl extends GoogleWebmasterDataFetcher {
   private final double API_REQUESTS_PER_SECOND;
   private final RateBasedLimiter LIMITER;
-  private final int GET_PAGE_SIZE_TIME_OUT;
+  private final int PAGES_COUNT_COOLDOWN_TIME; //In seconds
+  private final int PAGES_GET_COOLDOWN_TIME; //In seconds
   private final int GET_PAGES_RETRIES;
 
   private final String _siteProperty;
   private final GoogleWebmasterClient _client;
   private final List<ProducerJob> _jobs;
 
-  GoogleWebmasterDataFetcherImpl(String siteProperty, GoogleWebmasterClient client, State wuState)
-      throws IOException {
+  GoogleWebmasterDataFetcherImpl(String siteProperty, GoogleWebmasterClient client, State wuState) throws IOException {
     _siteProperty = siteProperty;
     Preconditions.checkArgument(_siteProperty.endsWith("/"), "The site property must end in \"/\"");
     _client = client;
     _jobs = getHotStartJobs(wuState);
     API_REQUESTS_PER_SECOND = wuState.getPropAsDouble(GoogleWebMasterSource.KEY_PAGES_TUNING_REQUESTS_PER_SECOND, 4.5);
-    GET_PAGE_SIZE_TIME_OUT = wuState.getPropAsInt(GoogleWebMasterSource.KEY_PAGES_TUNING_TIME_OUT, 2);
+    PAGES_COUNT_COOLDOWN_TIME = wuState.getPropAsInt(GoogleWebMasterSource.KEY_PAGES_COUNT_TUNING_COOLDOWN_TIME, 30);
+    PAGES_GET_COOLDOWN_TIME = wuState.getPropAsInt(GoogleWebMasterSource.KEY_PAGES_GET_TUNING_COOLDOWN_TIME, 5);
     LIMITER = new RateBasedLimiter(API_REQUESTS_PER_SECOND, TimeUnit.SECONDS);
     GET_PAGES_RETRIES = wuState.getPropAsInt(GoogleWebMasterSource.KEY_PAGES_TUNING_MAX_RETRIES, 120);
   }
@@ -115,9 +109,8 @@ public class GoogleWebmasterDataFetcherImpl extends GoogleWebmasterDataFetcher {
     Collection<String> allPages = getPages(startDate, endDate, requestedDimensions, countryFilter, jobs,
         Math.min(rowLimit, GoogleWebmasterClient.API_ROW_LIMIT));
     int actualSize = allPages.size();
-    log.info(String
-        .format("A total of %d pages fetched for property %s at country-%s from %s to %s", actualSize, _siteProperty,
-            country, startDate, endDate));
+    log.info(String.format("A total of %d pages fetched for property %s at country-%s from %s to %s", actualSize,
+        _siteProperty, country, startDate, endDate));
 
     if (expectedSize != -1 && actualSize != expectedSize) {
       log.warn(String.format("Expected page size is %d, but only able to get %d", expectedSize, actualSize));
@@ -133,9 +126,8 @@ public class GoogleWebmasterDataFetcherImpl extends GoogleWebmasterDataFetcher {
   /**
    * @return the size of all pages data set
    */
-  private int getPagesSize(final String startDate, final String endDate, final String country,
-      final List<Dimension> requestedDimensions, final List<ApiDimensionFilter> apiDimensionFilters)
-      throws IOException {
+  int getPagesSize(final String startDate, final String endDate, final String country,
+      final List<Dimension> requestedDimensions, final List<ApiDimensionFilter> apiDimensionFilters) {
     final ExecutorService es = Executors.newCachedThreadPool(
         ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), Optional.of(this.getClass().getSimpleName())));
 
@@ -143,74 +135,86 @@ public class GoogleWebmasterDataFetcherImpl extends GoogleWebmasterDataFetcher {
     long groupSize = Math.max(1, Math.round(API_REQUESTS_PER_SECOND));
     List<Future<Integer>> results = new ArrayList<>((int) groupSize);
 
+    int max = -1;
     while (true) {
       for (int i = 0; i < groupSize; ++i) {
         final int start = startRow;
         startRow += GoogleWebmasterClient.API_ROW_LIMIT;
 
-        Future<Integer> submit = es.submit(new Callable<Integer>() {
-          @Override
-          public Integer call() {
-            log.info(String.format("Getting page size from %s...", start));
-            String interruptedMsg = String
-                .format("Interrupted while trying to get the size of all pages for %s. Current start row is %d.",
-                    country, start);
-            while (true) {
-              try {
-                LIMITER.acquirePermits(1);
-              } catch (InterruptedException e) {
-                log.error("RateBasedLimiter: " + interruptedMsg, e);
-                return -1;
-              }
-
-              if (Thread.interrupted()) {
-                log.error(interruptedMsg);
-                return -1;
-              }
+        Future<Integer> submit = es.submit(() -> {
+          log.info(String.format("Getting page size from %s...", start));
+          String interruptedMsg =
+              String.format("Interrupted while trying to get the size of all pages for %s. Current start row is %d.",
+                  country, start);
+          int r = 0;
+          while (r <= GET_PAGES_RETRIES) {
+            ++r;
+            try {
+              LIMITER.acquirePermits(1);
+            } catch (InterruptedException e) {
+              log.error("RateBasedLimiter: " + interruptedMsg, e);
+              return -1;
+            }
 
-              try {
-                List<String> pages = _client
-                    .getPages(_siteProperty, startDate, endDate, country, GoogleWebmasterClient.API_ROW_LIMIT,
-                        requestedDimensions, apiDimensionFilters, start);
-                if (pages.size() < GoogleWebmasterClient.API_ROW_LIMIT) {
-                  return pages.size() + start;  //Figured out the size
-                } else {
-                  return -1;
-                }
-              } catch (IOException e) {
-                log.info(String.format("Getting page size from %s failed. Retrying...", start));
+            try {
+              List<String> pages =
+                  _client.getPages(_siteProperty, startDate, endDate, country, GoogleWebmasterClient.API_ROW_LIMIT,
+                      requestedDimensions, apiDimensionFilters, start);
+              if (pages.size() == 0) {
+                return 0;
               }
+              int totalPages = pages.size() + start;
+              log.info(String.format("At least %s pages exist. Continuing...", totalPages));
+              return totalPages;
+            } catch (IOException e) {
+              log.info(String.format("Getting page size from %s failed due to %s. Retrying...", start, e.getMessage()));
+              coolDown(r, PAGES_COUNT_COOLDOWN_TIME);
             }
           }
+          throw new RuntimeException(String.format(
+              "Getting all pages reaches the maximum number of retires %d. Date range: %s ~ %s. Country: %s.",
+              GET_PAGES_RETRIES, startDate, endDate, country));
         });
         results.add(submit);
       }
-      //Check the results group in order. The first non-negative count indicates the size of total pages.
+
+      List<Integer> pagesCount = new ArrayList<>();
       for (Future<Integer> result : results) {
         try {
-          Integer integer = result.get(GET_PAGE_SIZE_TIME_OUT, TimeUnit.MINUTES);
-          if (integer >= 0) {
-            es.shutdownNow();
-            return integer;
-          }
+          pagesCount.add(result.get());
         } catch (InterruptedException | ExecutionException e) {
           throw new RuntimeException(e);
-        } catch (TimeoutException e) {
-          throw new RuntimeException(String
-              .format("Exceeding the timeout of %d minutes while getting the total size of all pages.",
-                  GET_PAGE_SIZE_TIME_OUT), e);
         }
       }
+
+      if (pagesCount.stream().allMatch(x -> x == 0)) {
+        return max;
+      }
+      max = Math.max(max, Collections.max(pagesCount));
+      if (max % GoogleWebmasterClient.API_ROW_LIMIT != 0) {
+        return max;
+      }
+
       results.clear();
     }
   }
 
+  private void coolDown(int r, int secondsInterval) {
+    int milliSeconds = secondsInterval + (r / 5) * secondsInterval;
+    milliSeconds *= 1000;
+    log.info(String.format("Sleeping for %s seconds", milliSeconds / 1000));
+    try {
+      Thread.sleep(milliSeconds);
+    } catch (InterruptedException e1) {
+      throw new RuntimeException(e1);
+    }
+  }
+
   /**
    * Get all pages in an async mode.
    */
   private Collection<String> getPages(String startDate, String endDate, List<Dimension> dimensions,
-      ApiDimensionFilter countryFilter, Queue<Pair<String, FilterOperator>> toProcess, int rowLimit)
-      throws IOException {
+      ApiDimensionFilter countryFilter, Queue<Pair<String, FilterOperator>> toProcess, int rowLimit) {
     String country = GoogleWebmasterFilter.countryFilterToString(countryFilter);
 
     ConcurrentLinkedDeque<String> allPages = new ConcurrentLinkedDeque<>();
@@ -231,9 +235,8 @@ public class GoogleWebmasterDataFetcherImpl extends GoogleWebmasterDataFetcher {
         boolean terminated = es.awaitTermination(5, TimeUnit.MINUTES);
         if (!terminated) {
           es.shutdownNow();
-          log.warn(String
-              .format("Timed out while getting all pages for country-%s at round %d. Next round now has size %d.",
-                  country, r, nextRound.size()));
+          log.warn("Timed out while getting all pages for country-{} at round {}. Next round now has size {}.", country,
+              r, nextRound.size());
         }
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
@@ -243,10 +246,11 @@ public class GoogleWebmasterDataFetcherImpl extends GoogleWebmasterDataFetcher {
         break;
       }
       toProcess = nextRound;
+      coolDown(r, PAGES_GET_COOLDOWN_TIME);
     }
-    if (r == GET_PAGES_RETRIES) {
-      throw new RuntimeException(String
-          .format("Getting all pages reaches the maximum number of retires %d. Date range: %s ~ %s. Country: %s.",
+    if (r == GET_PAGES_RETRIES + 1) {
+      throw new RuntimeException(
+          String.format("Getting all pages reaches the maximum number of retires %d. Date range: %s ~ %s. Country: %s.",
               GET_PAGES_RETRIES, startDate, endDate, country));
     }
     return allPages;
@@ -276,8 +280,8 @@ public class GoogleWebmasterDataFetcherImpl extends GoogleWebmasterDataFetcher {
         List<String> pages;
         try {
           pages = _client.getPages(_siteProperty, startDate, endDate, countryString, rowLimit, dimensions, filters, 0);
-          log.debug(String
-              .format("%d pages fetched for %s market-%s from %s to %s.", pages.size(), jobString, countryString,
+          log.debug(
+              String.format("%d pages fetched for %s market-%s from %s to %s.", pages.size(), jobString, countryString,
                   startDate, endDate));
         } catch (IOException e) {
           log.debug(String.format("%s failed due to %s. Retrying...", jobString, e.getMessage()));
@@ -350,8 +354,8 @@ public class GoogleWebmasterDataFetcherImpl extends GoogleWebmasterDataFetcher {
   public List<String[]> performSearchAnalyticsQuery(String startDate, String endDate, int rowLimit,
       List<Dimension> requestedDimensions, List<Metric> requestedMetrics, Collection<ApiDimensionFilter> filters)
       throws IOException {
-    SearchAnalyticsQueryResponse response = _client
-        .createSearchAnalyticsQuery(_siteProperty, startDate, endDate, requestedDimensions,
+    SearchAnalyticsQueryResponse response =
+        _client.createSearchAnalyticsQuery(_siteProperty, startDate, endDate, requestedDimensions,
             GoogleWebmasterFilter.andGroupFilters(filters), rowLimit, 0).execute();
     return convertResponse(requestedMetrics, response);
   }
@@ -359,8 +363,7 @@ public class GoogleWebmasterDataFetcherImpl extends GoogleWebmasterDataFetcher {
   @Override
   public void performSearchAnalyticsQueryInBatch(List<ProducerJob> jobs, List<ArrayList<ApiDimensionFilter>> filterList,
       List<JsonBatchCallback<SearchAnalyticsQueryResponse>> callbackList, List<Dimension> requestedDimensions,
-      int rowLimit)
-      throws IOException {
+      int rowLimit) throws IOException {
     BatchRequest batchRequest = _client.createBatch();
 
     for (int i = 0; i < jobs.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/27655c41/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
index c5a1c60..042bbae 100644
--- a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
+++ b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
@@ -372,7 +372,7 @@ class GoogleWebmasterExtractorIterator extends AsyncIteratorWithDataSink<String[
                 public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders)
                     throws IOException {
                   producer.onFailure(e.getMessage(), job, retries);
-                  log.debug(job.getPage() + " failed");
+                  log.info(job.getPage() + " failed");
                 }
 
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/27655c41/gobblin-modules/google-ingestion/src/test/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterDataFetcherImplTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/google-ingestion/src/test/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterDataFetcherImplTest.java b/gobblin-modules/google-ingestion/src/test/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterDataFetcherImplTest.java
index b9149d1..1e2d5eb 100644
--- a/gobblin-modules/google-ingestion/src/test/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterDataFetcherImplTest.java
+++ b/gobblin-modules/google-ingestion/src/test/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterDataFetcherImplTest.java
@@ -21,16 +21,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.gobblin.configuration.WorkUnitState;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import org.apache.gobblin.configuration.WorkUnitState;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.*;
 
 
 @Test(groups = {"gobblin.source.extractor.extract.google.webmaster"})
@@ -39,14 +36,12 @@ public class GoogleWebmasterDataFetcherImplTest {
   private String _property = "https://www.myproperty.com/";
 
   @Test
-  public void testGetAllPagesWhenRequestLessThan5000()
-      throws Exception {
+  public void testGetAllPagesWhenRequestLessThan5000() throws Exception {
     GoogleWebmasterClient client = Mockito.mock(GoogleWebmasterClient.class);
     List<String> retVal = Arrays.asList("abc", "def");
 
-    Mockito.when(client
-        .getPages(eq(_property), any(String.class), any(String.class), eq("ALL"), any(Integer.class), any(List.class),
-            any(List.class), eq(0))).thenReturn(retVal);
+    Mockito.when(client.getPages(eq(_property), any(String.class), any(String.class), eq("ALL"), any(Integer.class),
+        any(List.class), any(List.class), eq(0))).thenReturn(retVal);
 
     WorkUnitState workUnitState = new WorkUnitState();
     workUnitState.setProp(GoogleWebMasterSource.KEY_PROPERTY, _property);
@@ -66,16 +61,14 @@ public class GoogleWebmasterDataFetcherImplTest {
   }
 
   @Test
-  public void testGetAllPagesWhenDataSizeLessThan5000AndRequestAll()
-      throws Exception {
+  public void testGetAllPagesWhenDataSizeLessThan5000AndRequestAll() throws Exception {
     GoogleWebmasterClient client = Mockito.mock(GoogleWebmasterClient.class);
     List<String> allPages = new ArrayList<>();
     for (int i = 0; i < 10; ++i) {
       allPages.add(Integer.toString(i));
     }
-    Mockito.when(client
-        .getPages(eq(_property), any(String.class), any(String.class), eq("ALL"), any(Integer.class), any(List.class),
-            any(List.class), eq(0))).thenReturn(allPages);
+    Mockito.when(client.getPages(eq(_property), any(String.class), any(String.class), eq("ALL"), any(Integer.class),
+        any(List.class), any(List.class), eq(0))).thenReturn(allPages);
 
     WorkUnitState workUnitState = new WorkUnitState();
     workUnitState.setProp(GoogleWebMasterSource.KEY_PROPERTY, _property);
@@ -93,4 +86,83 @@ public class GoogleWebmasterDataFetcherImplTest {
         .getPages(eq(_property), any(String.class), any(String.class), eq("ALL"), any(Integer.class), any(List.class),
             any(List.class), eq(0));
   }
+
+  @Test
+  public void testGetPageSize1() throws Exception {
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.setProp(GoogleWebMasterSource.KEY_PROPERTY, _property);
+
+    GoogleWebmasterClient client = Mockito.mock(GoogleWebmasterClient.class);
+    List<String> list5000 = new ArrayList<>();
+    for (int i = 0; i < 5000; ++i) {
+      list5000.add(null);
+    }
+
+    Mockito.when(client.getPages(any(String.class), any(String.class), any(String.class), any(String.class),
+        eq(GoogleWebmasterClient.API_ROW_LIMIT), any(List.class), any(List.class), eq(0))).thenReturn(list5000);
+    GoogleWebmasterDataFetcherImpl dataFetcher = new GoogleWebmasterDataFetcherImpl(_property, client, workUnitState);
+    Assert.assertEquals(dataFetcher.getPagesSize("start_date", "end_date", "country", null, null), 5000);
+
+    Mockito.when(client.getPages(any(String.class), any(String.class), any(String.class), any(String.class),
+        eq(GoogleWebmasterClient.API_ROW_LIMIT), any(List.class), any(List.class), eq(5000))).thenReturn(list5000);
+    Assert.assertEquals(dataFetcher.getPagesSize("start_date", "end_date", "country", null, null), 10000);
+
+    Mockito.when(client.getPages(any(String.class), any(String.class), any(String.class), any(String.class),
+        eq(GoogleWebmasterClient.API_ROW_LIMIT), any(List.class), any(List.class), eq(10000))).thenReturn(list5000);
+    Assert.assertEquals(dataFetcher.getPagesSize("start_date", "end_date", "country", null, null), 15000);
+  }
+
+  @Test
+  public void testGetPageSize2() throws Exception {
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.setProp(GoogleWebMasterSource.KEY_PROPERTY, _property);
+
+    GoogleWebmasterClient client = Mockito.mock(GoogleWebmasterClient.class);
+    List<String> list2 = new ArrayList<>();
+    for (int i = 0; i < 2; ++i) {
+      list2.add(null);
+    }
+
+    Mockito.when(client.getPages(any(String.class), any(String.class), any(String.class), any(String.class),
+        eq(GoogleWebmasterClient.API_ROW_LIMIT), any(List.class), any(List.class), eq(0))).thenReturn(list2);
+    GoogleWebmasterDataFetcherImpl dataFetcher = new GoogleWebmasterDataFetcherImpl(_property, client, workUnitState);
+    int size = dataFetcher.getPagesSize("start_date", "end_date", "country", null, null);
+    Assert.assertEquals(size, 2);
+  }
+
+  @Test
+  public void testGetPageSize3() throws Exception {
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.setProp(GoogleWebMasterSource.KEY_PROPERTY, _property);
+
+    GoogleWebmasterClient client = Mockito.mock(GoogleWebmasterClient.class);
+    List<String> list5000 = new ArrayList<>();
+    for (int i = 0; i < 5000; ++i) {
+      list5000.add(null);
+    }
+
+    Mockito.when(client.getPages(any(String.class), any(String.class), any(String.class), any(String.class),
+        eq(GoogleWebmasterClient.API_ROW_LIMIT), any(List.class), any(List.class), eq(0))).thenReturn(list5000);
+    Mockito.when(client.getPages(any(String.class), any(String.class), any(String.class), any(String.class),
+        eq(GoogleWebmasterClient.API_ROW_LIMIT), any(List.class), any(List.class), eq(5000))).thenReturn(list5000);
+    Mockito.when(client.getPages(any(String.class), any(String.class), any(String.class), any(String.class),
+        eq(GoogleWebmasterClient.API_ROW_LIMIT), any(List.class), any(List.class), eq(10000))).thenReturn(list5000);
+    Mockito.when(client.getPages(any(String.class), any(String.class), any(String.class), any(String.class),
+        eq(GoogleWebmasterClient.API_ROW_LIMIT), any(List.class), any(List.class), eq(15000))).thenReturn(list5000);
+    Mockito.when(client.getPages(any(String.class), any(String.class), any(String.class), any(String.class),
+        eq(GoogleWebmasterClient.API_ROW_LIMIT), any(List.class), any(List.class), eq(20000))).thenReturn(list5000);
+    Mockito.when(client.getPages(any(String.class), any(String.class), any(String.class), any(String.class),
+        eq(GoogleWebmasterClient.API_ROW_LIMIT), any(List.class), any(List.class), eq(25000))).thenReturn(list5000);
+
+    List<String> list2 = new ArrayList<>();
+    for (int i = 0; i < 2; ++i) {
+      list2.add(null);
+    }
+    Mockito.when(client.getPages(any(String.class), any(String.class), any(String.class), any(String.class),
+        eq(GoogleWebmasterClient.API_ROW_LIMIT), any(List.class), any(List.class), eq(30000))).thenReturn(list2);
+
+    GoogleWebmasterDataFetcherImpl dataFetcher = new GoogleWebmasterDataFetcherImpl(_property, client, workUnitState);
+    int size = dataFetcher.getPagesSize("start_date", "end_date", "country", null, null);
+    Assert.assertEquals(size, 30002);
+  }
 }
\ No newline at end of file