You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/21 17:52:46 UTC
[02/50] [abbrv] beam git commit: Refactor streaming write branch into
separate reusable components.
Refactor streaming write branch into separate reusable components.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/58ed5c7e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/58ed5c7e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/58ed5c7e
Branch: refs/heads/gearpump-runner
Commit: 58ed5c7ecd247f9c5e5a15deff40ffa8c800af25
Parents: 67a5f82
Author: Reuven Lax <re...@google.com>
Authored: Tue Mar 28 19:34:56 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 21:12:50 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 69 ++++++------
.../beam/sdk/io/gcp/bigquery/CreateTables.java | 100 +++++++++++++++++
.../io/gcp/bigquery/GenerateShardedTable.java | 48 ++++++++
.../beam/sdk/io/gcp/bigquery/PrepareWrite.java | 65 ++++++-----
.../sdk/io/gcp/bigquery/StreamWithDeDup.java | 90 ---------------
.../sdk/io/gcp/bigquery/StreamingInserts.java | 110 +++++++++++++++++++
.../sdk/io/gcp/bigquery/StreamingWriteFn.java | 82 +-------------
.../sdk/io/gcp/bigquery/TableDestination.java | 48 +++++++-
.../io/gcp/bigquery/TableDestinationCoder.java | 64 +++++++++++
.../sdk/io/gcp/bigquery/TagWithUniqueIds.java | 71 ++++++++++++
.../gcp/bigquery/TagWithUniqueIdsAndTable.java | 101 -----------------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 18 +--
12 files changed, 521 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index af0d561..af19b83 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -40,6 +40,7 @@ import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
@@ -60,6 +61,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
@@ -67,6 +69,7 @@ import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.Transport;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -681,8 +684,8 @@ public class BigQueryIO {
static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
@Nullable abstract ValueProvider<String> getJsonTableRef();
- @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableReference>
- getTableRefFunction();
+ /**
+ * Maps each element, together with its window, to the {@link TableDestination} it is written
+ * to. Null until one of the {@code to()} methods has been called.
+ */
+ @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableDestination>
+ getTableFunction();
@Nullable abstract SerializableFunction<T, TableRow> getFormatFunction();
/** Table schema. The schema is required only if the table does not exist. */
@Nullable abstract ValueProvider<String> getJsonSchema();
@@ -783,7 +786,7 @@ public class BigQueryIO {
private void ensureToNotCalledYet() {
+ // Fails if any destination setting (JSON table ref, table, or table function) is already
+ // present, i.e. if to() has already been called on this Write.
checkState(
getJsonTableRef() == null && getTable() == null
- && getTableRefFunction() == null, "to() already called");
+ && getTableFunction() == null, "to() already called");
}
/**
@@ -802,13 +805,16 @@ public class BigQueryIO {
/** Same as {@link #to(String)}, but with a {@link ValueProvider}. */
public Write<T> to(ValueProvider<String> tableSpec) {
ensureToNotCalledYet();
+ // NOTE(review): the description is read here, when to() is called, and frozen into the
+ // ConstantTableFunction; a description configured on this Write after to() would not be
+ // used. Confirm whether builder call order is meant to matter.
+ // A null description is replaced by "" (presumably because TableDestination or its coder
+ // cannot carry null; TODO confirm).
+ String tableDescription = getTableDescription();
+ if (tableDescription == null) {
+ tableDescription = "";
+ }
return toBuilder()
.setJsonTableRef(
NestedValueProvider.of(
NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
new TableRefToJson()))
- .setTableRefFunction(new TranslateTableSpecFunction<T>(
- new ConstantTableSpecFunction<T>(tableSpec)))
+ .setTableFunction(new ConstantTableFunction<T>(tableSpec, tableDescription))
.build();
}
@@ -819,6 +825,8 @@ public class BigQueryIO {
public Write<T> to(
SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
return toTableReference(new TranslateTableSpecFunction<T>(tableSpecFunction));
+ ensureToNotCalledYet();
+ return toBuilder().setTableFunction(tableFunction).build();
}
/**
@@ -828,7 +836,7 @@ public class BigQueryIO {
private Write<T> toTableReference(
SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction) {
ensureToNotCalledYet();
- return toBuilder().setTableRefFunction(tableRefFunction).build();
+ return toBuilder().setTableFunction(tableFunction).build();
}
/**
@@ -838,32 +846,19 @@ public class BigQueryIO {
return toBuilder().setFormatFunction(formatFunction).build();
}
- private static class TranslateTableSpecFunction<T> implements
- SerializableFunction<ValueInSingleWindow<T>, TableReference> {
- private SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction;
-
- TranslateTableSpecFunction(
- SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
- this.tableSpecFunction = tableSpecFunction;
- }
+ /**
+ * Table function that ignores the element and always targets {@code tableSpec}, re-reading the
+ * {@link ValueProvider} on every call, together with a fixed table description.
+ */
+ static class ConstantTableFunction<T> implements
+ SerializableFunction<ValueInSingleWindow<T>, TableDestination> {
+ private final ValueProvider<String> tableSpec;
+ private final String tableDescription;
- @Override
- public TableReference apply(ValueInSingleWindow<T> value) {
- return BigQueryHelpers.parseTableSpec(tableSpecFunction.apply(value));
- }
- }
-
- static class ConstantTableSpecFunction<T> implements
- SerializableFunction<ValueInSingleWindow<T>, String> {
- private ValueProvider<String> tableSpec;
-
- ConstantTableSpecFunction(ValueProvider<String> tableSpec) {
+ ConstantTableFunction(ValueProvider<String> tableSpec, String tableDescription) {
this.tableSpec = tableSpec;
+ this.tableDescription = tableDescription;
}
@Override
- public String apply(ValueInSingleWindow<T> value) {
- return tableSpec.get();
+ public TableDestination apply(ValueInSingleWindow<T> value) {
+ return new TableDestination(tableSpec.get(), tableDescription);
}
}
@@ -919,7 +914,7 @@ public class BigQueryIO {
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
// Exactly one of the table and table reference can be configured.
- checkState(getTableRefFunction() != null,
+ checkState(getTableFunction() != null,
"must set the table reference of a BigQueryIO.Write transform");
checkArgument(getFormatFunction() != null,
@@ -978,10 +973,16 @@ public class BigQueryIO {
@Override
public WriteResult expand(PCollection<T> input) {
+ PCollection<KV<TableDestination, TableRow>> rowsWithDestination =
+ input.apply("PrepareWrite", ParDo.of(
+ new PrepareWrite<T>(getTableFunction(), getFormatFunction())))
+ .setCoder(KvCoder.of(TableDestinationCoder.of(), TableRowJsonCoder.of()));
+
+
// When writing an Unbounded PCollection, or when a tablespec function is defined, we use
- // StreamWithDeDup and BigQuery's streaming import API.
+ // StreamingInserts and BigQuery's streaming import API.
if (input.isBounded() == IsBounded.UNBOUNDED) {
- return input.apply(new StreamWithDeDup<T>(this));
+ return rowsWithDestination.apply(new StreamingInserts(this));
} else {
return input.apply(new BatchLoadBigQuery<T>(this));
}
@@ -1002,8 +1003,8 @@ public class BigQueryIO {
.addIfNotNull(DisplayData.item("schema", getJsonSchema())
.withLabel("Table Schema"));
- if (getTableRefFunction() != null) {
- builder.add(DisplayData.item("tableFn", getTableRefFunction().getClass())
+ if (getTableFunction() != null) {
+ builder.add(DisplayData.item("tableFn", getTableFunction().getClass())
.withLabel("Table Reference Function"));
}
@@ -1025,7 +1026,7 @@ public class BigQueryIO {
}
/**
- * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
+ * Returns the table to write, or {@code null} if writing with {@code tableFunction}.
*
* <p>If the table's project is not specified, use the executing project.
*/
@@ -1066,7 +1067,7 @@ public class BigQueryIO {
*/
@VisibleForTesting
static void clearCreatedTables() {
- StreamingWriteFn.clearCreatedTables();
+ // The created-tables cache now lives in CreateTables, the DoFn that performs table creation.
+ CreateTables.clearCreatedTables();
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
new file mode 100644
index 0000000..e216553
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -0,0 +1,100 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+
+
+/**
+ * Creates any tables needed before performing streaming writes to the tables. This is a
+ * side-effect {@link DoFn}, and returns the original collection unchanged.
+ *
+ * <p>Tables are only created when the {@link CreateDisposition} is not {@code CREATE_NEVER}; the
+ * schema comes from the supplied schema function and the description from the
+ * {@link TableDestination}.
+ */
+public class CreateTables extends DoFn<KV<TableDestination, TableRow>,
+    KV<TableDestination, TableRow>> {
+  private final CreateDisposition createDisposition;
+  private final BigQueryServices bqServices;
+  private final SerializableFunction<TableDestination, TableSchema> schemaFunction;
+
+  /**
+   * Specs of the tables created (or found to already exist) so far, so we don't try the creation
+   * each time. Static, so it is shared by every instance in the JVM.
+   *
+   * <p>TODO: We should put a bound on memory usage of this. Use guava cache instead.
+   */
+  private static Set<String> createdTables =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+  /**
+   * @param createDisposition whether missing tables may be created
+   * @param bqServices used to look up and create tables
+   * @param schemaFunction supplies the schema of a table that must be created
+   */
+  public CreateTables(CreateDisposition createDisposition, BigQueryServices bqServices,
+      SerializableFunction<TableDestination, TableSchema> schemaFunction) {
+    this.createDisposition = createDisposition;
+    this.bqServices = bqServices;
+    this.schemaFunction = schemaFunction;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext context) throws InterruptedException, IOException {
+    BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
+    possibleCreateTable(options, context.element().getKey());
+    // Pass the element through unchanged; this DoFn exists only for its side effect.
+    context.output(context.element());
+  }
+
+  /**
+   * Creates the destination table if creation is allowed, the table has not been recorded yet
+   * in this JVM, and BigQuery reports that it does not exist.
+   */
+  private void possibleCreateTable(BigQueryOptions options, TableDestination tableDestination)
+      throws InterruptedException, IOException {
+    String tableSpec = tableDestination.getTableSpec();
+    TableReference tableReference = tableDestination.getTableReference();
+    String tableDescription = tableDestination.getTableDescription();
+    if (createDisposition == CreateDisposition.CREATE_NEVER
+        || createdTables.contains(tableSpec)) {
+      return;
+    }
+    synchronized (createdTables) {
+      // Another thread may have succeeded in creating the table in the meanwhile, so
+      // check again. This check isn't needed for correctness, but we add it to prevent
+      // every thread from attempting a create and overwhelming our BigQuery quota.
+      if (createdTables.contains(tableSpec)) {
+        return;
+      }
+      // Only obtain the dataset service once we know this thread must talk to BigQuery.
+      DatasetService datasetService = bqServices.getDatasetService(options);
+      if (datasetService.getTable(tableReference) == null) {
+        // The schema is only needed when the table actually has to be created.
+        TableSchema tableSchema = schemaFunction.apply(tableDestination);
+        datasetService.createTable(
+            new Table()
+                .setTableReference(tableReference)
+                .setSchema(tableSchema)
+                .setDescription(tableDescription));
+      }
+      createdTables.add(tableSpec);
+    }
+  }
+
+  /** Forgets every table recorded as created, so tests can start from a clean state. */
+  static void clearCreatedTables() {
+    synchronized (createdTables) {
+      createdTables.clear();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
new file mode 100644
index 0000000..da3a70a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
@@ -0,0 +1,48 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Given a write to a specific table, assigns it to one of {@code numShards} randomly chosen
+ * shard keys for that table, so that writes to the same table are batched per shard.
+ */
+class GenerateShardedTable extends DoFn<KV<TableDestination, TableRow>,
+    KV<ShardedKey<String>, TableRow>> {
+  private final int numShards;
+
+  /**
+   * @param numShards number of shard keys per table; must be positive
+   * @throws IllegalArgumentException if {@code numShards} is not positive
+   */
+  GenerateShardedTable(int numShards) {
+    // Fail at construction rather than on every element inside ThreadLocalRandom.nextInt.
+    if (numShards <= 0) {
+      throw new IllegalArgumentException("numShards must be positive, got " + numShards);
+    }
+    this.numShards = numShards;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
+    ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
+    // We output on keys 0 .. numShards - 1 to ensure that there's enough batching for
+    // BigQuery.
+    String tableSpec = context.element().getKey().getTableSpec();
+    context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, numShards)),
+        context.element().getValue()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
index 0c08e18..7712417 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
@@ -1,3 +1,20 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableReference;
@@ -6,8 +23,6 @@ import com.google.common.base.Strings;
import java.io.IOException;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
@@ -15,44 +30,38 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
/**
- * Prepare an input {@link PCollection<T>} for writing to BigQuery. Use the table-reference
+ * Prepare an input {@link PCollection} for writing to BigQuery. Use the table
* function to determine which tables each element is written to, and format the element into a
* {@link TableRow} using the user-supplied format function.
+ *
+ * <p>If the destination's {@link TableReference} has no project ID, the project configured in
+ * {@link BigQueryOptions} is used.
*/
-public class PrepareWrite<T> extends PTransform<PCollection<T>, PCollection<KV<String, TableRow>>> {
- private static final String NAME = "PrepareWrite";
- private SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction;
- private SerializableFunction<T, TableRow> formatFunction;
+public class PrepareWrite<T> extends DoFn<T, KV<TableDestination, TableRow>> {
+ private final SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction;
+ private final SerializableFunction<T, TableRow> formatFunction;
- public PrepareWrite(SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction,
+ /**
+ * @param tableFunction maps each element, with its window, to its destination table
+ * @param formatFunction converts each element into the {@link TableRow} to write
+ */
+ public PrepareWrite(SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction,
SerializableFunction<T, TableRow> formatFunction) {
- super(NAME);
- this.tableRefFunction = tableRefFunction;
+ this.tableFunction = tableFunction;
this.formatFunction = formatFunction;
}
- @Override
- public PCollection<KV<String, TableRow>> expand(PCollection<T> input) {
- PCollection<KV<String, TableRow>> elementsByTable =
- input.apply(ParDo.of(new DoFn<T, KV<String, TableRow>>() {
- @ProcessElement
- public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
- String tableSpec = tableSpecFromWindowedValue(
- context.getPipelineOptions().as(BigQueryOptions.class),
- ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
- TableRow tableRow = formatFunction.apply(context.element());
- context.output(KV.of(tableSpec, tableRow));
- }
- }));
- return elementsByTable;
+ @ProcessElement
+ public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
+ TableDestination tableDestination = tableDestinationFromWindowedValue(
+ context.getPipelineOptions().as(BigQueryOptions.class),
+ ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
+ TableRow tableRow = formatFunction.apply(context.element());
+ context.output(KV.of(tableDestination, tableRow));
}
- private String tableSpecFromWindowedValue(BigQueryOptions options,
+ /**
+ * Applies the table function to {@code value}; if the resulting table reference has no project,
+ * returns a destination that uses the pipeline's project instead.
+ */
+ private TableDestination tableDestinationFromWindowedValue(BigQueryOptions options,
ValueInSingleWindow<T> value) {
- TableReference table = tableRefFunction.apply(value);
- if (Strings.isNullOrEmpty(table.getProjectId())) {
- table.setProjectId(options.getProject());
+ TableDestination tableDestination = tableFunction.apply(value);
+ TableReference tableReference = tableDestination.getTableReference();
+ if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+ // Copy rather than mutate the reference handed back by the user-supplied table function,
+ // which may be shared across elements or threads.
+ TableReference withProject = tableReference.clone().setProjectId(options.getProject());
+ tableDestination = new TableDestination(withProject,
+ tableDestination.getTableDescription());
}
- return BigQueryHelpers.toTableSpec(table);
+ return tableDestination;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
deleted file mode 100644
index 506a564..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableSchema;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
-* PTransform that performs streaming BigQuery write. To increase consistency,
-* it leverages BigQuery best effort de-dup mechanism.
- */
-class StreamWithDeDup<T> extends PTransform<PCollection<T>, WriteResult> {
- private final Write<T> write;
-
- /** Constructor. */
- StreamWithDeDup(Write<T> write) {
- this.write = write;
- }
-
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
-
- @Override
- public WriteResult expand(PCollection<T> input) {
- // A naive implementation would be to simply stream data directly to BigQuery.
- // However, this could occasionally lead to duplicated data, e.g., when
- // a VM that runs this code is restarted and the code is re-run.
-
- // The above risk is mitigated in this implementation by relying on
- // BigQuery built-in best effort de-dup mechanism.
-
- // To use this mechanism, each input TableRow is tagged with a generated
- // unique id, which is then passed to BigQuery and used to ignore duplicates.
-
- PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged =
- input.apply(ParDo.of(new TagWithUniqueIdsAndTable<T>(
- input.getPipeline().getOptions().as(BigQueryOptions.class), write)));
-
- // To prevent having the same TableRow processed more than once with regenerated
- // different unique ids, this implementation relies on "checkpointing", which is
- // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
- // performed by Reshuffle.
- NestedValueProvider<TableSchema, String> schema =
- write.getJsonSchema() == null
- ? null
- : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema());
- tagged
- .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
- .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
- .apply(
- ParDo.of(
- new StreamingWriteFn(
- schema,
- write.getCreateDisposition(),
- write.getTableDescription(),
- write.getBigQueryServices())));
-
- return WriteResult.in(input.getPipeline());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
new file mode 100644
index 0000000..37afbdf
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * PTransform that performs streaming BigQuery write. To increase consistency,
+ * it leverages BigQuery best effort de-dup mechanism.
+ */
+class StreamingInserts
+    extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
+  /**
+   * Number of shard keys generated per BigQuery table. This is few enough that we get good
+   * batching into BigQuery's insert calls, and enough that we can max out the streaming insert
+   * quota.
+   */
+  private static final int SHARDS_PER_TABLE = 50;
+
+  private final Write<?> write;
+
+  /** Schema function that returns the same schema, possibly null, for every destination. */
+  private static class ConstantSchemaFunction implements
+      SerializableFunction<TableDestination, TableSchema> {
+    /** The schema as JSON; held as a String so that this function is Serializable. */
+    private final @Nullable String jsonSchema;
+
+    ConstantSchemaFunction(@Nullable TableSchema schema) {
+      this.jsonSchema = BigQueryHelpers.toJsonString(schema);
+    }
+
+    @Override
+    @Nullable
+    public TableSchema apply(TableDestination table) {
+      return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+    }
+  }
+
+  /** Constructor. */
+  StreamingInserts(Write<?> write) {
+    this.write = write;
+  }
+
+  @Override
+  protected Coder<Void> getDefaultOutputCoder() {
+    return VoidCoder.of();
+  }
+
+  @Override
+  public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
+    // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant
+    // schema function here. If no schema is specified, this function will return null.
+    SerializableFunction<TableDestination, TableSchema> schemaFunction =
+        new ConstantSchemaFunction(write.getSchema());
+
+    // A naive implementation would be to simply stream data directly to BigQuery.
+    // However, this could occasionally lead to duplicated data, e.g., when
+    // a VM that runs this code is restarted and the code is re-run.
+
+    // The above risk is mitigated in this implementation by relying on
+    // BigQuery built-in best effort de-dup mechanism.
+
+    // To use this mechanism, each input TableRow is tagged with a generated
+    // unique id, which is then passed to BigQuery and used to ignore duplicates.
+    PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = input
+        .apply("CreateTables", ParDo.of(new CreateTables(write.getCreateDisposition(),
+            write.getBigQueryServices(), schemaFunction)))
+        .apply("ShardTableWrites", ParDo.of(new GenerateShardedTable(SHARDS_PER_TABLE)))
+        .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowJsonCoder.of()))
+        .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds()));
+
+    // To prevent having the same TableRow processed more than once with regenerated
+    // different unique ids, this implementation relies on "checkpointing", which is
+    // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
+    // performed by Reshuffle.
+    tagged
+        .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
+        .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
+        .apply("StreamingWrite",
+            ParDo.of(
+                new StreamingWriteFn(write.getBigQueryServices())));
+
+    return WriteResult.in(input.getPipeline());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index 1d93fa3..83ed3d2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -18,28 +18,17 @@
 package org.apache.beam.sdk.io.gcp.bigquery;

 import static com.google.common.base.Preconditions.checkNotNull;

-import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
@@ -52,48 +41,24 @@ import org.apache.beam.sdk.values.KV;
 @VisibleForTesting
 class StreamingWriteFn
     extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
-  /** TableSchema in JSON. Use String to make the class Serializable. */
-  @Nullable
-  private final ValueProvider<String> jsonTableSchema;
-
-  @Nullable private final String tableDescription;
-
   private final BigQueryServices bqServices;

   /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
   private transient Map<String, List<TableRow>> tableRows;

-  private final Write.CreateDisposition createDisposition;
-
   /** The list of unique ids for each BigQuery table row. */
   private transient Map<String, List<String>> uniqueIdsForTableRows;

-  /** The list of tables created so far, so we don't try the creation
-      each time. */
-  private static Set<String> createdTables =
-      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-
   /** Tracks bytes written, exposed as "ByteCount" Counter. */
   private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, "ByteCount");

-  /** Constructor. */
-  StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema,
-      Write.CreateDisposition createDisposition,
-      @Nullable String tableDescription, BigQueryServices bqServices) {
-    this.jsonTableSchema = schema == null ? null :
-        NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
-    this.createDisposition = createDisposition;
-    this.bqServices = checkNotNull(bqServices, "bqServices");
-    this.tableDescription = tableDescription;
-  }
-
-  /**
-   * Clear the cached map of created tables. Used for testing.
-   */
-  static void clearCreatedTables() {
-    synchronized (createdTables) {
-      createdTables.clear();
-    }
+  /**
+   * Creates a fn that streams rows to BigQuery through the given services.
+   *
+   * @param bqServices services used to perform the inserts; must not be null
+   */
+  StreamingWriteFn(BigQueryServices bqServices) {
+    this.bqServices = checkNotNull(bqServices, "bqServices");
   }

   /** Prepares a target BigQuery table. */
@@ -119,9 +84,8 @@ class StreamingWriteFn
   @FinishBundle
   public void finishBundle(Context context) throws Exception {
     BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
-
     for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) {
-      TableReference tableReference = getOrCreateTable(options, entry.getKey());
+      TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
       flushRows(tableReference, entry.getValue(),
           uniqueIdsForTableRows.get(entry.getKey()), options);
     }
@@ -132,39 +96,6 @@ class StreamingWriteFn
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-
-    builder
-        .addIfNotNull(DisplayData.item("schema", jsonTableSchema)
-          .withLabel("Table Schema"))
-        .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
-          .withLabel("Table Description"));
-  }
-
-  public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
-      throws InterruptedException, IOException {
-    TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
-    if (createDisposition != createDisposition.CREATE_NEVER
-        && !createdTables.contains(tableSpec)) {
-      synchronized (createdTables) {
-        // Another thread may have succeeded in creating the table in the meanwhile, so
-        // check again. This check isn't needed for correctness, but we add it to prevent
-        // every thread from attempting a create and overwhelming our BigQuery quota.
-        DatasetService datasetService = bqServices.getDatasetService(options);
-        if (!createdTables.contains(tableSpec)) {
-          if (datasetService.getTable(tableReference) == null) {
-            TableSchema tableSchema = BigQueryIO.JSON_FACTORY.fromString(
-                jsonTableSchema.get(), TableSchema.class);
-            datasetService.createTable(
-                new Table()
-                    .setTableReference(tableReference)
-                    .setSchema(tableSchema)
-                    .setDescription(tableDescription));
-          }
-          createdTables.add(tableSpec);
-        }
-      }
-    }
-    return tableReference;
   }

   /**
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index 3cbbf3b..631afeb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -1,7 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.beam.sdk.io.gcp.bigquery;

+import com.google.api.services.bigquery.model.TableReference;
+import java.util.Objects;
+
 /**
- * Created by relax on 3/28/17.
+ * Encapsulates a BigQuery table destination: a table spec together with a table description.
+ *
+ * <p>Instances are immutable. Two destinations are equal when both their table specs and their
+ * table descriptions are equal.
  */
 public class TableDestination {
+  private final String tableSpec;
+  private final String tableDescription;
+
+  /** Creates a destination for a table spec string such as {@code "project:dataset.table"}. */
+  public TableDestination(String tableSpec, String tableDescription) {
+    this.tableSpec = tableSpec;
+    this.tableDescription = tableDescription;
+  }
+
+  /** Creates a destination for a {@link TableReference}, stored as its table spec string. */
+  public TableDestination(TableReference tableReference, String tableDescription) {
+    this(BigQueryHelpers.toTableSpec(tableReference), tableDescription);
+  }
+
+  /** Returns the table spec string of this destination. */
+  public String getTableSpec() {
+    return tableSpec;
+  }
+
+  /** Returns the table spec parsed into a {@link TableReference}. */
+  public TableReference getTableReference() {
+    return BigQueryHelpers.parseTableSpec(tableSpec);
+  }
+
+  /** Returns the table description of this destination. */
+  public String getTableDescription() {
+    return tableDescription;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TableDestination other = (TableDestination) o;
+    return Objects.equals(tableSpec, other.tableSpec)
+        && Objects.equals(tableDescription, other.tableDescription);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tableSpec, tableDescription);
+  }
+
+  @Override
+  public String toString() {
+    return "TableDestination{tableSpec=" + tableSpec
+        + ", tableDescription=" + tableDescription + "}";
+  }
 }
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
new file mode 100644
index 0000000..fa24700
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * A coder for {@link TableDestination} objects.
+ *
+ * <p>The table spec is always encoded with a nested context. The table description is the last
+ * field and is encoded with the caller's context, so it must be decoded with that same context.
+ */
+public class TableDestinationCoder extends AtomicCoder<TableDestination> {
+  private static final TableDestinationCoder INSTANCE = new TableDestinationCoder();
+  private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+
+  /** Returns the singleton instance of this coder. */
+  @JsonCreator
+  public static TableDestinationCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(TableDestination value, OutputStream outStream, Context context)
+      throws IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null value");
+    }
+    STRING_CODER.encode(value.getTableSpec(), outStream, context.nested());
+    STRING_CODER.encode(value.getTableDescription(), outStream, context);
+  }
+
+  @Override
+  public TableDestination decode(InputStream inStream, Context context) throws IOException {
+    String tableSpec = STRING_CODER.decode(inStream, context.nested());
+    // Must mirror encode(): the description was written with the caller's context, not a
+    // nested one, so reading it with context.nested() would misparse outer-context data.
+    String tableDescription = STRING_CODER.decode(inStream, context);
+    return new TableDestination(tableSpec, tableDescription);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    // Both fields are encoded with StringUtf8Coder, which is deterministic.
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
new file mode 100644
index 0000000..6f0186e
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Fn that tags each table row with a unique id, leaving the element's key unchanged.
+ * To avoid calling UUID.randomUUID() for each element, which can be costly,
+ * a randomUUID is generated only once per bundle. The actual unique
+ * id is created by concatenating this randomUUID with a sequential number.
+ */
+@VisibleForTesting
+class TagWithUniqueIds
+    extends DoFn<KV<ShardedKey<String>, TableRow>, KV<ShardedKey<String>, TableRowInfo>> {
+
+  /** Random prefix for generated ids, regenerated at the start of every bundle. */
+  private transient String randomUUID;
+  /** Counter appended to {@link #randomUUID}; it is only ever incremented, never reset. */
+  private transient long sequenceNo = 0L;
+
+  @StartBundle
+  public void startBundle(Context context) {
+    randomUUID = UUID.randomUUID().toString();
+  }
+
+  /** Tag the input with a unique id. */
+  @ProcessElement
+  public void processElement(ProcessContext context) throws IOException {
+    String uniqueId = randomUUID + sequenceNo++;
+    // The key is emitted unchanged; only the value is wrapped together with its unique id.
+    context.output(KV.of(context.element().getKey(),
+        new TableRowInfo(context.element().getValue(), uniqueId)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
deleted file mode 100644
index 4e50f7c..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import java.io.IOException;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.ValueInSingleWindow;
-
-/**
- * Fn that tags each table row with a unique id and destination table.
- * To avoid calling UUID.randomUUID() for each element, which can be costly,
- * a randomUUID is generated only once per bucket of data. The actual unique
- * id is created by concatenating this randomUUID with a sequential number.
- */
-@VisibleForTesting
-class TagWithUniqueIdsAndTable<T>
- extends DoFn<T, KV<ShardedKey<String>, TableRowInfo>> {
- /** TableSpec to write to in the case of a single static destination. */
- private ValueProvider<String> tableSpec = null;
-
- private final Write<T, ?> write;
-
- private transient String randomUUID;
- private transient long sequenceNo = 0L;
-
- TagWithUniqueIdsAndTable(BigQueryOptions options,
- Write<T, ?> write) {
- ValueProvider<TableReference> table = write.getTableWithDefaultProject(
- options.as(BigQueryOptions.class));
- if (table != null) {
- this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
- }
- this.write = write;
- }
-
-
- @StartBundle
- public void startBundle(Context context) {
- randomUUID = UUID.randomUUID().toString();
- }
-
- /** Tag the input with a unique id. */
- @ProcessElement
- public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
- String uniqueId = randomUUID + sequenceNo++;
- ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
- String tableSpec = tableSpecFromWindowedValue(
- context.getPipelineOptions().as(BigQueryOptions.class),
- ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
- // We output on keys 0-50 to ensure that there's enough batching for
- // BigQuery.
- context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)),
- new TableRowInfo(write.getFormatFunction().apply(context.element()), uniqueId)));
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder.addIfNotNull(DisplayData.item("table", tableSpec));
- builder.add(DisplayData.item("tableFn", write.getTableRefFunction().getClass())
- .withLabel("Table Reference Function"));
- }
-
- @VisibleForTesting
- ValueProvider<String> getTableSpec() {
- return tableSpec;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 499aa74..d953edd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -518,7 +518,6 @@ public class BigQueryIOTest implements Serializable {

   /** A fake dataset service that can be serialized, for use in testReadFromTable. */
   private static class FakeDatasetService implements DatasetService, Serializable {
-
     @Override
     public Table getTable(TableReference tableRef)
         throws InterruptedException, IOException {
@@ -1121,15 +1120,15 @@ public class BigQueryIOTest implements Serializable {
         }
     );

-    SerializableFunction<ValueInSingleWindow<Integer>, String> tableFunction =
-        new SerializableFunction<ValueInSingleWindow<Integer>, String>() {
+    SerializableFunction<ValueInSingleWindow<Integer>, TableDestination> tableFunction =
+        new SerializableFunction<ValueInSingleWindow<Integer>, TableDestination>() {
           @Override
-          public String apply(ValueInSingleWindow<Integer> input) {
+          public TableDestination apply(ValueInSingleWindow<Integer> input) {
             PartitionedGlobalWindow window = (PartitionedGlobalWindow) input.getWindow();
             // Check that we can access the element as well here.
             checkArgument(window.value.equals(Integer.toString(input.getValue() % 5)),
                 "Incorrect element");
-            return "project-id:dataset-id.table-id-" + window.value;
+            return new TableDestination("project-id:dataset-id.table-id-" + window.value, "");
           }
         };

@@ -1559,14 +1558,6 @@ public class BigQueryIOTest implements Serializable {
   }

   @Test
-  public void testStreamingWriteFnCreateNever() throws Exception {
-    StreamingWriteFn fn = new StreamingWriteFn(
-        null, CreateDisposition.CREATE_NEVER, null, new FakeBigQueryServices());
-    assertEquals(BigQueryHelpers.parseTableSpec("dataset.table"),
-        fn.getOrCreateTable(null, "dataset.table"));
-  }
-
-  @Test
   public void testCreateNeverWithStreaming() throws Exception {
     BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     options.setProject("project");