You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/11/29 22:26:49 UTC
[beam] branch master updated: [BEAM-2870] Strips partition decorators when creating/patching tables in batch
This is an automated email from the ASF dual-hosted git repository.
jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 540ce4e [BEAM-2870] Strips partition decorators when creating/patching tables in batch
new ead5d43 This closes #4177: [BEAM-2870] Strips partition decorators when creating/patching tables in batch
540ce4e is described below
commit 540ce4ee87973cf82e2246ffd5d686055203069e
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Mon Nov 27 10:28:02 2017 -0800
[BEAM-2870] Strips partition decorators when creating/patching tables in batch
---
.../apache/beam/sdk/io/gcp/bigquery/WriteTables.java | 4 +++-
.../beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 12 +++++++++---
.../beam/sdk/io/gcp/bigquery/FakeDatasetService.java | 19 ++++++++++++++-----
.../beam/sdk/io/gcp/bigquery/FakeJobService.java | 9 ++++++++-
4 files changed, 34 insertions(+), 10 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 9ca253c..7077651 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -264,7 +264,9 @@ class WriteTables<DestinationT>
switch (jobStatus) {
case SUCCEEDED:
if (tableDescription != null) {
- datasetService.patchTableDescription(ref, tableDescription);
+ datasetService.patchTableDescription(
+ ref.clone().setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
+ tableDescription);
}
return;
case UNKNOWN:
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 08f7464..31c1781 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -589,7 +589,12 @@ public class BigQueryIOTest implements Serializable {
if (streaming) {
users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+
}
+
+ // Use a partition decorator to verify that partition decorators are supported.
+ final String partitionDecorator = "20171127";
+
users.apply(
"WriteBigQuery",
BigQueryIO.<String>write()
@@ -630,7 +635,8 @@ public class BigQueryIOTest implements Serializable {
verifySideInputs();
// Each user in it's own table.
return new TableDestination(
- "dataset-id.userid-" + userId, "table for userid " + userId);
+ "dataset-id.userid-" + userId + "$" + partitionDecorator,
+ "table for userid " + userId);
}
@Override
@@ -2528,7 +2534,7 @@ public class BigQueryIOTest implements Serializable {
p.apply(Create.of(row1, row2))
.apply(
BigQueryIO.writeTableRows()
- .to("project-id:dataset-id.table-id$decorator")
+ .to("project-id:dataset-id.table-id$20171127")
.withTestServices(fakeBqServices)
.withMethod(Method.STREAMING_INSERTS)
.withSchema(schema)
@@ -2539,7 +2545,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testTableDecoratorStripping() {
assertEquals("project:dataset.table",
- BigQueryHelpers.stripPartitionDecorator("project:dataset.table$decorator"));
+ BigQueryHelpers.stripPartitionDecorator("project:dataset.table$20171127"));
assertEquals("project:dataset.table",
BigQueryHelpers.stripPartitionDecorator("project:dataset.table"));
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index ffab123..9609ada 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -96,6 +96,7 @@ class FakeDatasetService implements DatasetService, Serializable {
@Override
public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
+ validateWholeTableReference(tableRef);
synchronized (BigQueryIOTest.tables) {
Map<String, TableContainer> dataset =
BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
@@ -109,12 +110,13 @@ class FakeDatasetService implements DatasetService, Serializable {
}
}
-
- @Override
- public void createTable(Table table) throws IOException {
+ /**
+ * Validates a table reference for whole-table operations, such as create/delete/patch. Such
+ * operations do not support partition decorators.
+ */
+ private static void validateWholeTableReference(TableReference tableReference)
+ throws IOException {
final Pattern tableRegexp = Pattern.compile("[-\\w]{1,1024}");
-
- TableReference tableReference = table.getTableReference();
if (!tableRegexp.matcher(tableReference.getTableId()).matches()) {
throw new IOException(
String.format(
@@ -123,6 +125,12 @@ class FakeDatasetService implements DatasetService, Serializable {
+ " decorators cannot be used.",
tableReference.getTableId()));
}
+ }
+
+ @Override
+ public void createTable(Table table) throws IOException {
+ TableReference tableReference = table.getTableReference();
+ validateWholeTableReference(tableReference);
synchronized (BigQueryIOTest.tables) {
Map<String, TableContainer> dataset =
BigQueryIOTest.tables.get(tableReference.getProjectId(), tableReference.getDatasetId());
@@ -245,6 +253,7 @@ class FakeDatasetService implements DatasetService, Serializable {
public Table patchTableDescription(TableReference tableReference,
@Nullable String tableDescription)
throws IOException, InterruptedException {
+ validateWholeTableReference(tableReference);
synchronized (BigQueryIOTest.tables) {
TableContainer tableContainer = getTableContainer(tableReference.getProjectId(),
tableReference.getDatasetId(), tableReference.getTableId());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index edf2a55..fcf464f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -312,7 +312,14 @@ class FakeJobService implements JobService, Serializable {
return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
}
if (existingTable == null) {
- existingTable = new Table().setTableReference(destination).setSchema(schema);
+ TableReference strippedDestination =
+ destination
+ .clone()
+ .setTableId(BigQueryHelpers.stripPartitionDecorator(destination.getTableId()));
+ existingTable =
+ new Table()
+ .setTableReference(strippedDestination)
+ .setSchema(schema);
if (load.getTimePartitioning() != null) {
existingTable = existingTable.setTimePartitioning(load.getTimePartitioning());
}
--
To stop receiving notification emails like this one, please contact commits@beam.apache.org.