You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 17:53:14 UTC
[30/50] [abbrv] beam git commit: Cache result of BigQuerySourceBase.split
Cache result of BigQuerySourceBase.split
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1533e2b9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1533e2b9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1533e2b9
Branch: refs/heads/gearpump-runner
Commit: 1533e2b9bc49971929277b804587d93d8d2cae4c
Parents: 29e054a
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Apr 19 10:09:42 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Apr 19 11:39:21 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 31 +++++++++++++-------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 18 +++++-------
.../sdk/io/gcp/bigquery/FakeJobService.java | 9 ++++++
3 files changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1533e2b9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 1b90dc3..4142da9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -69,6 +69,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
protected final BigQueryServices bqServices;
protected final ValueProvider<String> executingProject;
+ private List<BoundedSource<TableRow>> cachedSplitResult;
+
BigQuerySourceBase(
ValueProvider<String> jobIdToken,
String extractDestinationDir,
@@ -83,17 +85,24 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
@Override
public List<BoundedSource<TableRow>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- TableReference tableToExtract = getTableToExtract(bqOptions);
- JobService jobService = bqServices.getJobService(bqOptions);
- String extractJobId = BigQueryIO.getExtractJobId(jobIdToken);
- List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
-
- TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
- .getTable(tableToExtract).getSchema();
-
- cleanupTempResource(bqOptions);
- return createSources(tempFiles, tableSchema);
+ // split() can be called multiple times; e.g. the Dataflow runner may call it again
+ // with a different desiredBundleSizeBytes if a split() call produces too many sources.
+ // We ignore desiredBundleSizeBytes anyway, but in any case we should not initiate
+ // another BigQuery extract job for repeated split() calls.
+ if (cachedSplitResult == null) {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ TableReference tableToExtract = getTableToExtract(bqOptions);
+ JobService jobService = bqServices.getJobService(bqOptions);
+ String extractJobId = BigQueryIO.getExtractJobId(jobIdToken);
+ List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
+
+ TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
+ .getTable(tableToExtract).getSchema();
+
+ cleanupTempResource(bqOptions);
+ cachedSplitResult = checkNotNull(createSources(tempFiles, tableSchema));
+ }
+ return cachedSplitResult;
}
protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
http://git-wip-us.apache.org/repos/asf/beam/blob/1533e2b9/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d0004e4..62c5b5f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -28,7 +28,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-
import com.google.api.client.util.Data;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobStatistics;
@@ -1230,17 +1229,10 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBigQueryTableSourceInitSplit() throws Exception {
- Job extractJob = new Job();
- JobStatistics jobStats = new JobStatistics();
- JobStatistics4 extractStats = new JobStatistics4();
- extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
- jobStats.setExtract(extractStats);
- extractJob.setStatus(new JobStatus())
- .setStatistics(jobStats);
-
FakeDatasetService fakeDatasetService = new FakeDatasetService();
+ FakeJobService fakeJobService = new FakeJobService();
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
- .withJobService(new FakeJobService())
+ .withJobService(fakeJobService)
.withDatasetService(fakeDatasetService);
List<TableRow> expected = ImmutableList.of(
@@ -1280,8 +1272,14 @@ public class BigQueryIOTest implements Serializable {
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(2, sources.size());
+ // Simulate a repeated call to split(), as a Dataflow worker will sometimes do.
+ sources = bqSource.split(200, options);
+ assertEquals(2, sources.size());
BoundedSource<TableRow> actual = sources.get(0);
assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
+
+ // A repeated call to split() should not have caused a duplicate extract job.
+ assertEquals(1, fakeJobService.getNumExtractJobCalls());
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/1533e2b9/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index a2454fb..cffd873 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -95,6 +95,7 @@ class FakeJobService implements JobService, Serializable {
private static final com.google.common.collect.Table<String, String, JobInfo> allJobs =
HashBasedTable.create();
+ private static int numExtractJobCalls = 0;
private static final com.google.common.collect.Table<String, String, List<String>>
filesForLoadJobs = HashBasedTable.create();
@@ -136,6 +137,8 @@ class FakeJobService implements JobService, Serializable {
checkArgument(extractConfig.getDestinationFormat().equals("AVRO"),
"Only extract to AVRO is supported");
synchronized (allJobs) {
+ ++numExtractJobCalls;
+
Job job = new Job();
job.setJobReference(jobRef);
job.setConfiguration(new JobConfiguration().setExtract(extractConfig));
@@ -145,6 +148,12 @@ class FakeJobService implements JobService, Serializable {
}
}
+ public int getNumExtractJobCalls() {
+ synchronized (allJobs) {
+ return numExtractJobCalls;
+ }
+ }
+
@Override
public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
throws IOException, InterruptedException {