You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/04 21:11:12 UTC

[1/2] incubator-beam git commit: [BEAM-48] Refactor BigQueryServices to support extract and query jobs

Repository: incubator-beam
Updated Branches:
  refs/heads/master 97945648c -> 928640df2


[BEAM-48] Refactor BigQueryServices to support extract and query jobs


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2fe1ebc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2fe1ebc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2fe1ebc8

Branch: refs/heads/master
Commit: 2fe1ebc8ebbabe8836b73d3c7c5a251dc2d746bf
Parents: 9794564
Author: Pei He <pe...@google.com>
Authored: Tue May 3 17:55:11 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 4 14:10:13 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java |  40 ++++-
 .../apache/beam/sdk/util/BigQueryServices.java  |  40 +++--
 .../beam/sdk/util/BigQueryServicesImpl.java     | 148 ++++++++++++-------
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  |  79 ++++++----
 .../beam/sdk/util/BigQueryServicesImplTest.java |  79 +++++-----
 5 files changed, 252 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe1ebc8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 4a0bdac..7785298 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -47,7 +47,7 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.BigQueryServices;
-import org.apache.beam.sdk.util.BigQueryServices.LoadService;
+import org.apache.beam.sdk.util.BigQueryServices.JobService;
 import org.apache.beam.sdk.util.BigQueryServicesImpl;
 import org.apache.beam.sdk.util.BigQueryTableInserter;
 import org.apache.beam.sdk.util.BigQueryTableRowIterator;
@@ -68,7 +68,9 @@ import org.apache.beam.sdk.values.PInput;
 
 import com.google.api.client.json.JsonFactory;
 import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.QueryRequest;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
@@ -1168,6 +1170,10 @@ public class BigQueryIO {
       // The maximum number of retry load jobs.
       private static final int MAX_RETRY_LOAD_JOBS = 3;
 
+      // The maximum number of retries to poll the status of a load job.
+      // It is set to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
+      private static final int MAX_JOB_STATUS_POLL_RETRIES = Integer.MAX_VALUE;
+
       private final BigQuerySink bigQuerySink;
 
       private BigQueryWriteOperation(BigQuerySink sink) {
@@ -1191,7 +1197,7 @@ public class BigQueryIO {
           }
           if (!tempFiles.isEmpty()) {
               load(
-                  bigQuerySink.bqServices.getLoadService(bqOptions),
+                  bigQuerySink.bqServices.getJobService(bqOptions),
                   bigQuerySink.jobIdToken,
                   fromJsonString(bigQuerySink.jsonTable, TableReference.class),
                   tempFiles,
@@ -1215,7 +1221,7 @@ public class BigQueryIO {
        * <p>If a load job failed, it will try another load job with a different job id.
        */
       private void load(
-          LoadService loadService,
+          JobService jobService,
           String jobIdPrefix,
           TableReference ref,
           List<String> gcsUris,
@@ -1238,8 +1244,9 @@ public class BigQueryIO {
             LOG.info("Previous load jobs failed, retrying.");
           }
           LOG.info("Starting BigQuery load job: {}", jobId);
-          loadService.startLoadJob(jobId, loadConfig);
-          BigQueryServices.Status jobStatus = loadService.pollJobStatus(projectId, jobId);
+          jobService.startLoadJob(jobId, loadConfig);
+          Status jobStatus = parseStatus(
+              jobService.pollJob(projectId, jobId, MAX_JOB_STATUS_POLL_RETRIES));
           switch (jobStatus) {
             case SUCCEEDED:
               return;
@@ -1669,6 +1676,29 @@ public class BigQueryIO {
     }
   }
 
+  /**
+   * Status of a BigQuery job or request.
+   */
+  enum Status {
+    SUCCEEDED,
+    FAILED,
+    UNKNOWN,
+  }
+
+  private static Status parseStatus(@Nullable Job job) {
+    if (job == null) {
+      return Status.UNKNOWN;
+    }
+    JobStatus status = job.getStatus();
+    if (status.getErrorResult() != null) {
+      return Status.FAILED;
+    } else if (status.getErrors() != null && !status.getErrors().isEmpty()) {
+      return Status.FAILED;
+    } else {
+      return Status.SUCCEEDED;
+    }
+  }
+
   private static String toJsonString(Object item) {
     if (item == null) {
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe1ebc8/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
index 0fecfdc..b12e049 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
@@ -19,7 +19,10 @@ package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.options.BigQueryOptions;
 
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -30,33 +33,38 @@ import java.io.Serializable;
 public interface BigQueryServices extends Serializable {
 
   /**
-   * Status of a BigQuery job or request.
+   * Returns a real, mock, or fake {@link JobService}.
    */
-  enum Status {
-    SUCCEEDED,
-    FAILED,
-    UNKNOWN,
-  }
-
-  /**
-   * Returns a real, mock, or fake {@link LoadService}.
-   */
-  public LoadService getLoadService(BigQueryOptions bqOptions);
+  public JobService getJobService(BigQueryOptions bqOptions);
 
   /**
    * An interface for the Cloud BigQuery load service.
    */
-  public interface LoadService {
+  public interface JobService {
     /**
-     * Start a BigQuery load job.
+     * Starts a BigQuery load job.
      */
-    public void startLoadJob(String jobId, JobConfigurationLoad loadConfig)
+    void startLoadJob(String jobId, JobConfigurationLoad loadConfig)
         throws InterruptedException, IOException;
 
     /**
-     * Poll the status of a BigQuery load job.
+     * Start a BigQuery extract job.
+     */
+    void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
+        throws InterruptedException, IOException;
+
+    /**
+     * Start a BigQuery query job.
+     */
+    void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun)
+        throws IOException, InterruptedException;
+
+    /**
+     * Waits until the job is DONE, and returns the job.
+     *
+     * <p>Returns null if {@code maxAttempts} retries are reached before the job is DONE.
      */
-    public Status pollJobStatus(String projectId, String jobId)
+    Job pollJob(String projectId, String jobId, int maxAttempts)
         throws InterruptedException, IOException;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe1ebc8/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
index 0502c6e..2bfe84f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
@@ -26,10 +26,11 @@ import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfiguration;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.TableReference;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 
@@ -45,37 +46,34 @@ import java.util.concurrent.TimeUnit;
  */
 public class BigQueryServicesImpl implements BigQueryServices {
 
-  // The maximum number of attempts to execute a load job RPC.
-  private static final int MAX_LOAD_JOB_RPC_ATTEMPTS = 10;
+  // The maximum number of attempts to execute a BigQuery RPC.
+  private static final int MAX_RPC_ATTEMPTS = 10;
 
-  // The initial backoff for executing a load job RPC.
-  private static final long INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
-  // The maximum number of retries to poll the status of a load job.
-  // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery load job finishes.
-  private static final int MAX_LOAD_JOB_POLL_RETRIES = Integer.MAX_VALUE;
+  // The initial backoff for executing a BigQuery RPC.
+  private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
 
-  // The initial backoff for polling the status of a load job.
-  private static final long INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60);
+  // The initial backoff for polling the status of a BigQuery job.
+  private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60);
 
   @Override
-  public LoadService getLoadService(BigQueryOptions options) {
-    return new LoadServiceImpl(options);
+  public JobService getJobService(BigQueryOptions options) {
+    return new JobServiceImpl(options);
   }
 
   @VisibleForTesting
-  static class LoadServiceImpl implements BigQueryServices.LoadService {
-    private static final Logger LOG = LoggerFactory.getLogger(LoadServiceImpl.class);
+  static class JobServiceImpl implements BigQueryServices.JobService {
+    private static final Logger LOG = LoggerFactory.getLogger(JobServiceImpl.class);
 
     private final ApiErrorExtractor errorExtractor;
     private final Bigquery client;
 
     @VisibleForTesting
-    LoadServiceImpl(Bigquery client) {
+    JobServiceImpl(Bigquery client) {
       this.errorExtractor = new ApiErrorExtractor();
       this.client = client;
     }
 
-    private LoadServiceImpl(BigQueryOptions options) {
+    private JobServiceImpl(BigQueryOptions options) {
       this.errorExtractor = new ApiErrorExtractor();
       this.client = Transport.newBigQueryClient(options).build();
     }
@@ -83,7 +81,7 @@ public class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Retries the RPC for at most {@code MAX_LOAD_JOB_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
      * @throws IOException if it exceeds max RPC retries.
      */
@@ -91,47 +89,84 @@ public class BigQueryServicesImpl implements BigQueryServices {
     public void startLoadJob(
         String jobId,
         JobConfigurationLoad loadConfig) throws InterruptedException, IOException {
-      BackOff backoff = new AttemptBoundedExponentialBackOff(
-          MAX_LOAD_JOB_RPC_ATTEMPTS, INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS);
-      startLoadJob(jobId, loadConfig, Sleeper.DEFAULT, backoff);
+      Job job = new Job();
+      JobReference jobRef = new JobReference();
+      jobRef.setProjectId(loadConfig.getDestinationTable().getProjectId());
+      jobRef.setJobId(jobId);
+      job.setJobReference(jobRef);
+      JobConfiguration jobConfig = new JobConfiguration();
+      jobConfig.setLoad(loadConfig);
+      job.setConfiguration(jobConfig);
+
+      startJob(job, errorExtractor, client);
     }
 
     /**
      * {@inheritDoc}
      *
-     * <p>Retries the poll request for at most {@code MAX_LOAD_JOB_POLL_RETRIES} times
-     * until the job is DONE.
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds max RPC retries.
      */
     @Override
-    public Status pollJobStatus(String projectId, String jobId) throws InterruptedException {
-      BackOff backoff = new AttemptBoundedExponentialBackOff(
-          MAX_LOAD_JOB_POLL_RETRIES, INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS);
-      return pollJobStatus(projectId, jobId, Sleeper.DEFAULT, backoff);
-    }
-
-    @VisibleForTesting
-    void startLoadJob(
-        String jobId,
-        JobConfigurationLoad loadConfig,
-        Sleeper sleeper,
-        BackOff backoff)
+    public void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
         throws InterruptedException, IOException {
-      TableReference ref = loadConfig.getDestinationTable();
-      String projectId = ref.getProjectId();
+      Job job = new Job();
+      JobReference jobRef = new JobReference();
+      jobRef.setProjectId(extractConfig.getSourceTable().getProjectId());
+      jobRef.setJobId(jobId);
+      job.setJobReference(jobRef);
+      JobConfiguration jobConfig = new JobConfiguration();
+      jobConfig.setExtract(extractConfig);
+      job.setConfiguration(jobConfig);
+
+      startJob(job, errorExtractor, client);
+    }
 
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds max RPC retries.
+     */
+    @Override
+    public void startQueryJob(String jobId, JobConfigurationQuery queryConfig, boolean dryRun)
+        throws IOException, InterruptedException {
       Job job = new Job();
       JobReference jobRef = new JobReference();
-      jobRef.setProjectId(projectId);
+      jobRef.setProjectId(queryConfig.getDestinationTable().getProjectId());
       jobRef.setJobId(jobId);
       job.setJobReference(jobRef);
-      JobConfiguration config = new JobConfiguration();
-      config.setLoad(loadConfig);
-      job.setConfiguration(config);
+      JobConfiguration jobConfig = new JobConfiguration();
+      jobConfig.setQuery(queryConfig);
+      jobConfig.setDryRun(dryRun);
+      job.setConfiguration(jobConfig);
+
+      startJob(job, errorExtractor, client);
+    }
 
+    private static void startJob(Job job,
+      ApiErrorExtractor errorExtractor,
+      Bigquery client) throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff);
+    }
+
+    @VisibleForTesting
+    static void startJob(
+        Job job,
+        ApiErrorExtractor errorExtractor,
+        Bigquery client,
+        Sleeper sleeper,
+        BackOff backoff)
+        throws InterruptedException, IOException {
+      JobReference jobRef = job.getJobReference();
       Exception lastException = null;
       do {
         try {
-          client.jobs().insert(projectId, job).execute();
+          client.jobs().insert(jobRef.getProjectId(), job).execute();
           return; // SUCCEEDED
         } catch (GoogleJsonResponseException e) {
           if (errorExtractor.itemAlreadyExists(e)) {
@@ -149,27 +184,30 @@ public class BigQueryServicesImpl implements BigQueryServices {
       throw new IOException(
           String.format(
               "Unable to insert job: %s, aborting after %d retries.",
-              jobId, MAX_LOAD_JOB_RPC_ATTEMPTS),
+              jobRef.getJobId(), MAX_RPC_ATTEMPTS),
           lastException);
     }
 
+    @Override
+    public Job pollJob(String projectId, String jobId, int maxAttempts)
+        throws InterruptedException {
+      BackOff backoff = new AttemptBoundedExponentialBackOff(
+          maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS);
+      return pollJob(projectId, jobId, Sleeper.DEFAULT, backoff);
+    }
+
     @VisibleForTesting
-    Status pollJobStatus(
+    Job pollJob(
         String projectId,
         String jobId,
         Sleeper sleeper,
         BackOff backoff) throws InterruptedException {
       do {
         try {
-          JobStatus status = client.jobs().get(projectId, jobId).execute().getStatus();
+          Job job = client.jobs().get(projectId, jobId).execute();
+          JobStatus status = job.getStatus();
           if (status != null && status.getState() != null && status.getState().equals("DONE")) {
-            if (status.getErrorResult() != null) {
-              return Status.FAILED;
-            } else if (status.getErrors() != null && !status.getErrors().isEmpty()) {
-              return Status.FAILED;
-            } else {
-              return Status.SUCCEEDED;
-            }
+            return job;
           }
           // The job is not DONE, wait longer and retry.
         } catch (IOException e) {
@@ -177,16 +215,16 @@ public class BigQueryServicesImpl implements BigQueryServices {
           LOG.warn("Ignore the error and retry polling job status.", e);
         }
       } while (nextBackOff(sleeper, backoff));
-      LOG.warn("Unable to poll job status: {}, aborting after {} retries.",
-          jobId, MAX_LOAD_JOB_POLL_RETRIES);
-      return Status.UNKNOWN;
+      LOG.warn("Unable to poll job status: {}, aborting after reached max retries.", jobId);
+      return null;
     }
 
     /**
      * Identical to {@link BackOffUtils#next} but without checked IOException.
      * @throws InterruptedException
      */
-    private boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
+    private static boolean nextBackOff(Sleeper sleeper, BackOff backoff)
+        throws InterruptedException {
       try {
         return BackOffUtils.next(sleeper, backoff);
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe1ebc8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 63ff22c..7998fc7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertThat;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.io.BigQueryIO.Status;
 import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.options.BigQueryOptions;
@@ -39,16 +40,21 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.BigQueryServices;
-import org.apache.beam.sdk.util.BigQueryServices.Status;
 import org.apache.beam.sdk.util.CoderUtils;
 
 import com.google.api.client.util.Data;
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 import org.hamcrest.Matchers;
 import org.junit.Assert;
@@ -66,6 +72,7 @@ import org.mockito.MockitoAnnotations;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Tests for BigQueryIO.
@@ -73,23 +80,28 @@ import java.io.IOException;
 @RunWith(JUnit4.class)
 public class BigQueryIOTest {
 
+  // Status.UNKNOWN maps to null
+  private static final Map<Status, Job> JOB_STATUS_MAP = ImmutableMap.of(
+      Status.SUCCEEDED, new Job().setStatus(new JobStatus()),
+      Status.FAILED, new Job().setStatus(new JobStatus().setErrorResult(new ErrorProto())));
+
   private static class FakeBigQueryServices implements BigQueryServices {
 
-    private Object[] startLoadJobReturns;
+    private Object[] startJobReturns;
     private Object[] pollJobStatusReturns;
 
     /**
-     * Sets the return values for the mock {@link LoadService#startLoadJob}.
+     * Sets the return values for the mock {@link JobService#startLoadJob}.
      *
      * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise.
      */
     private FakeBigQueryServices startLoadJobReturns(Object... startLoadJobReturns) {
-      this.startLoadJobReturns = startLoadJobReturns;
+      this.startJobReturns = startLoadJobReturns;
       return this;
     }
 
     /**
+     * Sets the return values for the mock {@link JobService#pollJob}.
+     * Sets the return values for the mock {@link JobService#pollJobStatus}.
      *
      * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise.
      */
@@ -99,19 +111,19 @@ public class BigQueryIOTest {
     }
 
     @Override
-    public LoadService getLoadService(BigQueryOptions bqOptions) {
-      return new FakeLoadService(startLoadJobReturns, pollJobStatusReturns);
+    public JobService getJobService(BigQueryOptions bqOptions) {
+      return new FakeLoadService(startJobReturns, pollJobStatusReturns);
     }
 
-    private static class FakeLoadService implements BigQueryServices.LoadService {
+    private static class FakeLoadService implements BigQueryServices.JobService {
 
-      private Object[] startLoadJobReturns;
+      private Object[] startJobReturns;
       private Object[] pollJobStatusReturns;
       private int startLoadJobCallsCount;
       private int pollJobStatusCallsCount;
 
       public FakeLoadService(Object[] startLoadJobReturns, Object[] pollJobStatusReturns) {
-        this.startLoadJobReturns = startLoadJobReturns;
+        this.startJobReturns = startLoadJobReturns;
         this.pollJobStatusReturns = pollJobStatusReturns;
         this.startLoadJobCallsCount = 0;
         this.pollJobStatusCallsCount = 0;
@@ -120,35 +132,52 @@ public class BigQueryIOTest {
       @Override
       public void startLoadJob(String jobId, JobConfigurationLoad loadConfig)
           throws InterruptedException, IOException {
-        if (startLoadJobCallsCount < startLoadJobReturns.length) {
-          Object ret = startLoadJobReturns[startLoadJobCallsCount++];
-          if (ret instanceof IOException) {
-            throw (IOException) ret;
+        startJob();
+      }
+
+      @Override
+      public void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
+          throws InterruptedException, IOException {
+        startJob();
+      }
+
+      @Override
+      public void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun)
+          throws IOException, InterruptedException {
+        startJob();
+      }
+
+      @Override
+      public Job pollJob(String projectId, String jobId, int maxAttemps)
+          throws InterruptedException {
+        if (pollJobStatusCallsCount < pollJobStatusReturns.length) {
+          Object ret = pollJobStatusReturns[pollJobStatusCallsCount++];
+          if (ret instanceof Status) {
+            return JOB_STATUS_MAP.get(ret);
           } else if (ret instanceof InterruptedException) {
             throw (InterruptedException) ret;
           } else {
-            return;
+            throw new RuntimeException("Unexpected return type: " + ret.getClass());
           }
         } else {
           throw new RuntimeException(
-              "Exceeded expected number of calls: " + startLoadJobReturns.length);
+              "Exceeded expected number of calls: " + pollJobStatusReturns.length);
         }
       }
 
-      @Override
-      public Status pollJobStatus(String projectId, String jobId) throws InterruptedException {
-        if (pollJobStatusCallsCount < pollJobStatusReturns.length) {
-          Object ret = pollJobStatusReturns[pollJobStatusCallsCount++];
-          if (ret instanceof Status) {
-            return (Status) ret;
+      private void startJob() throws IOException, InterruptedException {
+        if (startLoadJobCallsCount < startJobReturns.length) {
+          Object ret = startJobReturns[startLoadJobCallsCount++];
+          if (ret instanceof IOException) {
+            throw (IOException) ret;
           } else if (ret instanceof InterruptedException) {
             throw (InterruptedException) ret;
           } else {
-            throw new RuntimeException("Unexpected return type: " + ret.getClass());
+            return;
           }
         } else {
           throw new RuntimeException(
-              "Exceeded expected number of calls: " + pollJobStatusReturns.length);
+              "Exceeded expected number of calls: " + startJobReturns.length);
         }
       }
     }
@@ -160,7 +189,7 @@ public class BigQueryIOTest {
   @Rule
   public TemporaryFolder testFolder = new TemporaryFolder();
   @Mock
-  public BigQueryServices.LoadService mockBqLoadService;
+  public BigQueryServices.JobService mockBqLoadService;
 
   private BigQueryOptions bqOptions;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe1ebc8/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
index 74a2da2..238deed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.util.BigQueryServicesImpl.JobServiceImpl;
 
 import com.google.api.client.googleapis.json.GoogleJsonError;
 import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
@@ -40,8 +41,10 @@ import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.collect.ImmutableList;
 
 import org.junit.Before;
@@ -91,11 +94,16 @@ public class BigQueryServicesImplTest {
   }
 
   /**
-   * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds.
+   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds.
    */
   @Test
   public void testStartLoadJobSucceeds() throws IOException, InterruptedException {
     Job testJob = new Job();
+    JobReference jobRef = new JobReference();
+    jobRef.setJobId("jobId");
+    jobRef.setProjectId("projectId");
+    testJob.setJobReference(jobRef);
+
     when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
     when(response.getStatusCode()).thenReturn(200);
     when(response.getContent()).thenReturn(toStream(testJob));
@@ -108,20 +116,24 @@ public class BigQueryServicesImplTest {
     Sleeper sleeper = new FastNanoClockAndSleeper();
     BackOff backoff = new AttemptBoundedExponentialBackOff(
         5 /* attempts */, 1000 /* initialIntervalMillis */);
-    BigQueryServicesImpl.LoadServiceImpl loadService =
-        new BigQueryServicesImpl.LoadServiceImpl(bigquery);
-    loadService.startLoadJob("jobId", loadConfig, sleeper, backoff);
+    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
     verify(response, times(1)).getStatusCode();
     verify(response, times(1)).getContent();
     verify(response, times(1)).getContentType();
   }
 
   /**
-   * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds
+   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds
    * with an already exist job.
    */
   @Test
   public void testStartLoadJobSucceedsAlreadyExists() throws IOException, InterruptedException {
+    Job testJob = new Job();
+    JobReference jobRef = new JobReference();
+    jobRef.setJobId("jobId");
+    jobRef.setProjectId("projectId");
+    testJob.setJobReference(jobRef);
+
     when(response.getStatusCode()).thenReturn(409); // 409 means already exists
 
     TableReference ref = new TableReference();
@@ -132,9 +144,7 @@ public class BigQueryServicesImplTest {
     Sleeper sleeper = new FastNanoClockAndSleeper();
     BackOff backoff = new AttemptBoundedExponentialBackOff(
         5 /* attempts */, 1000 /* initialIntervalMillis */);
-    BigQueryServicesImpl.LoadServiceImpl loadService =
-        new BigQueryServicesImpl.LoadServiceImpl(bigquery);
-    loadService.startLoadJob("jobId", loadConfig, sleeper, backoff);
+    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
 
     verify(response, times(1)).getStatusCode();
     verify(response, times(1)).getContent();
@@ -142,11 +152,15 @@ public class BigQueryServicesImplTest {
   }
 
   /**
-   * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds with a retry.
+   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds with a retry.
    */
   @Test
   public void testStartLoadJobRetry() throws IOException, InterruptedException {
     Job testJob = new Job();
+    JobReference jobRef = new JobReference();
+    jobRef.setJobId("jobId");
+    jobRef.setProjectId("projectId");
+    testJob.setJobReference(jobRef);
 
     // First response is 403 rate limited, second response has valid payload.
     when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
@@ -163,19 +177,18 @@ public class BigQueryServicesImplTest {
     Sleeper sleeper = new FastNanoClockAndSleeper();
     BackOff backoff = new AttemptBoundedExponentialBackOff(
         5 /* attempts */, 1000 /* initialIntervalMillis */);
-    BigQueryServicesImpl.LoadServiceImpl loadService =
-        new BigQueryServicesImpl.LoadServiceImpl(bigquery);
-    loadService.startLoadJob("jobId", loadConfig, sleeper, backoff);
+    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
+
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();
     verify(response, times(2)).getContentType();
   }
 
   /**
-   * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} succeeds.
+   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#pollJob} succeeds.
    */
   @Test
-  public void testPollJobStatusSucceeds() throws IOException, InterruptedException {
+  public void testPollJobSucceeds() throws IOException, InterruptedException {
     Job testJob = new Job();
     testJob.setStatus(new JobStatus().setState("DONE"));
 
@@ -183,22 +196,22 @@ public class BigQueryServicesImplTest {
     when(response.getStatusCode()).thenReturn(200);
     when(response.getContent()).thenReturn(toStream(testJob));
 
-    BigQueryServicesImpl.LoadServiceImpl loadService =
-        new BigQueryServicesImpl.LoadServiceImpl(bigquery);
-    BigQueryServices.Status status =
-        loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
+    BigQueryServicesImpl.JobServiceImpl jobService =
+        new BigQueryServicesImpl.JobServiceImpl(bigquery);
+    Job job =
+        jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
 
-    assertEquals(BigQueryServices.Status.SUCCEEDED, status);
+    assertEquals(testJob, job);
     verify(response, times(1)).getStatusCode();
     verify(response, times(1)).getContent();
     verify(response, times(1)).getContentType();
   }
 
   /**
-   * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} fails.
+   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#pollJob} fails.
    */
   @Test
-  public void testPollJobStatusFailed() throws IOException, InterruptedException {
+  public void testPollJobFailed() throws IOException, InterruptedException {
     Job testJob = new Job();
     testJob.setStatus(new JobStatus().setState("DONE").setErrorResult(new ErrorProto()));
 
@@ -206,22 +219,22 @@ public class BigQueryServicesImplTest {
     when(response.getStatusCode()).thenReturn(200);
     when(response.getContent()).thenReturn(toStream(testJob));
 
-    BigQueryServicesImpl.LoadServiceImpl loadService =
-        new BigQueryServicesImpl.LoadServiceImpl(bigquery);
-    BigQueryServices.Status status =
-        loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
+    BigQueryServicesImpl.JobServiceImpl jobService =
+        new BigQueryServicesImpl.JobServiceImpl(bigquery);
+    Job job =
+        jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
 
-    assertEquals(BigQueryServices.Status.FAILED, status);
+    assertEquals(testJob, job);
     verify(response, times(1)).getStatusCode();
     verify(response, times(1)).getContent();
     verify(response, times(1)).getContentType();
   }
 
   /**
-   * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} returns UNKNOWN.
+   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#pollJob} returns null.
    */
   @Test
-  public void testPollJobStatusUnknown() throws IOException, InterruptedException {
+  public void testPollJobUnknown() throws IOException, InterruptedException {
     Job testJob = new Job();
     testJob.setStatus(new JobStatus());
 
@@ -229,12 +242,12 @@ public class BigQueryServicesImplTest {
     when(response.getStatusCode()).thenReturn(200);
     when(response.getContent()).thenReturn(toStream(testJob));
 
-    BigQueryServicesImpl.LoadServiceImpl loadService =
-        new BigQueryServicesImpl.LoadServiceImpl(bigquery);
-    BigQueryServices.Status status =
-        loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.STOP_BACKOFF);
+    BigQueryServicesImpl.JobServiceImpl jobService =
+        new BigQueryServicesImpl.JobServiceImpl(bigquery);
+    Job job =
+        jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.STOP_BACKOFF);
 
-    assertEquals(BigQueryServices.Status.UNKNOWN, status);
+    assertEquals(null, job);
     verify(response, times(1)).getStatusCode();
     verify(response, times(1)).getContent();
     verify(response, times(1)).getContentType();


[2/2] incubator-beam git commit: Closes #285

Posted by dh...@apache.org.
Closes #285


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/928640df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/928640df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/928640df

Branch: refs/heads/master
Commit: 928640df269ca8abb09ae438427dec829bb3dd27
Parents: 9794564 2fe1ebc
Author: Dan Halperin <dh...@google.com>
Authored: Wed May 4 14:10:14 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 4 14:10:14 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java |  40 ++++-
 .../apache/beam/sdk/util/BigQueryServices.java  |  40 +++--
 .../beam/sdk/util/BigQueryServicesImpl.java     | 148 ++++++++++++-------
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  |  79 ++++++----
 .../beam/sdk/util/BigQueryServicesImplTest.java |  79 +++++-----
 5 files changed, 252 insertions(+), 134 deletions(-)
----------------------------------------------------------------------