You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/17 18:15:54 UTC
[1/2] beam git commit: Combine jobName with stepUUID for BQIO
Repository: beam
Updated Branches:
refs/heads/master a91571ef9 -> f9b5d55e5
Combine jobName with stepUUID for BQIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a4d2a5de
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a4d2a5de
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a4d2a5de
Branch: refs/heads/master
Commit: a4d2a5ded77803fa1243c86d2008b05865c464f2
Parents: a91571e
Author: Sam McVeety <sg...@google.com>
Authored: Mon Dec 26 20:59:08 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 17 10:06:30 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 111 +++++++++++++------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 58 +++++++++-
2 files changed, 132 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a4d2a5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 4b19973..701374d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -379,6 +379,51 @@ public class BigQueryIO {
}
}
+ /**
+  * Maps a per-step Beam job UUID to the token used for BigQuery job ids by
+  * prepending a fixed {@code "beam_job_"} prefix.
+  */
+ @VisibleForTesting
+ static class BeamJobUuidToBigQueryJobUuid
+     implements SerializableFunction<String, String> {
+   /** Prefix placed in front of every Beam job UUID. */
+   private static final String BIGQUERY_JOB_PREFIX = "beam_job_";
+
+   /**
+    * @param from the Beam job UUID to convert
+    * @return {@code from} with the BigQuery job prefix prepended
+    */
+   @Override
+   public String apply(String from) {
+     return BIGQUERY_JOB_PREFIX + from;
+   }
+ }
+
+ /**
+  * Builds an identifier that is unique per transform application and per job by
+  * joining a fixed step UUID with the value supplied at apply time.
+  *
+  * <p>The result has the form {@code <stepUuid>_<input without dashes>}.
+  */
+ @VisibleForTesting
+ static class CreatePerBeamJobUuid
+     implements SerializableFunction<String, String> {
+   private final String stepUuid;
+
+   /**
+    * @param stepUuid identifier of the transform application; used verbatim as the
+    *     leading part of every generated value
+    */
+   private CreatePerBeamJobUuid(String stepUuid) {
+     this.stepUuid = stepUuid;
+   }
+
+   /**
+    * @param jobUuid the job-level identifier; the callers visible in this change pass
+    *     the pipeline's job name
+    * @return {@code stepUuid}, an underscore, then {@code jobUuid} with every
+    *     {@code '-'} removed
+    */
+   @Override
+   public String apply(String jobUuid) {
+     // Literal replacement: String.replace avoids the regex machinery that
+     // replaceAll would invoke for a plain "-" pattern.
+     return stepUuid + "_" + jobUuid.replace("-", "");
+   }
+ }
+
+ /**
+  * Derives the temporary query-result table for a job UUID. The table lives in the
+  * executing project, in dataset {@code temp_dataset_<uuid>} and table
+  * {@code temp_table_<uuid>}.
+  */
+ @VisibleForTesting
+ static class CreateJsonTableRefFromUuid
+     implements SerializableFunction<String, TableReference> {
+   private final String executingProject;
+
+   /**
+    * @param executingProject project id set on every table reference produced
+    */
+   private CreateJsonTableRefFromUuid(String executingProject) {
+     this.executingProject = executingProject;
+   }
+
+   /**
+    * @param jobUuid suffix appended to both the temporary dataset id and table id
+    * @return a new {@link TableReference} pointing at the temporary table
+    */
+   @Override
+   public TableReference apply(String jobUuid) {
+     return new TableReference()
+         .setProjectId(executingProject)
+         .setDatasetId("temp_dataset_" + jobUuid)
+         .setTableId("temp_table_" + jobUuid);
+   }
+ }
+
@Nullable
private static ValueProvider<String> displayTable(
@Nullable ValueProvider<TableReference> table) {
@@ -471,6 +516,9 @@ public class BigQueryIO {
@Nullable final Boolean useLegacySql;
@Nullable BigQueryServices bigQueryServices;
+ @VisibleForTesting @Nullable String stepUuid;
+ @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
+
private static final String QUERY_VALIDATION_FAILURE_ERROR =
"Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
+ " pipeline, This validation can be disabled using #withoutValidation.";
@@ -667,10 +715,12 @@ public class BigQueryIO {
@Override
public PCollection<TableRow> expand(PBegin input) {
- String uuid = randomUUIDString();
- final String jobIdToken = "beam_job_" + uuid;
-
+ stepUuid = randomUUIDString();
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+ jobUuid = NestedValueProvider.of(
+ StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
+ final ValueProvider<String> jobIdToken = NestedValueProvider.of(
+ jobUuid, new BeamJobUuidToBigQueryJobUuid());
BoundedSource<TableRow> source;
final BigQueryServices bqServices = getBigQueryServices();
@@ -679,7 +729,7 @@ public class BigQueryIO {
String tempLocation = bqOptions.getTempLocation();
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
- extractDestinationDir = factory.resolve(tempLocation, uuid);
+ extractDestinationDir = factory.resolve(tempLocation, stepUuid);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to resolve extract destination directory in %s", tempLocation));
@@ -687,18 +737,9 @@ public class BigQueryIO {
final String executingProject = bqOptions.getProject();
if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) {
- String queryTempDatasetId = "temp_dataset_" + uuid;
- String queryTempTableId = "temp_table_" + uuid;
-
- TableReference queryTempTableRef = new TableReference()
- .setProjectId(executingProject)
- .setDatasetId(queryTempDatasetId)
- .setTableId(queryTempTableId);
- String jsonTableRef = toJsonString(queryTempTableRef);
-
source = BigQueryQuerySource.create(
jobIdToken, query, NestedValueProvider.of(
- StaticValueProvider.of(jsonTableRef), new JsonTableRefToTableRef()),
+ jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
flattenResults, useLegacySql, extractDestinationDir, bqServices);
} else {
ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
@@ -913,7 +954,7 @@ public class BigQueryIO {
static class BigQueryTableSource extends BigQuerySourceBase {
static BigQueryTableSource create(
- String jobIdToken,
+ ValueProvider<String> jobIdToken,
ValueProvider<TableReference> table,
String extractDestinationDir,
BigQueryServices bqServices,
@@ -926,7 +967,7 @@ public class BigQueryIO {
private final AtomicReference<Long> tableSizeBytes;
private BigQueryTableSource(
- String jobIdToken,
+ ValueProvider<String> jobIdToken,
ValueProvider<TableReference> table,
String extractDestinationDir,
BigQueryServices bqServices,
@@ -982,7 +1023,7 @@ public class BigQueryIO {
static class BigQueryQuerySource extends BigQuerySourceBase {
static BigQueryQuerySource create(
- String jobIdToken,
+ ValueProvider<String> jobIdToken,
ValueProvider<String> query,
ValueProvider<TableReference> queryTempTableRef,
Boolean flattenResults,
@@ -1006,7 +1047,7 @@ public class BigQueryIO {
private transient AtomicReference<JobStatistics> dryRunJobStats;
private BigQueryQuerySource(
- String jobIdToken,
+ ValueProvider<String> jobIdToken,
ValueProvider<String> query,
ValueProvider<TableReference> queryTempTableRef,
Boolean flattenResults,
@@ -1063,7 +1104,7 @@ public class BigQueryIO {
"Dataset for BigQuery query job temporary table");
// 3. Execute the query.
- String queryJobId = jobIdToken + "-query";
+ String queryJobId = jobIdToken.get() + "-query";
executeQuery(
executingProject.get(),
queryJobId,
@@ -1161,13 +1202,13 @@ public class BigQueryIO {
// The initial backoff for verifying temp files.
private static final Duration INITIAL_FILES_VERIFY_BACKOFF = Duration.standardSeconds(1);
- protected final String jobIdToken;
+ protected final ValueProvider<String> jobIdToken;
protected final String extractDestinationDir;
protected final BigQueryServices bqServices;
protected final ValueProvider<String> executingProject;
private BigQuerySourceBase(
- String jobIdToken,
+ ValueProvider<String> jobIdToken,
String extractDestinationDir,
BigQueryServices bqServices,
ValueProvider<String> executingProject) {
@@ -1396,8 +1437,8 @@ public class BigQueryIO {
}
}
- private static String getExtractJobId(String jobIdToken) {
- return jobIdToken + "-extract";
+ private static String getExtractJobId(ValueProvider<String> jobIdToken) {
+ return jobIdToken.get() + "-extract";
}
private static String getExtractDestinationUri(String extractDestinationDir) {
@@ -1644,6 +1685,9 @@ public class BigQueryIO {
@Nullable private BigQueryServices bigQueryServices;
+ @VisibleForTesting @Nullable String stepUuid;
+ @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
+
private static class TranslateTableSpecFunction implements
SerializableFunction<BoundedWindow, TableReference> {
private SerializableFunction<BoundedWindow, String> tableSpecFunction;
@@ -1924,14 +1968,19 @@ public class BigQueryIO {
ValueProvider<TableReference> table = getTableWithDefaultProject(options);
- String jobIdToken = "beam_job_" + randomUUIDString();
+ stepUuid = randomUUIDString();
+ jobUuid = NestedValueProvider.of(
+ StaticValueProvider.of(options.getJobName()), new CreatePerBeamJobUuid(stepUuid));
+ ValueProvider<String> jobIdToken = NestedValueProvider.of(
+ jobUuid, new BeamJobUuidToBigQueryJobUuid());
+
String tempLocation = options.getTempLocation();
String tempFilePrefix;
try {
IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
tempFilePrefix = factory.resolve(
factory.resolve(tempLocation, "BigQueryWriteTemp"),
- jobIdToken);
+ stepUuid);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
@@ -2248,7 +2297,7 @@ public class BigQueryIO {
static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
private final boolean singlePartition;
private final BigQueryServices bqServices;
- private final String jobIdToken;
+ private final ValueProvider<String> jobIdToken;
private final String tempFilePrefix;
private final ValueProvider<String> jsonTableRef;
private final ValueProvider<String> jsonSchema;
@@ -2258,7 +2307,7 @@ public class BigQueryIO {
public WriteTables(
boolean singlePartition,
BigQueryServices bqServices,
- String jobIdToken,
+ ValueProvider<String> jobIdToken,
String tempFilePrefix,
ValueProvider<String> jsonTableRef,
ValueProvider<String> jsonSchema,
@@ -2277,7 +2326,7 @@ public class BigQueryIO {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
- String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey());
+ String jobIdPrefix = String.format(jobIdToken.get() + "_%05d", c.element().getKey());
TableReference ref = fromJsonString(jsonTableRef.get(), TableReference.class);
if (!singlePartition) {
ref.setTableId(jobIdPrefix);
@@ -2383,7 +2432,7 @@ public class BigQueryIO {
*/
static class WriteRename extends DoFn<String, Void> {
private final BigQueryServices bqServices;
- private final String jobIdToken;
+ private final ValueProvider<String> jobIdToken;
private final ValueProvider<String> jsonTableRef;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
@@ -2391,7 +2440,7 @@ public class BigQueryIO {
public WriteRename(
BigQueryServices bqServices,
- String jobIdToken,
+ ValueProvider<String> jobIdToken,
ValueProvider<String> jsonTableRef,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
@@ -2419,7 +2468,7 @@ public class BigQueryIO {
}
copy(
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
- jobIdToken,
+ jobIdToken.get(),
fromJsonString(jsonTableRef.get(), TableReference.class),
tempTables,
writeDisposition,
http://git-wip-us.apache.org/repos/asf/beam/blob/a4d2a5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 471b5e4..3e8c2c9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,6 +26,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -1646,7 +1647,8 @@ public class BigQueryIOTest implements Serializable {
TableReference table = BigQueryIO.parseTableSpec("project.data_set.table_name");
String extractDestinationDir = "mock://tempLocation";
BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
- jobIdToken, StaticValueProvider.of(table), extractDestinationDir, fakeBqServices,
+ StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
+ extractDestinationDir, fakeBqServices,
StaticValueProvider.of("project"));
List<TableRow> expected = ImmutableList.of(
@@ -1684,7 +1686,7 @@ public class BigQueryIOTest implements Serializable {
TableReference table = BigQueryIO.parseTableSpec("project:data_set.table_name");
String extractDestinationDir = "mock://tempLocation";
BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
- jobIdToken, StaticValueProvider.of(table),
+ StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
extractDestinationDir, fakeBqServices, StaticValueProvider.of("project"));
List<TableRow> expected = ImmutableList.of(
@@ -1750,7 +1752,8 @@ public class BigQueryIOTest implements Serializable {
String extractDestinationDir = "mock://tempLocation";
TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
- jobIdToken, StaticValueProvider.of("query"), StaticValueProvider.of(destinationTable),
+ StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
+ StaticValueProvider.of(destinationTable),
true /* flattenResults */, true /* useLegacySql */,
extractDestinationDir, fakeBqServices);
@@ -1842,7 +1845,8 @@ public class BigQueryIOTest implements Serializable {
String extractDestinationDir = "mock://tempLocation";
TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
- jobIdToken, StaticValueProvider.of("query"), StaticValueProvider.of(destinationTable),
+ StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
+ StaticValueProvider.of(destinationTable),
true /* flattenResults */, true /* useLegacySql */,
extractDestinationDir, fakeBqServices);
@@ -2117,7 +2121,7 @@ public class BigQueryIOTest implements Serializable {
WriteTables writeTables = new WriteTables(
false,
fakeBqServices,
- jobIdToken,
+ StaticValueProvider.of(jobIdToken),
tempFilePrefix,
StaticValueProvider.of(jsonTable),
StaticValueProvider.of(jsonSchema),
@@ -2195,7 +2199,7 @@ public class BigQueryIOTest implements Serializable {
WriteRename writeRename = new WriteRename(
fakeBqServices,
- jobIdToken,
+ StaticValueProvider.of(jobIdToken),
StaticValueProvider.of(jsonTable),
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
@@ -2358,4 +2362,46 @@ public class BigQueryIOTest implements Serializable {
public void testShardedKeyCoderIsSerializableWithWellKnownCoderType() {
CoderProperties.coderSerializable(BigQueryIO.ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE));
}
+
+ /**
+  * Two query reads applied to the same pipeline must end up with different step
+  * UUIDs and different job UUIDs.
+  */
+ @Test
+ public void testUniqueStepIdRead() {
+   RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+   BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+   Pipeline pipeline = TestPipeline.create(options);
+   bqOptions.setTempLocation("gs://testbucket/testdir");
+
+   // Apply the first read before constructing the second, as each apply assigns ids.
+   BigQueryIO.Read.Bound firstRead =
+       BigQueryIO.Read.fromQuery(options.getInputQuery()).withoutValidation();
+   pipeline.apply(firstRead);
+
+   BigQueryIO.Read.Bound secondRead =
+       BigQueryIO.Read.fromQuery(options.getInputQuery()).withoutValidation();
+   pipeline.apply(secondRead);
+
+   assertNotEquals(firstRead.stepUuid, secondRead.stepUuid);
+
+   String firstJobUuid = firstRead.jobUuid.get();
+   String secondJobUuid = secondRead.jobUuid.get();
+   assertNotEquals(firstJobUuid, secondJobUuid);
+ }
+
+ /**
+  * Two identically configured writes applied to the same pipeline must end up with
+  * different step UUIDs and different job UUIDs.
+  */
+ @Test
+ public void testUniqueStepIdWrite() {
+   RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+   BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+   bqOptions.setTempLocation("gs://testbucket/testdir");
+   Pipeline pipeline = TestPipeline.create(options);
+
+   // Both transforms are built before either is applied.
+   BigQueryIO.Write.Bound firstWrite =
+       BigQueryIO.Write
+           .to(options.getOutputTable())
+           .withSchema(
+               NestedValueProvider.of(options.getOutputSchema(), new JsonSchemaToTableSchema()))
+           .withoutValidation();
+   BigQueryIO.Write.Bound secondWrite =
+       BigQueryIO.Write
+           .to(options.getOutputTable())
+           .withSchema(
+               NestedValueProvider.of(options.getOutputSchema(), new JsonSchemaToTableSchema()))
+           .withoutValidation();
+
+   pipeline.apply(Create.<TableRow>of()).apply(firstWrite);
+   pipeline.apply(Create.<TableRow>of()).apply(secondWrite);
+
+   assertNotEquals(firstWrite.stepUuid, secondWrite.stepUuid);
+
+   String firstJobUuid = firstWrite.jobUuid.get();
+   String secondJobUuid = secondWrite.jobUuid.get();
+   assertNotEquals(firstJobUuid, secondJobUuid);
+ }
}
[2/2] beam git commit: This closes #1697
Posted by dh...@apache.org.
This closes #1697
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9b5d55e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9b5d55e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9b5d55e
Branch: refs/heads/master
Commit: f9b5d55e53e08ff8c7f73e493c69f23900c9687b
Parents: a91571e a4d2a5d
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jan 17 10:07:00 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 17 10:07:00 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 111 +++++++++++++------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 58 +++++++++-
2 files changed, 132 insertions(+), 37 deletions(-)
----------------------------------------------------------------------