You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/04 21:11:12 UTC
[1/2] incubator-beam git commit: [BEAM-48] Refactor BigQueryServices
to support extract and query jobs
Repository: incubator-beam
Updated Branches:
refs/heads/master 97945648c -> 928640df2
[BEAM-48] Refactor BigQueryServices to support extract and query jobs
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2fe1ebc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2fe1ebc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2fe1ebc8
Branch: refs/heads/master
Commit: 2fe1ebc8ebbabe8836b73d3c7c5a251dc2d746bf
Parents: 9794564
Author: Pei He <pe...@google.com>
Authored: Tue May 3 17:55:11 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 4 14:10:13 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 40 ++++-
.../apache/beam/sdk/util/BigQueryServices.java | 40 +++--
.../beam/sdk/util/BigQueryServicesImpl.java | 148 ++++++++++++-------
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 79 ++++++----
.../beam/sdk/util/BigQueryServicesImplTest.java | 79 +++++-----
5 files changed, 252 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe1ebc8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 4a0bdac..7785298 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -47,7 +47,7 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BigQueryServices;
-import org.apache.beam.sdk.util.BigQueryServices.LoadService;
+import org.apache.beam.sdk.util.BigQueryServices.JobService;
import org.apache.beam.sdk.util.BigQueryServicesImpl;
import org.apache.beam.sdk.util.BigQueryTableInserter;
import org.apache.beam.sdk.util.BigQueryTableRowIterator;
@@ -68,7 +68,9 @@ import org.apache.beam.sdk.values.PInput;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
@@ -1168,6 +1170,10 @@ public class BigQueryIO {
// The maximum number of retry load jobs.
private static final int MAX_RETRY_LOAD_JOBS = 3;
+ // The maximum number of retries to poll the status of a load job.
+ // It is set to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
+ private static final int MAX_JOB_STATUS_POLL_RETRIES = Integer.MAX_VALUE;
+
private final BigQuerySink bigQuerySink;
private BigQueryWriteOperation(BigQuerySink sink) {
@@ -1191,7 +1197,7 @@ public class BigQueryIO {
}
if (!tempFiles.isEmpty()) {
load(
- bigQuerySink.bqServices.getLoadService(bqOptions),
+ bigQuerySink.bqServices.getJobService(bqOptions),
bigQuerySink.jobIdToken,
fromJsonString(bigQuerySink.jsonTable, TableReference.class),
tempFiles,
@@ -1215,7 +1221,7 @@ public class BigQueryIO {
* <p>If a load job failed, it will try another load job with a different job id.
*/
private void load(
- LoadService loadService,
+ JobService jobService,
String jobIdPrefix,
TableReference ref,
List<String> gcsUris,
@@ -1238,8 +1244,9 @@ public class BigQueryIO {
LOG.info("Previous load jobs failed, retrying.");
}
LOG.info("Starting BigQuery load job: {}", jobId);
- loadService.startLoadJob(jobId, loadConfig);
- BigQueryServices.Status jobStatus = loadService.pollJobStatus(projectId, jobId);
+ jobService.startLoadJob(jobId, loadConfig);
+ Status jobStatus = parseStatus(
+ jobService.pollJob(projectId, jobId, MAX_JOB_STATUS_POLL_RETRIES));
switch (jobStatus) {
case SUCCEEDED:
return;
@@ -1669,6 +1676,29 @@ public class BigQueryIO {
}
}
+ /**
+ * Status of a BigQuery job or request.
+ */
+ enum Status {
+ SUCCEEDED,
+ FAILED,
+ UNKNOWN,
+ }
+
+ private static Status parseStatus(@Nullable Job job) {
+ if (job == null) {
+ return Status.UNKNOWN;
+ }
+ JobStatus status = job.getStatus();
+ if (status.getErrorResult() != null) {
+ return Status.FAILED;
+ } else if (status.getErrors() != null && !status.getErrors().isEmpty()) {
+ return Status.FAILED;
+ } else {
+ return Status.SUCCEEDED;
+ }
+ }
+
private static String toJsonString(Object item) {
if (item == null) {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe1ebc8/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
index 0fecfdc..b12e049 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
@@ -19,7 +19,10 @@ package org.apache.beam.sdk.util;
import org.apache.beam.sdk.options.BigQueryOptions;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
import java.io.IOException;
import java.io.Serializable;
@@ -30,33 +33,38 @@ import java.io.Serializable;
public interface BigQueryServices extends Serializable {
/**
- * Status of a BigQuery job or request.
+ * Returns a real, mock, or fake {@link JobService}.
*/
- enum Status {
- SUCCEEDED,
- FAILED,
- UNKNOWN,
- }
-
- /**
- * Returns a real, mock, or fake {@link LoadService}.
- */
- public LoadService getLoadService(BigQueryOptions bqOptions);
+ public JobService getJobService(BigQueryOptions bqOptions);
/**
* An interface for the Cloud BigQuery load service.
*/
- public interface LoadService {
+ public interface JobService {
/**
- * Start a BigQuery load job.
+ * Starts a BigQuery load job.
*/
- public void startLoadJob(String jobId, JobConfigurationLoad loadConfig)
+ void startLoadJob(String jobId, JobConfigurationLoad loadConfig)
throws InterruptedException, IOException;
/**
- * Poll the status of a BigQuery load job.
+ * Starts a BigQuery extract job.
+ */
+ void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
+ throws InterruptedException, IOException;
+
+ /**
+ * Starts a BigQuery query job.
+ */
+ void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun)
+ throws IOException, InterruptedException;
+
+ /**
+ * Waits until the job is DONE, and returns the job.
+ *
+ * <p>Returns null if {@code maxAttempts} is reached before the job is DONE.
*/
- public Status pollJobStatus(String projectId, String jobId)
+ Job pollJob(String projectId, String jobId, int maxAttempts)
throws InterruptedException, IOException;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe1ebc8/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
index 0502c6e..2bfe84f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
@@ -26,10 +26,11 @@ import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.TableReference;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
@@ -45,37 +46,34 @@ import java.util.concurrent.TimeUnit;
*/
public class BigQueryServicesImpl implements BigQueryServices {
- // The maximum number of attempts to execute a load job RPC.
- private static final int MAX_LOAD_JOB_RPC_ATTEMPTS = 10;
+ // The maximum number of attempts to execute a BigQuery RPC.
+ private static final int MAX_RPC_ATTEMPTS = 10;
- // The initial backoff for executing a load job RPC.
- private static final long INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
- // The maximum number of retries to poll the status of a load job.
- // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery load job finishes.
- private static final int MAX_LOAD_JOB_POLL_RETRIES = Integer.MAX_VALUE;
+ // The initial backoff for executing a BigQuery RPC.
+ private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
- // The initial backoff for polling the status of a load job.
- private static final long INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60);
+ // The initial backoff for polling the status of a BigQuery job.
+ private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60);
@Override
- public LoadService getLoadService(BigQueryOptions options) {
- return new LoadServiceImpl(options);
+ public JobService getJobService(BigQueryOptions options) {
+ return new JobServiceImpl(options);
}
@VisibleForTesting
- static class LoadServiceImpl implements BigQueryServices.LoadService {
- private static final Logger LOG = LoggerFactory.getLogger(LoadServiceImpl.class);
+ static class JobServiceImpl implements BigQueryServices.JobService {
+ private static final Logger LOG = LoggerFactory.getLogger(JobServiceImpl.class);
private final ApiErrorExtractor errorExtractor;
private final Bigquery client;
@VisibleForTesting
- LoadServiceImpl(Bigquery client) {
+ JobServiceImpl(Bigquery client) {
this.errorExtractor = new ApiErrorExtractor();
this.client = client;
}
- private LoadServiceImpl(BigQueryOptions options) {
+ private JobServiceImpl(BigQueryOptions options) {
this.errorExtractor = new ApiErrorExtractor();
this.client = Transport.newBigQueryClient(options).build();
}
@@ -83,7 +81,7 @@ public class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Retries the RPC for at most {@code MAX_LOAD_JOB_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
* @throws IOException if it exceeds max RPC retries.
*/
@@ -91,47 +89,84 @@ public class BigQueryServicesImpl implements BigQueryServices {
public void startLoadJob(
String jobId,
JobConfigurationLoad loadConfig) throws InterruptedException, IOException {
- BackOff backoff = new AttemptBoundedExponentialBackOff(
- MAX_LOAD_JOB_RPC_ATTEMPTS, INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS);
- startLoadJob(jobId, loadConfig, Sleeper.DEFAULT, backoff);
+ Job job = new Job();
+ JobReference jobRef = new JobReference();
+ jobRef.setProjectId(loadConfig.getDestinationTable().getProjectId());
+ jobRef.setJobId(jobId);
+ job.setJobReference(jobRef);
+ JobConfiguration jobConfig = new JobConfiguration();
+ jobConfig.setLoad(loadConfig);
+ job.setConfiguration(jobConfig);
+
+ startJob(job, errorExtractor, client);
}
/**
* {@inheritDoc}
*
- * <p>Retries the poll request for at most {@code MAX_LOAD_JOB_POLL_RETRIES} times
- * until the job is DONE.
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds max RPC retries.
*/
@Override
- public Status pollJobStatus(String projectId, String jobId) throws InterruptedException {
- BackOff backoff = new AttemptBoundedExponentialBackOff(
- MAX_LOAD_JOB_POLL_RETRIES, INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS);
- return pollJobStatus(projectId, jobId, Sleeper.DEFAULT, backoff);
- }
-
- @VisibleForTesting
- void startLoadJob(
- String jobId,
- JobConfigurationLoad loadConfig,
- Sleeper sleeper,
- BackOff backoff)
+ public void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
throws InterruptedException, IOException {
- TableReference ref = loadConfig.getDestinationTable();
- String projectId = ref.getProjectId();
+ Job job = new Job();
+ JobReference jobRef = new JobReference();
+ jobRef.setProjectId(extractConfig.getSourceTable().getProjectId());
+ jobRef.setJobId(jobId);
+ job.setJobReference(jobRef);
+ JobConfiguration jobConfig = new JobConfiguration();
+ jobConfig.setExtract(extractConfig);
+ job.setConfiguration(jobConfig);
+
+ startJob(job, errorExtractor, client);
+ }
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds max RPC retries.
+ */
+ @Override
+ public void startQueryJob(String jobId, JobConfigurationQuery queryConfig, boolean dryRun)
+ throws IOException, InterruptedException {
Job job = new Job();
JobReference jobRef = new JobReference();
- jobRef.setProjectId(projectId);
+ jobRef.setProjectId(queryConfig.getDestinationTable().getProjectId());
jobRef.setJobId(jobId);
job.setJobReference(jobRef);
- JobConfiguration config = new JobConfiguration();
- config.setLoad(loadConfig);
- job.setConfiguration(config);
+ JobConfiguration jobConfig = new JobConfiguration();
+ jobConfig.setQuery(queryConfig);
+ jobConfig.setDryRun(dryRun);
+ job.setConfiguration(jobConfig);
+
+ startJob(job, errorExtractor, client);
+ }
+ private static void startJob(Job job,
+ ApiErrorExtractor errorExtractor,
+ Bigquery client) throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff);
+ }
+
+ @VisibleForTesting
+ static void startJob(
+ Job job,
+ ApiErrorExtractor errorExtractor,
+ Bigquery client,
+ Sleeper sleeper,
+ BackOff backoff)
+ throws InterruptedException, IOException {
+ JobReference jobRef = job.getJobReference();
Exception lastException = null;
do {
try {
- client.jobs().insert(projectId, job).execute();
+ client.jobs().insert(jobRef.getProjectId(), job).execute();
return; // SUCCEEDED
} catch (GoogleJsonResponseException e) {
if (errorExtractor.itemAlreadyExists(e)) {
@@ -149,27 +184,30 @@ public class BigQueryServicesImpl implements BigQueryServices {
throw new IOException(
String.format(
"Unable to insert job: %s, aborting after %d retries.",
- jobId, MAX_LOAD_JOB_RPC_ATTEMPTS),
+ jobRef.getJobId(), MAX_RPC_ATTEMPTS),
lastException);
}
+ @Override
+ public Job pollJob(String projectId, String jobId, int maxAttempts)
+ throws InterruptedException {
+ BackOff backoff = new AttemptBoundedExponentialBackOff(
+ maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS);
+ return pollJob(projectId, jobId, Sleeper.DEFAULT, backoff);
+ }
+
@VisibleForTesting
- Status pollJobStatus(
+ Job pollJob(
String projectId,
String jobId,
Sleeper sleeper,
BackOff backoff) throws InterruptedException {
do {
try {
- JobStatus status = client.jobs().get(projectId, jobId).execute().getStatus();
+ Job job = client.jobs().get(projectId, jobId).execute();
+ JobStatus status = job.getStatus();
if (status != null && status.getState() != null && status.getState().equals("DONE")) {
- if (status.getErrorResult() != null) {
- return Status.FAILED;
- } else if (status.getErrors() != null && !status.getErrors().isEmpty()) {
- return Status.FAILED;
- } else {
- return Status.SUCCEEDED;
- }
+ return job;
}
// The job is not DONE, wait longer and retry.
} catch (IOException e) {
@@ -177,16 +215,16 @@ public class BigQueryServicesImpl implements BigQueryServices {
LOG.warn("Ignore the error and retry polling job status.", e);
}
} while (nextBackOff(sleeper, backoff));
- LOG.warn("Unable to poll job status: {}, aborting after {} retries.",
- jobId, MAX_LOAD_JOB_POLL_RETRIES);
- return Status.UNKNOWN;
+ LOG.warn("Unable to poll job status: {}, aborting after reached max retries.", jobId);
+ return null;
}
/**
* Identical to {@link BackOffUtils#next} but without checked IOException.
* @throws InterruptedException
*/
- private boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
+ private static boolean nextBackOff(Sleeper sleeper, BackOff backoff)
+ throws InterruptedException {
try {
return BackOffUtils.next(sleeper, backoff);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe1ebc8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 63ff22c..7998fc7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.io.BigQueryIO.Status;
import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.BigQueryOptions;
@@ -39,16 +40,21 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BigQueryServices;
-import org.apache.beam.sdk.util.BigQueryServices.Status;
import org.apache.beam.sdk.util.CoderUtils;
import com.google.api.client.util.Data;
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.hamcrest.Matchers;
import org.junit.Assert;
@@ -66,6 +72,7 @@ import org.mockito.MockitoAnnotations;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
+import java.util.Map;
/**
* Tests for BigQueryIO.
@@ -73,23 +80,28 @@ import java.io.IOException;
@RunWith(JUnit4.class)
public class BigQueryIOTest {
+ // Status.UNKNOWN maps to null
+ private static final Map<Status, Job> JOB_STATUS_MAP = ImmutableMap.of(
+ Status.SUCCEEDED, new Job().setStatus(new JobStatus()),
+ Status.FAILED, new Job().setStatus(new JobStatus().setErrorResult(new ErrorProto())));
+
private static class FakeBigQueryServices implements BigQueryServices {
- private Object[] startLoadJobReturns;
+ private Object[] startJobReturns;
private Object[] pollJobStatusReturns;
/**
- * Sets the return values for the mock {@link LoadService#startLoadJob}.
+ * Sets the return values for the mock {@link JobService#startLoadJob}.
*
* <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise.
*/
private FakeBigQueryServices startLoadJobReturns(Object... startLoadJobReturns) {
- this.startLoadJobReturns = startLoadJobReturns;
+ this.startJobReturns = startLoadJobReturns;
return this;
}
/**
- * Sets the return values for the mock {@link LoadService#pollJobStatus}.
+ * Sets the return values for the mock {@link JobService#pollJob}.
*
* <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise.
*/
@@ -99,19 +111,19 @@ public class BigQueryIOTest {
}
@Override
- public LoadService getLoadService(BigQueryOptions bqOptions) {
- return new FakeLoadService(startLoadJobReturns, pollJobStatusReturns);
+ public JobService getJobService(BigQueryOptions bqOptions) {
+ return new FakeLoadService(startJobReturns, pollJobStatusReturns);
}
- private static class FakeLoadService implements BigQueryServices.LoadService {
+ private static class FakeLoadService implements BigQueryServices.JobService {
- private Object[] startLoadJobReturns;
+ private Object[] startJobReturns;
private Object[] pollJobStatusReturns;
private int startLoadJobCallsCount;
private int pollJobStatusCallsCount;
public FakeLoadService(Object[] startLoadJobReturns, Object[] pollJobStatusReturns) {
- this.startLoadJobReturns = startLoadJobReturns;
+ this.startJobReturns = startLoadJobReturns;
this.pollJobStatusReturns = pollJobStatusReturns;
this.startLoadJobCallsCount = 0;
this.pollJobStatusCallsCount = 0;
@@ -120,35 +132,52 @@ public class BigQueryIOTest {
@Override
public void startLoadJob(String jobId, JobConfigurationLoad loadConfig)
throws InterruptedException, IOException {
- if (startLoadJobCallsCount < startLoadJobReturns.length) {
- Object ret = startLoadJobReturns[startLoadJobCallsCount++];
- if (ret instanceof IOException) {
- throw (IOException) ret;
+ startJob();
+ }
+
+ @Override
+ public void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
+ throws InterruptedException, IOException {
+ startJob();
+ }
+
+ @Override
+ public void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun)
+ throws IOException, InterruptedException {
+ startJob();
+ }
+
+ @Override
+ public Job pollJob(String projectId, String jobId, int maxAttemps)
+ throws InterruptedException {
+ if (pollJobStatusCallsCount < pollJobStatusReturns.length) {
+ Object ret = pollJobStatusReturns[pollJobStatusCallsCount++];
+ if (ret instanceof Status) {
+ return JOB_STATUS_MAP.get(ret);
} else if (ret instanceof InterruptedException) {
throw (InterruptedException) ret;
} else {
- return;
+ throw new RuntimeException("Unexpected return type: " + ret.getClass());
}
} else {
throw new RuntimeException(
- "Exceeded expected number of calls: " + startLoadJobReturns.length);
+ "Exceeded expected number of calls: " + pollJobStatusReturns.length);
}
}
- @Override
- public Status pollJobStatus(String projectId, String jobId) throws InterruptedException {
- if (pollJobStatusCallsCount < pollJobStatusReturns.length) {
- Object ret = pollJobStatusReturns[pollJobStatusCallsCount++];
- if (ret instanceof Status) {
- return (Status) ret;
+ private void startJob() throws IOException, InterruptedException {
+ if (startLoadJobCallsCount < startJobReturns.length) {
+ Object ret = startJobReturns[startLoadJobCallsCount++];
+ if (ret instanceof IOException) {
+ throw (IOException) ret;
} else if (ret instanceof InterruptedException) {
throw (InterruptedException) ret;
} else {
- throw new RuntimeException("Unexpected return type: " + ret.getClass());
+ return;
}
} else {
throw new RuntimeException(
- "Exceeded expected number of calls: " + pollJobStatusReturns.length);
+ "Exceeded expected number of calls: " + startJobReturns.length);
}
}
}
@@ -160,7 +189,7 @@ public class BigQueryIOTest {
@Rule
public TemporaryFolder testFolder = new TemporaryFolder();
@Mock
- public BigQueryServices.LoadService mockBqLoadService;
+ public BigQueryServices.JobService mockBqLoadService;
private BigQueryOptions bqOptions;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe1ebc8/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
index 74a2da2..238deed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.util.BigQueryServicesImpl.JobServiceImpl;
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
@@ -40,8 +41,10 @@ import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.TableReference;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.collect.ImmutableList;
import org.junit.Before;
@@ -91,11 +94,16 @@ public class BigQueryServicesImplTest {
}
/**
- * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds.
+ * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds.
*/
@Test
public void testStartLoadJobSucceeds() throws IOException, InterruptedException {
Job testJob = new Job();
+ JobReference jobRef = new JobReference();
+ jobRef.setJobId("jobId");
+ jobRef.setProjectId("projectId");
+ testJob.setJobReference(jobRef);
+
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(200);
when(response.getContent()).thenReturn(toStream(testJob));
@@ -108,20 +116,24 @@ public class BigQueryServicesImplTest {
Sleeper sleeper = new FastNanoClockAndSleeper();
BackOff backoff = new AttemptBoundedExponentialBackOff(
5 /* attempts */, 1000 /* initialIntervalMillis */);
- BigQueryServicesImpl.LoadServiceImpl loadService =
- new BigQueryServicesImpl.LoadServiceImpl(bigquery);
- loadService.startLoadJob("jobId", loadConfig, sleeper, backoff);
+ JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
verify(response, times(1)).getContentType();
}
/**
- * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds
+ * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds
* with an already exist job.
*/
@Test
public void testStartLoadJobSucceedsAlreadyExists() throws IOException, InterruptedException {
+ Job testJob = new Job();
+ JobReference jobRef = new JobReference();
+ jobRef.setJobId("jobId");
+ jobRef.setProjectId("projectId");
+ testJob.setJobReference(jobRef);
+
when(response.getStatusCode()).thenReturn(409); // 409 means already exists
TableReference ref = new TableReference();
@@ -132,9 +144,7 @@ public class BigQueryServicesImplTest {
Sleeper sleeper = new FastNanoClockAndSleeper();
BackOff backoff = new AttemptBoundedExponentialBackOff(
5 /* attempts */, 1000 /* initialIntervalMillis */);
- BigQueryServicesImpl.LoadServiceImpl loadService =
- new BigQueryServicesImpl.LoadServiceImpl(bigquery);
- loadService.startLoadJob("jobId", loadConfig, sleeper, backoff);
+ JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
@@ -142,11 +152,15 @@ public class BigQueryServicesImplTest {
}
/**
- * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds with a retry.
+ * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds with a retry.
*/
@Test
public void testStartLoadJobRetry() throws IOException, InterruptedException {
Job testJob = new Job();
+ JobReference jobRef = new JobReference();
+ jobRef.setJobId("jobId");
+ jobRef.setProjectId("projectId");
+ testJob.setJobReference(jobRef);
// First response is 403 rate limited, second response has valid payload.
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
@@ -163,19 +177,18 @@ public class BigQueryServicesImplTest {
Sleeper sleeper = new FastNanoClockAndSleeper();
BackOff backoff = new AttemptBoundedExponentialBackOff(
5 /* attempts */, 1000 /* initialIntervalMillis */);
- BigQueryServicesImpl.LoadServiceImpl loadService =
- new BigQueryServicesImpl.LoadServiceImpl(bigquery);
- loadService.startLoadJob("jobId", loadConfig, sleeper, backoff);
+ JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
+
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
verify(response, times(2)).getContentType();
}
/**
- * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} succeeds.
+ * Tests that {@link BigQueryServicesImpl.JobServiceImpl#pollJob} succeeds.
*/
@Test
- public void testPollJobStatusSucceeds() throws IOException, InterruptedException {
+ public void testPollJobSucceeds() throws IOException, InterruptedException {
Job testJob = new Job();
testJob.setStatus(new JobStatus().setState("DONE"));
@@ -183,22 +196,22 @@ public class BigQueryServicesImplTest {
when(response.getStatusCode()).thenReturn(200);
when(response.getContent()).thenReturn(toStream(testJob));
- BigQueryServicesImpl.LoadServiceImpl loadService =
- new BigQueryServicesImpl.LoadServiceImpl(bigquery);
- BigQueryServices.Status status =
- loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
+ BigQueryServicesImpl.JobServiceImpl jobService =
+ new BigQueryServicesImpl.JobServiceImpl(bigquery);
+ Job job =
+ jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
- assertEquals(BigQueryServices.Status.SUCCEEDED, status);
+ assertEquals(testJob, job);
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
verify(response, times(1)).getContentType();
}
/**
- * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} fails.
+ * Tests that {@link BigQueryServicesImpl.JobServiceImpl#pollJob} fails.
*/
@Test
- public void testPollJobStatusFailed() throws IOException, InterruptedException {
+ public void testPollJobFailed() throws IOException, InterruptedException {
Job testJob = new Job();
testJob.setStatus(new JobStatus().setState("DONE").setErrorResult(new ErrorProto()));
@@ -206,22 +219,22 @@ public class BigQueryServicesImplTest {
when(response.getStatusCode()).thenReturn(200);
when(response.getContent()).thenReturn(toStream(testJob));
- BigQueryServicesImpl.LoadServiceImpl loadService =
- new BigQueryServicesImpl.LoadServiceImpl(bigquery);
- BigQueryServices.Status status =
- loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
+ BigQueryServicesImpl.JobServiceImpl jobService =
+ new BigQueryServicesImpl.JobServiceImpl(bigquery);
+ Job job =
+ jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
- assertEquals(BigQueryServices.Status.FAILED, status);
+ assertEquals(testJob, job);
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
verify(response, times(1)).getContentType();
}
/**
- * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} returns UNKNOWN.
+ * Tests that {@link BigQueryServicesImpl.JobServiceImpl#pollJob} returns null.
*/
@Test
- public void testPollJobStatusUnknown() throws IOException, InterruptedException {
+ public void testPollJobUnknown() throws IOException, InterruptedException {
Job testJob = new Job();
testJob.setStatus(new JobStatus());
@@ -229,12 +242,12 @@ public class BigQueryServicesImplTest {
when(response.getStatusCode()).thenReturn(200);
when(response.getContent()).thenReturn(toStream(testJob));
- BigQueryServicesImpl.LoadServiceImpl loadService =
- new BigQueryServicesImpl.LoadServiceImpl(bigquery);
- BigQueryServices.Status status =
- loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.STOP_BACKOFF);
+ BigQueryServicesImpl.JobServiceImpl jobService =
+ new BigQueryServicesImpl.JobServiceImpl(bigquery);
+ Job job =
+ jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.STOP_BACKOFF);
- assertEquals(BigQueryServices.Status.UNKNOWN, status);
+ assertEquals(null, job);
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
verify(response, times(1)).getContentType();
[2/2] incubator-beam git commit: Closes #285
Posted by dh...@apache.org.
Closes #285
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/928640df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/928640df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/928640df
Branch: refs/heads/master
Commit: 928640df269ca8abb09ae438427dec829bb3dd27
Parents: 9794564 2fe1ebc
Author: Dan Halperin <dh...@google.com>
Authored: Wed May 4 14:10:14 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 4 14:10:14 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 40 ++++-
.../apache/beam/sdk/util/BigQueryServices.java | 40 +++--
.../beam/sdk/util/BigQueryServicesImpl.java | 148 ++++++++++++-------
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 79 ++++++----
.../beam/sdk/util/BigQueryServicesImplTest.java | 79 +++++-----
5 files changed, 252 insertions(+), 134 deletions(-)
----------------------------------------------------------------------