Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:55:01 UTC
[07/10] beam git commit: Simplify configuration of StreamWithDeDup
Simplify configuration of StreamWithDeDup
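Instead of threading each configuration value (table reference, tablespec
function, schema, create disposition, table description, services) through
the StreamWithDeDup constructor, the Write transform now passes itself, and
the inner transform reads what it needs through the getters. In sketch form,
abbreviated from the diff below:

    // Before: one constructor parameter per configuration value.
    StreamWithDeDup(ValueProvider<TableReference> tableReference,
        SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
        ValueProvider<TableSchema> tableSchema,
        Write.CreateDisposition createDisposition,
        String tableDescription,
        BigQueryServices bqServices) { /* copy each field */ }

    // After: the enclosing Write is the single source of configuration.
    StreamWithDeDup(Write write) { this.write = write; }

This also lets getBigQueryServices() become non-nullable, which removes the
MoreObjects.firstNonNull(...) defaulting previously repeated in Read and
Write.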
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1adcbaea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1adcbaea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1adcbaea
Branch: refs/heads/master
Commit: 1adcbaea799e83016ec91f7b7155c3a25804ce6c
Parents: 5c71589
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 18:19:51 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:32 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 73 +++++++-------------
1 file changed, 26 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1adcbaea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index d2f6ba6..e039c8c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -40,7 +40,6 @@ import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -462,7 +461,7 @@ public class BigQueryIO {
abstract boolean getValidate();
@Nullable abstract Boolean getFlattenResults();
@Nullable abstract Boolean getUseLegacySql();
- @Nullable abstract BigQueryServices getBigQueryServices();
+ abstract BigQueryServices getBigQueryServices();
abstract Builder toBuilder();
@@ -645,8 +644,6 @@ public class BigQueryIO {
jobUuid, new BeamJobUuidToBigQueryJobUuid());
BoundedSource<TableRow> source;
- final BigQueryServices bqServices =
- MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
final String extractDestinationDir;
String tempLocation = bqOptions.getTempLocation();
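With the getter non-nullable, the firstNonNull defaulting removed above is no
longer needed. Where the default is now established is not shown in this diff;
a minimal sketch of the usual AutoValue idiom, with a hypothetical builder()
factory, would be:

    // Hypothetical -- not part of this diff: install the default service
    // implementation once, when the builder is created, so every
    // constructed transform carries a concrete BigQueryServices.
    static Builder builder() {
      return new AutoValue_BigQueryIO_Read.Builder()
          .setBigQueryServices(new BigQueryServicesImpl());
    }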
@@ -670,11 +667,11 @@ public class BigQueryIO {
getFlattenResults(),
getUseLegacySql(),
extractDestinationDir,
- bqServices);
+ getBigQueryServices());
} else {
ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
source = BigQueryTableSource.create(
- jobIdToken, inputTable, extractDestinationDir, bqServices,
+ jobIdToken, inputTable, extractDestinationDir, getBigQueryServices(),
StaticValueProvider.of(executingProject));
}
PassThroughThenCleanup.CleanupOperation cleanupOperation =
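For context, the two branches above correspond to the two ways a read can be
configured; illustrative only, and the entry-point spelling varies by SDK
version (BigQueryIO.read() here, BigQueryIO.Read in older SDKs):

    // Query-based read -> BigQueryQuerySource.
    PCollection<TableRow> fromQuery = p.apply(
        BigQueryIO.read().fromQuery(
            "SELECT word, word_count FROM [publicdata:samples.shakespeare]"));

    // Table-based read -> BigQueryTableSource. The project may be omitted
    // from the table spec and defaulted from the pipeline options, which is
    // what getTableWithDefaultProject() above handles.
    PCollection<TableRow> fromTable = p.apply(
        BigQueryIO.read().from("my-project:my_dataset.my_table"));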
@@ -687,7 +684,7 @@ public class BigQueryIO {
.setProjectId(executingProject)
.setJobId(getExtractJobId(jobIdToken));
- Job extractJob = bqServices.getJobService(bqOptions)
+ Job extractJob = getBigQueryServices().getJobService(bqOptions)
.getJob(jobRef);
Collection<String> extractFiles = null;
@@ -1390,7 +1387,7 @@ public class BigQueryIO {
@Nullable abstract String getTableDescription();
/** An option to indicate if table validation is desired. Default is true. */
abstract boolean getValidate();
- @Nullable abstract BigQueryServices getBigQueryServices();
+ abstract BigQueryServices getBigQueryServices();
abstract Builder toBuilder();
@@ -1650,12 +1647,10 @@ public class BigQueryIO {
"CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
// The user specified a table.
- BigQueryServices bqServices =
- MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
if (getJsonTableRef() != null && getValidate()) {
TableReference table = getTableWithDefaultProject(options).get();
- DatasetService datasetService = bqServices.getDatasetService(options);
+ DatasetService datasetService = getBigQueryServices().getDatasetService(options);
// Check for destination table presence and emptiness for early failure notification.
// Note that a presence check can fail when the table or dataset is created by an earlier
// stage of the pipeline. For these cases the #withoutValidation method can be used to
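As the comment above notes, the presence check is best skipped when the
destination table is created by an earlier stage of the same pipeline.
Illustrative usage, with a placeholder table spec and a schema assumed
defined elsewhere:

    // The early presence/emptiness check would fail here, because the
    // table only exists after the earlier stage has run -- so opt out.
    rows.apply(BigQueryIO.write()
        .to("my-project:my_dataset.my_table")
        .withSchema(schema)
        .withoutValidation());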
@@ -1693,7 +1688,7 @@ public class BigQueryIO {
checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Write needs a GCS temp location to store temp files.");
- if (bqServices == null) {
+ if (getBigQueryServices() == null) {
try {
GcsPath.fromUri(tempLocation);
} catch (IllegalArgumentException e) {
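The GCS temp location required above comes from the pipeline options; a
minimal sketch with a placeholder bucket:

    BigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
    options.setTempLocation("gs://my-bucket/tmp");  // must be a GCS path here
    Pipeline p = Pipeline.create(options);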
@@ -1711,19 +1706,11 @@ public class BigQueryIO {
public PDone expand(PCollection<TableRow> input) {
Pipeline p = input.getPipeline();
BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
- BigQueryServices bqServices =
- MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
// When writing an Unbounded PCollection, or when a tablespec function is defined, we use
// StreamWithDeDup and BigQuery's streaming import API.
if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
- return input.apply(
- new StreamWithDeDup(getTable(), getTableRefFunction(),
- getJsonSchema() == null ? null : NestedValueProvider.of(
- getJsonSchema(), new JsonSchemaToTableSchema()),
- getCreateDisposition(),
- getTableDescription(),
- bqServices));
+ return input.apply(new StreamWithDeDup(this));
}
ValueProvider<TableReference> table = getTableWithDefaultProject(options);
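Context for the branch above: a bounded input normally takes the batch
load-job path that follows, but defining a tablespec function routes it
through the streaming path as well. A sketch, assuming the
window-to-tablespec overload of to():

    // Illustrative: a per-window table function forces StreamWithDeDup
    // (streaming inserts) even for bounded input, per the condition above.
    rows.apply(BigQueryIO.write()
        .to(new SerializableFunction<BoundedWindow, String>() {
          @Override
          public String apply(BoundedWindow window) {
            // Placeholder: real code would derive the table from the window.
            return "my-project:my_dataset.events";
          }
        })
        .withSchema(schema));  // schema assumed defined elsewhere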
@@ -1786,7 +1773,7 @@ public class BigQueryIO {
.apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
.apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
false,
- bqServices,
+ getBigQueryServices(),
jobIdTokenView,
tempFilePrefix,
NestedValueProvider.of(table, new TableRefToJson()),
@@ -1800,7 +1787,7 @@ public class BigQueryIO {
.apply("TempTablesView", View.<String>asIterable());
singleton.apply(ParDo
.of(new WriteRename(
- bqServices,
+ getBigQueryServices(),
jobIdTokenView,
NestedValueProvider.of(table, new TableRefToJson()),
getWriteDisposition(),
@@ -1814,7 +1801,7 @@ public class BigQueryIO {
.apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
.apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
true,
- bqServices,
+ getBigQueryServices(),
jobIdTokenView,
tempFilePrefix,
NestedValueProvider.of(table, new TableRefToJson()),
@@ -2740,25 +2727,11 @@ public class BigQueryIO {
* it leverages BigQuery best effort de-dup mechanism.
*/
private static class StreamWithDeDup extends PTransform<PCollection<TableRow>, PDone> {
- @Nullable private final transient ValueProvider<TableReference> tableReference;
- @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
- @Nullable private final transient ValueProvider<TableSchema> tableSchema;
- private final Write.CreateDisposition createDisposition;
- private final BigQueryServices bqServices;
- @Nullable private final String tableDescription;
+ private final Write write;
/** Constructor. */
- StreamWithDeDup(ValueProvider<TableReference> tableReference,
- @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
- @Nullable ValueProvider<TableSchema> tableSchema,
- Write.CreateDisposition createDisposition,
- @Nullable String tableDescription, BigQueryServices bqServices) {
- this.tableReference = tableReference;
- this.tableRefFunction = tableRefFunction;
- this.tableSchema = tableSchema;
- this.createDisposition = createDisposition;
- this.bqServices = checkNotNull(bqServices, "bqServices");
- this.tableDescription = tableDescription;
+ StreamWithDeDup(Write write) {
+ this.write = write;
}
@Override
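The expand() hunk below is where the simplification pays off: every value is
read from the stored Write. It also contains the de-dup core; restated as a
minimal sketch, with names taken from the diff:

    // Tag each row with a randomly generated unique id, then Reshuffle.
    // The shuffle acts as a checkpoint: on retry, StreamingWriteFn replays
    // the already-materialized ids instead of regenerating them, letting
    // BigQuery de-duplicate best-effort on insertId.
    tagged
        .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
        .apply(ParDo.of(new StreamingWriteFn(
            schema,
            write.getCreateDisposition(),
            write.getTableDescription(),
            write.getBigQueryServices())));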
@@ -2780,20 +2753,26 @@ public class BigQueryIO {
PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = input.apply(ParDo.of(
new TagWithUniqueIdsAndTable(input.getPipeline().getOptions().as(BigQueryOptions.class),
- tableReference, tableRefFunction)));
+ write.getTable(), write.getTableRefFunction())));
// To prevent having the same TableRow processed more than once with regenerated
// different unique ids, this implementation relies on "checkpointing", which is
// achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
// performed by Reshuffle.
+ NestedValueProvider<TableSchema, String> schema =
+ write.getJsonSchema() == null
+ ? null
+ : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema());
tagged
.setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
.apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
- .apply(ParDo.of(new StreamingWriteFn(
- tableSchema,
- createDisposition,
- tableDescription,
- bqServices)));
+ .apply(
+ ParDo.of(
+ new StreamingWriteFn(
+ schema,
+ write.getCreateDisposition(),
+ write.getTableDescription(),
+ write.getBigQueryServices())));
// Note that the implementation to return PDone here breaks the
// implicit assumption about the job execution order. If a user