You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/29 21:26:17 UTC
[1/2] beam git commit: Add support for TimePartitioning in
BigQueryIO.write().
Repository: beam
Updated Branches:
refs/heads/master 1c26b7488 -> 6280d497b
Add support for TimePartitioning in BigQueryIO.write().
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0e03a33
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0e03a33
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0e03a33
Branch: refs/heads/master
Commit: b0e03a33cf0c2c573a2d34d88506e19ebb28c934
Parents: 1c26b74
Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Authored: Sun Jul 30 21:42:59 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 29 14:05:15 2017 -0700
----------------------------------------------------------------------
pom.xml | 2 +-
...aultCoderCloudObjectTranslatorRegistrar.java | 2 +
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 4 +-
.../sdk/io/gcp/bigquery/BigQueryHelpers.java | 8 +++
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 47 ++++++++++++++++
.../beam/sdk/io/gcp/bigquery/CreateTables.java | 15 +++--
.../bigquery/DynamicDestinationsHelpers.java | 27 ++++++++-
.../sdk/io/gcp/bigquery/TableDestination.java | 39 ++++++++++++-
.../io/gcp/bigquery/TableDestinationCoder.java | 2 +
.../gcp/bigquery/TableDestinationCoderV2.java | 59 ++++++++++++++++++++
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 7 ++-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 50 +++++++++++++++++
.../sdk/io/gcp/bigquery/FakeJobService.java | 32 +++++++++--
.../sdk/io/gcp/bigquery/TableContainer.java | 2 +
14 files changed, 278 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 81c8003..b563f8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,7 @@
<apex.kryo.version>2.24.0</apex.kryo.version>
<api-common.version>1.0.0-rc2</api-common.version>
<avro.version>1.8.2</avro.version>
- <bigquery.version>v2-rev295-1.22.0</bigquery.version>
+ <bigquery.version>v2-rev355-1.22.0</bigquery.version>
<bigtable.version>0.9.7.1</bigtable.version>
<cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
<pubsubgrpc.version>0.1.0</pubsubgrpc.version>
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 5d42a5f..ff89933 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.TextualIntegerCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoderV2;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
/**
@@ -97,6 +98,7 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
RandomAccessDataCoder.class,
StringUtf8Coder.class,
TableDestinationCoder.class,
+ TableDestinationCoderV2.class,
TableRowJsonCoder.class,
TextualIntegerCoder.class,
VarIntCoder.class,
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 0a1306d..76cf7e8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -266,7 +266,7 @@ class BatchLoads<DestinationT>
.apply(WithKeys.<Void, KV<TableDestination, String>>of((Void) null))
.setCoder(
KvCoder.of(
- VoidCoder.of(), KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())))
+ VoidCoder.of(), KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
.apply(GroupByKey.<Void, KV<TableDestination, String>>create())
.apply(Values.<Iterable<KV<TableDestination, String>>>create())
.apply(
@@ -323,7 +323,7 @@ class BatchLoads<DestinationT>
tempTables
.apply("ReifyRenameInput", new ReifyAsIterable<KV<TableDestination, String>>())
- .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())))
+ .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
.apply(
"WriteRenameUntriggered",
ParDo.of(
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 78dcdde..7f9e27a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hashing;
@@ -291,6 +292,13 @@ public class BigQueryHelpers {
}
}
+ static class TimePartitioningToJson implements SerializableFunction<TimePartitioning, String> {
+ @Override
+ public String apply(TimePartitioning partitioning) {
+ return toJsonString(partitioning);
+ }
+ }
+
static String createJobIdToken(String jobName, String stepUuid) {
return String.format("beam_job_%s_%s", stepUuid, jobName.replaceAll("-", ""));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index feb085d..29828e4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -31,6 +31,7 @@ import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicates;
@@ -60,9 +61,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRe
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TimePartitioningToJson;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations;
+import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -824,6 +827,7 @@ public class BigQueryIO {
@Nullable abstract DynamicDestinations<T, ?> getDynamicDestinations();
@Nullable abstract PCollectionView<Map<String, String>> getSchemaFromView();
@Nullable abstract ValueProvider<String> getJsonSchema();
+ @Nullable abstract ValueProvider<String> getJsonTimePartitioning();
abstract CreateDisposition getCreateDisposition();
abstract WriteDisposition getWriteDisposition();
/** Table description. Default is empty. */
@@ -854,6 +858,7 @@ public class BigQueryIO {
abstract Builder<T> setDynamicDestinations(DynamicDestinations<T, ?> dynamicDestinations);
abstract Builder<T> setSchemaFromView(PCollectionView<Map<String, String>> view);
abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema);
+ abstract Builder<T> setJsonTimePartitioning(ValueProvider<String> jsonTimePartitioning);
abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition);
abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
abstract Builder<T> setTableDescription(String tableDescription);
@@ -1022,6 +1027,33 @@ public class BigQueryIO {
return toBuilder().setSchemaFromView(view).build();
}
+ /**
+ * Allows newly created tables to include a {@link TimePartitioning} class. Can only be used
+ * when writing to a single table. If {@link #to(SerializableFunction)} or
+ * {@link #to(DynamicDestinations)} is used to write dynamic tables, time partitioning can be
+ * set directly in the returned {@link TableDestination}.
+ */
+ public Write<T> withTimePartitioning(TimePartitioning partitioning) {
+ return withJsonTimePartitioning(
+ StaticValueProvider.of(BigQueryHelpers.toJsonString(partitioning)));
+ }
+
+ /**
+ * Like {@link #withTimePartitioning(TimePartitioning)} but using a deferred
+ * {@link ValueProvider}.
+ */
+ public Write<T> withTimePartitioning(ValueProvider<TimePartitioning> partition) {
+ return withJsonTimePartitioning(NestedValueProvider.of(
+ partition, new TimePartitioningToJson()));
+ }
+
+ /**
+ * The same as {@link #withTimePartitioning}, but takes a JSON-serialized object.
+ */
+ public Write<T> withJsonTimePartitioning(ValueProvider<String> partition) {
+ return toBuilder().setJsonTimePartitioning(partition).build();
+ }
+
/** Specifies whether the table should be created if it does not exist. */
public Write<T> withCreateDisposition(CreateDisposition createDisposition) {
return toBuilder().setCreateDisposition(createDisposition).build();
@@ -1183,6 +1215,15 @@ public class BigQueryIO {
input.isBounded(),
method);
}
+ if (getJsonTimePartitioning() != null) {
+ checkArgument(getDynamicDestinations() == null,
+ "The supplied DynamicDestinations object can directly set TimePartitioning."
+ + " There is no need to call BigQueryIO.Write.withTimePartitioning.");
+ checkArgument(getTableFunction() == null,
+ "The supplied getTableFunction object can directly set TimePartitioning."
+ + " There is no need to call BigQueryIO.Write.withTimePartitioning.");
+ }
+
DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
if (dynamicDestinations == null) {
if (getJsonTableRef() != null) {
@@ -1205,6 +1246,12 @@ public class BigQueryIO {
(DynamicDestinations<T, TableDestination>) dynamicDestinations,
getSchemaFromView());
}
+
+ // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
+ if (getJsonTimePartitioning() != null) {
+ dynamicDestinations = new ConstantTimePartitioningDestinations(
+ dynamicDestinations, getJsonTimePartitioning());
+ }
}
return expandTyped(input, dynamicDestinations);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index 3dc10b0..7f83b83 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -73,7 +73,7 @@ public class CreateTables<DestinationT>
}
CreateTables<DestinationT> withTestServices(BigQueryServices bqServices) {
- return new CreateTables<DestinationT>(createDisposition, bqServices, dynamicDestinations);
+ return new CreateTables<>(createDisposition, bqServices, dynamicDestinations);
}
@Override
@@ -124,11 +124,14 @@ public class CreateTables<DestinationT>
DatasetService datasetService = bqServices.getDatasetService(options);
if (!createdTables.contains(tableSpec)) {
if (datasetService.getTable(tableReference) == null) {
- datasetService.createTable(
- new Table()
- .setTableReference(tableReference)
- .setSchema(tableSchema)
- .setDescription(tableDescription));
+ Table table = new Table()
+ .setTableReference(tableReference)
+ .setSchema(tableSchema)
+ .setDescription(tableDescription);
+ if (tableDestination.getTimePartitioning() != null) {
+ table.setTimePartitioning(tableDestination.getTimePartitioning());
+ }
+ datasetService.createTable(table);
}
createdTables.add(tableSpec);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
index 530e2b6..818ea34 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -108,7 +108,7 @@ class DynamicDestinationsHelpers {
@Override
public Coder<TableDestination> getDestinationCoder() {
- return TableDestinationCoder.of();
+ return TableDestinationCoderV2.of();
}
}
@@ -164,6 +164,31 @@ class DynamicDestinationsHelpers {
}
}
+ static class ConstantTimePartitioningDestinations<T>
+ extends DelegatingDynamicDestinations<T, TableDestination> {
+
+ @Nullable
+ private final ValueProvider<String> jsonTimePartitioning;
+
+ ConstantTimePartitioningDestinations(DynamicDestinations<T, TableDestination> inner,
+ ValueProvider<String> jsonTimePartitioning) {
+ super(inner);
+ this.jsonTimePartitioning = jsonTimePartitioning;
+ }
+
+ @Override
+ public TableDestination getDestination(ValueInSingleWindow<T> element) {
+ TableDestination destination = super.getDestination(element);
+ return new TableDestination(destination.getTableSpec(), destination.getTableDescription(),
+ jsonTimePartitioning.get());
+ }
+
+ @Override
+ public Coder<TableDestination> getDestinationCoder() {
+ return TableDestinationCoderV2.of();
+ }
+ }
+
/**
* Takes in a side input mapping tablespec to json table schema, and always returns the
* matching schema from the side input.
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index ecf35d8..79f1b22 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -19,6 +19,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TimePartitioning;
import java.io.Serializable;
import java.util.Objects;
import javax.annotation.Nullable;
@@ -31,18 +32,38 @@ public class TableDestination implements Serializable {
private final String tableSpec;
@Nullable
private final String tableDescription;
+ @Nullable
+ private final String jsonTimePartitioning;
public TableDestination(String tableSpec, @Nullable String tableDescription) {
- this.tableSpec = tableSpec;
- this.tableDescription = tableDescription;
+ this(tableSpec, tableDescription, (String) null);
}
public TableDestination(TableReference tableReference, @Nullable String tableDescription) {
- this.tableSpec = BigQueryHelpers.toTableSpec(tableReference);
+ this(tableReference, tableDescription, null);
+ }
+
+ public TableDestination(TableReference tableReference, @Nullable String tableDescription,
+ TimePartitioning timePartitioning) {
+ this(BigQueryHelpers.toTableSpec(tableReference), tableDescription,
+ timePartitioning != null ? BigQueryHelpers.toJsonString(timePartitioning) : null);
+ }
+
+ public TableDestination(String tableSpec, @Nullable String tableDescription,
+ TimePartitioning timePartitioning) {
+ this(tableSpec, tableDescription,
+ timePartitioning != null ? BigQueryHelpers.toJsonString(timePartitioning) : null);
+ }
+
+ public TableDestination(String tableSpec, @Nullable String tableDescription,
+ @Nullable String jsonTimePartitioning) {
+ this.tableSpec = tableSpec;
this.tableDescription = tableDescription;
+ this.jsonTimePartitioning = jsonTimePartitioning;
}
+
public String getTableSpec() {
return tableSpec;
}
@@ -51,6 +72,18 @@ public class TableDestination implements Serializable {
return BigQueryHelpers.parseTableSpec(tableSpec);
}
+ public String getJsonTimePartitioning() {
+ return jsonTimePartitioning;
+ }
+
+ public TimePartitioning getTimePartitioning() {
+ if (jsonTimePartitioning == null) {
+ return null;
+ } else {
+ return BigQueryHelpers.fromJsonString(jsonTimePartitioning, TimePartitioning.class);
+ }
+ }
+
@Nullable
public String getTableDescription() {
return tableDescription;
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index f034a03..2bfc2ca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -33,6 +33,8 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> {
private static final Coder<String> tableSpecCoder = StringUtf8Coder.of();
private static final Coder<String> tableDescriptionCoder = NullableCoder.of(StringUtf8Coder.of());
+ private TableDestinationCoder() {}
+
public static TableDestinationCoder of() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java
new file mode 100644
index 0000000..5bdab0d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * A {@link Coder} for {@link TableDestination} that includes time partitioning information. This
+ * is a new coder (instead of extending the old {@link TableDestinationCoder}) for compatibility
+ * reasons. The old coder is kept around for the same compatibility reasons.
+ */
+public class TableDestinationCoderV2 extends AtomicCoder<TableDestination> {
+ private static final TableDestinationCoderV2 INSTANCE = new TableDestinationCoderV2();
+ private static final Coder<String> timePartitioningCoder = NullableCoder.of(StringUtf8Coder.of());
+
+ public static TableDestinationCoderV2 of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(TableDestination value, OutputStream outStream) throws IOException {
+ TableDestinationCoder.of().encode(value, outStream);
+ timePartitioningCoder.encode(value.getJsonTimePartitioning(), outStream);
+ }
+
+ @Override
+ public TableDestination decode(InputStream inStream) throws IOException {
+ TableDestination destination = TableDestinationCoder.of().decode(inStream);
+ String jsonTimePartitioning = timePartitioningCoder.decode(inStream);
+ return new TableDestination(
+ destination.getTableSpec(), destination.getTableDescription(), jsonTimePartitioning);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {}
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index c8fab75..a646f17 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -23,6 +23,7 @@ import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -135,6 +136,7 @@ class WriteTables<DestinationT>
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
jobIdPrefix,
tableReference,
+ tableDestination.getTimePartitioning(),
tableSchema,
partitionFiles,
writeDisposition,
@@ -150,6 +152,7 @@ class WriteTables<DestinationT>
DatasetService datasetService,
String jobIdPrefix,
TableReference ref,
+ TimePartitioning timePartitioning,
@Nullable TableSchema schema,
List<String> gcsUris,
WriteDisposition writeDisposition,
@@ -164,7 +167,9 @@ class WriteTables<DestinationT>
.setWriteDisposition(writeDisposition.name())
.setCreateDisposition(createDisposition.name())
.setSourceFormat("NEWLINE_DELIMITED_JSON");
-
+ if (timePartitioning != null) {
+ loadConfig.setTimePartitioning(timePartitioning);
+ }
String projectId = ref.getProjectId();
Job lastFailedLoadJob = null;
for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) {
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 0ece3ee..18547cd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -47,6 +47,7 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
@@ -638,6 +639,55 @@ public class BigQueryIOTest implements Serializable {
}
@Test
+ public void testTimePartitioningStreamingInserts() throws Exception {
+ testTimePartitioning(Method.STREAMING_INSERTS);
+ }
+
+ @Test
+ public void testTimePartitioningBatchLoads() throws Exception {
+ testTimePartitioning(Method.FILE_LOADS);
+ }
+
+ public void testTimePartitioning(BigQueryIO.Write.Method insertMethod) throws Exception {
+ BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+ bqOptions.setProject("project-id");
+ bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+ FakeDatasetService datasetService = new FakeDatasetService();
+ FakeBigQueryServices fakeBqServices =
+ new FakeBigQueryServices()
+ .withJobService(new FakeJobService())
+ .withDatasetService(datasetService);
+ datasetService.createDataset("project-id", "dataset-id", "", "");
+
+ Pipeline p = TestPipeline.create(bqOptions);
+ TableRow row1 = new TableRow().set("name", "a").set("number", "1");
+ TableRow row2 = new TableRow().set("name", "b").set("number", "2");
+
+ TimePartitioning timePartitioning = new TimePartitioning()
+ .setType("DAY")
+ .setExpirationMs(1000L);
+ TableSchema schema = new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("number").setType("INTEGER")));
+ p.apply(Create.of(row1, row1))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to("project-id:dataset-id.table-id")
+ .withTestServices(fakeBqServices)
+ .withMethod(insertMethod)
+ .withSchema(schema)
+ .withTimePartitioning(timePartitioning)
+ .withoutValidation());
+ p.run();
+ Table table = datasetService.getTable(
+ BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id"));
+ assertEquals(schema, table.getSchema());
+ assertEquals(timePartitioning, table.getTimePartitioning());
+ }
+
+ @Test
@Category({ValidatesRunner.class, UsesTestStream.class})
public void testTriggeredFileLoads() throws Exception {
BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 7d5101d..cc600d1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -19,6 +19,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.util.BackOff;
@@ -39,6 +40,7 @@ import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -310,8 +312,13 @@ class FakeJobService implements JobService, Serializable {
if (!validateDispositions(existingTable, createDisposition, writeDisposition)) {
return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
}
-
- datasetService.createTable(new Table().setTableReference(destination).setSchema(schema));
+ if (existingTable == null) {
+ existingTable = new Table().setTableReference(destination).setSchema(schema);
+ if (load.getTimePartitioning() != null) {
+ existingTable = existingTable.setTimePartitioning(load.getTimePartitioning());
+ }
+ datasetService.createTable(existingTable);
+ }
List<TableRow> rows = Lists.newArrayList();
for (ResourceId filename : sourceFiles) {
@@ -331,13 +338,30 @@ class FakeJobService implements JobService, Serializable {
if (!validateDispositions(existingTable, createDisposition, writeDisposition)) {
return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
}
-
+ TimePartitioning partitioning = null;
+ TableSchema schema = null;
+ boolean first = true;
List<TableRow> allRows = Lists.newArrayList();
for (TableReference source : sources) {
+ Table table = checkNotNull(datasetService.getTable(source));
+ if (!first) {
+ if (partitioning != table.getTimePartitioning()) {
+ return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
+ }
+ if (schema != table.getSchema()) {
+ return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto());
+ }
+ }
+ partitioning = table.getTimePartitioning();
+ schema = table.getSchema();
+ first = false;
allRows.addAll(datasetService.getAllRows(
source.getProjectId(), source.getDatasetId(), source.getTableId()));
}
- datasetService.createTable(new Table().setTableReference(destination));
+ datasetService.createTable(new Table()
+ .setTableReference(destination)
+ .setSchema(schema)
+ .setTimePartitioning(partitioning));
datasetService.insertAll(destination, allRows, null);
return new JobStatus().setState("DONE");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
index 8915069..e016c98 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java
@@ -32,6 +32,7 @@ class TableContainer {
Long sizeBytes;
TableContainer(Table table) {
this.table = table;
+
this.rows = new ArrayList<>();
this.ids = new ArrayList<>();
this.sizeBytes = 0L;
@@ -54,6 +55,7 @@ class TableContainer {
return table;
}
+
List<TableRow> getRows() {
return rows;
}
[2/2] beam git commit: This closes #3663: [BEAM-2390] Add support for
TimePartitioning in BigQueryIO
Posted by jk...@apache.org.
This closes #3663: [BEAM-2390] Add support for TimePartitioning in BigQueryIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6280d497
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6280d497
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6280d497
Branch: refs/heads/master
Commit: 6280d497b5264536a485993ccdf21034f2a1c3d8
Parents: 1c26b74 b0e03a3
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Aug 29 14:07:06 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Aug 29 14:07:06 2017 -0700
----------------------------------------------------------------------
pom.xml | 2 +-
...aultCoderCloudObjectTranslatorRegistrar.java | 2 +
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 4 +-
.../sdk/io/gcp/bigquery/BigQueryHelpers.java | 8 +++
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 47 ++++++++++++++++
.../beam/sdk/io/gcp/bigquery/CreateTables.java | 15 +++--
.../bigquery/DynamicDestinationsHelpers.java | 27 ++++++++-
.../sdk/io/gcp/bigquery/TableDestination.java | 39 ++++++++++++-
.../io/gcp/bigquery/TableDestinationCoder.java | 2 +
.../gcp/bigquery/TableDestinationCoderV2.java | 59 ++++++++++++++++++++
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 7 ++-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 50 +++++++++++++++++
.../sdk/io/gcp/bigquery/FakeJobService.java | 32 +++++++++--
.../sdk/io/gcp/bigquery/TableContainer.java | 2 +
14 files changed, 278 insertions(+), 18 deletions(-)
----------------------------------------------------------------------