You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/11/02 22:20:54 UTC
[1/2] beam git commit: Updates BigQueryTableSource to consider data in streaming buffer when determining estimated size.
Repository: beam
Updated Branches:
refs/heads/master 482d17889 -> 492547835
Updates BigQueryTableSource to consider data in streaming buffer when determining estimated size.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2c9fba4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2c9fba4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2c9fba4
Branch: refs/heads/master
Commit: b2c9fba4dd5f7f5a0ac0045f9ff8f30d55088a34
Parents: 482d178
Author: chamikara@google.com <ch...@google.com>
Authored: Sat Oct 21 19:20:07 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Thu Nov 2 15:20:27 2017 -0700
----------------------------------------------------------------------
.../io/gcp/bigquery/BigQueryTableSource.java | 10 ++-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 81 +++++++++++++++++++-
2 files changed, 88 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b2c9fba4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index f717cb7..dbac00f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
@@ -102,8 +103,13 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
TableReference table = setDefaultProjectIfAbsent(options.as(BigQueryOptions.class),
BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class));
- Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
- .getTable(table).getNumBytes();
+ Table bqTable = bqServices.getDatasetService(options.as(BigQueryOptions.class))
+ .getTable(table);
+ Long numBytes = bqTable.getNumBytes();
+ if (bqTable.getStreamingBuffer() != null
+ && bqTable.getStreamingBuffer().getEstimatedBytes() != null) { // either may be unset
+ numBytes += bqTable.getStreamingBuffer().getEstimatedBytes().longValue();
+ }
tableSizeBytes.compareAndSet(null, numBytes);
}
return tableSizeBytes.get();
http://git-wip-us.apache.org/repos/asf/beam/blob/b2c9fba4/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index aa818c6..5b4b7e6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -41,6 +41,7 @@ import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatistics2;
import com.google.api.services.bigquery.model.JobStatistics4;
import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Streamingbuffer;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -64,6 +65,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -343,7 +345,7 @@ public class BigQueryIOTest implements Serializable {
}));
PAssert.that(output).containsInAnyOrder(ImmutableList.of(KV.of("a", 1L), KV.of("b", 2L),
KV.of("c", 3L), KV.of("d", 4L), KV.of("e", 5L), KV.of("f", 6L)));
- p.run();
+ p.run();
}
@Test
@@ -1697,6 +1699,83 @@ public class BigQueryIOTest implements Serializable {
}
@Test
+ public void testEstimatedSizeWithoutStreamingBuffer() throws Exception {
+ // Six rows written to a table that reports no streaming buffer: the estimate
+ // should be exactly the table's own byte count as computed by the fake service.
+ FakeDatasetService datasetService = new FakeDatasetService();
+ FakeJobService jobService = new FakeJobService();
+ FakeBigQueryServices bqServices =
+ new FakeBigQueryServices().withJobService(jobService).withDatasetService(datasetService);
+
+ List<TableRow> rows = ImmutableList.of(
+ new TableRow().set("name", "a").set("number", 1L),
+ new TableRow().set("name", "b").set("number", 2L),
+ new TableRow().set("name", "c").set("number", 3L),
+ new TableRow().set("name", "d").set("number", 4L),
+ new TableRow().set("name", "e").set("number", 5L),
+ new TableRow().set("name", "f").set("number", 6L));
+
+ TableReference tableRef = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+ TableSchema schema = new TableSchema().setFields(ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER")));
+ datasetService.createDataset("project", "data_set", "", "", null);
+ datasetService.createTable(new Table().setTableReference(tableRef).setSchema(schema));
+ datasetService.insertAll(tableRef, rows, null);
+
+ BoundedSource<TableRow> source = BigQueryTableSource.create(
+ "testStepUuid",
+ StaticValueProvider.of(tableRef),
+ bqServices,
+ TableRowJsonCoder.of(),
+ BigQueryIO.TableRowParser.INSTANCE);
+
+ assertEquals(108, source.getEstimatedSizeBytes(PipelineOptionsFactory.create()));
+ }
+
+ @Test
+ public void testEstimatedSizeWithStreamingBuffer() throws Exception {
+ // Same six rows as the no-buffer case, but the table also reports a streaming
+ // buffer whose estimated size must be added on top of the table's byte count.
+ FakeDatasetService datasetService = new FakeDatasetService();
+ FakeJobService jobService = new FakeJobService();
+ FakeBigQueryServices bqServices =
+ new FakeBigQueryServices().withJobService(jobService).withDatasetService(datasetService);
+
+ List<TableRow> rows = ImmutableList.of(
+ new TableRow().set("name", "a").set("number", 1L),
+ new TableRow().set("name", "b").set("number", 2L),
+ new TableRow().set("name", "c").set("number", 3L),
+ new TableRow().set("name", "d").set("number", 4L),
+ new TableRow().set("name", "e").set("number", 5L),
+ new TableRow().set("name", "f").set("number", 6L));
+
+ TableReference tableRef = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+ TableSchema schema = new TableSchema().setFields(ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER")));
+ Streamingbuffer buffer = new Streamingbuffer().setEstimatedBytes(BigInteger.valueOf(10));
+ datasetService.createDataset("project", "data_set", "", "", null);
+ datasetService.createTable(
+ new Table().setTableReference(tableRef).setSchema(schema).setStreamingBuffer(buffer));
+ datasetService.insertAll(tableRef, rows, null);
+
+ BoundedSource<TableRow> source = BigQueryTableSource.create(
+ "testStepUuid",
+ StaticValueProvider.of(tableRef),
+ bqServices,
+ TableRowJsonCoder.of(),
+ BigQueryIO.TableRowParser.INSTANCE);
+
+ // 108 bytes of table data (as in the no-buffer test) plus the 10-byte buffer estimate.
+ assertEquals(108 + 10, source.getEstimatedSizeBytes(PipelineOptionsFactory.create()));
+ }
+
+ @Test
public void testBigQueryQuerySourceInitSplit() throws Exception {
TableReference dryRunTable = new TableReference();
[2/2] beam git commit: This closes #4025
Posted by ch...@apache.org.
This closes #4025
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/49254783
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/49254783
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/49254783
Branch: refs/heads/master
Commit: 49254783523b1e9351dc90270fbdf7bc1c5e6309
Parents: 482d178 b2c9fba
Author: chamikara@google.com <ch...@google.com>
Authored: Thu Nov 2 15:20:38 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Thu Nov 2 15:20:38 2017 -0700
----------------------------------------------------------------------
.../io/gcp/bigquery/BigQueryTableSource.java | 10 ++-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 81 +++++++++++++++++++-
2 files changed, 88 insertions(+), 3 deletions(-)
----------------------------------------------------------------------