Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/17 18:15:54 UTC

[1/2] beam git commit: Combine jobName with stepUUID for BQIO

Repository: beam
Updated Branches:
  refs/heads/master a91571ef9 -> f9b5d55e5


Combine jobName with stepUUID for BQIO
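
Previously, the BigQuery job ID token was derived solely from a random UUID
generated when the transform expanded. With this change the token combines the
pipeline's jobName with a per-step UUID and is threaded through the sources and
write DoFns as a ValueProvider<String>, so the final ID is resolved at execution
time. A minimal sketch of the intended composition follows; the jobName and
stepUuid values are hypothetical examples, not output from a real pipeline:

    // Illustrative sketch of the new job ID composition (values are hypothetical).
    public class JobIdSketch {
      public static void main(String[] args) {
        String jobName  = "my-beam-job";           // BigQueryOptions.getJobName()
        String stepUuid = "5a3ec79b42f04d1f";      // randomUUIDString(), one per applied step
        // CreatePerBeamJobUuid: stepUuid + "_" + jobName with dashes stripped
        String jobUuid = stepUuid + "_" + jobName.replaceAll("-", "");
        // BeamJobUuidToBigQueryJobUuid: prefix with "beam_job_"
        String jobIdToken = "beam_job_" + jobUuid;
        // Downstream BigQuery jobs append a per-stage suffix:
        System.out.println(jobIdToken + "-query");    // query job ID
        System.out.println(jobIdToken + "-extract");  // extract job ID
      }
    }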


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a4d2a5de
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a4d2a5de
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a4d2a5de

Branch: refs/heads/master
Commit: a4d2a5ded77803fa1243c86d2008b05865c464f2
Parents: a91571e
Author: Sam McVeety <sg...@google.com>
Authored: Mon Dec 26 20:59:08 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 17 10:06:30 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 111 +++++++++++++------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  58 +++++++++-
 2 files changed, 132 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a4d2a5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 4b19973..701374d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -379,6 +379,51 @@ public class BigQueryIO {
     }
   }
 
+  @VisibleForTesting
+  static class BeamJobUuidToBigQueryJobUuid
+      implements SerializableFunction<String, String> {
+    @Override
+    public String apply(String from) {
+      return "beam_job_" + from;
+    }
+  }
+
+  @VisibleForTesting
+  static class CreatePerBeamJobUuid
+      implements SerializableFunction<String, String> {
+    private final String stepUuid;
+
+    private CreatePerBeamJobUuid(String stepUuid) {
+      this.stepUuid = stepUuid;
+    }
+
+    @Override
+    public String apply(String jobUuid) {
+      return stepUuid + "_" + jobUuid.replaceAll("-", "");
+    }
+  }
+
+  @VisibleForTesting
+  static class CreateJsonTableRefFromUuid
+      implements SerializableFunction<String, TableReference> {
+    private final String executingProject;
+
+    private CreateJsonTableRefFromUuid(String executingProject) {
+      this.executingProject = executingProject;
+    }
+
+    @Override
+    public TableReference apply(String jobUuid) {
+      String queryTempDatasetId = "temp_dataset_" + jobUuid;
+      String queryTempTableId = "temp_table_" + jobUuid;
+      TableReference queryTempTableRef = new TableReference()
+          .setProjectId(executingProject)
+          .setDatasetId(queryTempDatasetId)
+          .setTableId(queryTempTableId);
+      return queryTempTableRef;
+    }
+  }
+
   @Nullable
   private static ValueProvider<String> displayTable(
       @Nullable ValueProvider<TableReference> table) {
@@ -471,6 +516,9 @@ public class BigQueryIO {
       @Nullable final Boolean useLegacySql;
       @Nullable BigQueryServices bigQueryServices;
 
+      @VisibleForTesting @Nullable String stepUuid;
+      @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
+
       private static final String QUERY_VALIDATION_FAILURE_ERROR =
           "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
           + " pipeline, This validation can be disabled using #withoutValidation.";
@@ -667,10 +715,12 @@ public class BigQueryIO {
 
       @Override
       public PCollection<TableRow> expand(PBegin input) {
-        String uuid = randomUUIDString();
-        final String jobIdToken = "beam_job_" + uuid;
-
+        stepUuid = randomUUIDString();
         BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+        jobUuid = NestedValueProvider.of(
+           StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
+        final ValueProvider<String> jobIdToken = NestedValueProvider.of(
+            jobUuid, new BeamJobUuidToBigQueryJobUuid());
 
         BoundedSource<TableRow> source;
         final BigQueryServices bqServices = getBigQueryServices();
@@ -679,7 +729,7 @@ public class BigQueryIO {
         String tempLocation = bqOptions.getTempLocation();
         try {
           IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-          extractDestinationDir = factory.resolve(tempLocation, uuid);
+          extractDestinationDir = factory.resolve(tempLocation, stepUuid);
         } catch (IOException e) {
           throw new RuntimeException(
               String.format("Failed to resolve extract destination directory in %s", tempLocation));
@@ -687,18 +737,9 @@ public class BigQueryIO {
 
         final String executingProject = bqOptions.getProject();
         if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) {
-          String queryTempDatasetId = "temp_dataset_" + uuid;
-          String queryTempTableId = "temp_table_" + uuid;
-
-          TableReference queryTempTableRef = new TableReference()
-              .setProjectId(executingProject)
-              .setDatasetId(queryTempDatasetId)
-              .setTableId(queryTempTableId);
-          String jsonTableRef = toJsonString(queryTempTableRef);
-
           source = BigQueryQuerySource.create(
               jobIdToken, query, NestedValueProvider.of(
-                  StaticValueProvider.of(jsonTableRef), new JsonTableRefToTableRef()),
+                jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
               flattenResults, useLegacySql, extractDestinationDir, bqServices);
         } else {
           ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
@@ -913,7 +954,7 @@ public class BigQueryIO {
   static class BigQueryTableSource extends BigQuerySourceBase {
 
     static BigQueryTableSource create(
-        String jobIdToken,
+        ValueProvider<String> jobIdToken,
         ValueProvider<TableReference> table,
         String extractDestinationDir,
         BigQueryServices bqServices,
@@ -926,7 +967,7 @@ public class BigQueryIO {
     private final AtomicReference<Long> tableSizeBytes;
 
     private BigQueryTableSource(
-        String jobIdToken,
+        ValueProvider<String> jobIdToken,
         ValueProvider<TableReference> table,
         String extractDestinationDir,
         BigQueryServices bqServices,
@@ -982,7 +1023,7 @@ public class BigQueryIO {
   static class BigQueryQuerySource extends BigQuerySourceBase {
 
     static BigQueryQuerySource create(
-        String jobIdToken,
+        ValueProvider<String> jobIdToken,
         ValueProvider<String> query,
         ValueProvider<TableReference> queryTempTableRef,
         Boolean flattenResults,
@@ -1006,7 +1047,7 @@ public class BigQueryIO {
     private transient AtomicReference<JobStatistics> dryRunJobStats;
 
     private BigQueryQuerySource(
-        String jobIdToken,
+        ValueProvider<String> jobIdToken,
         ValueProvider<String> query,
         ValueProvider<TableReference> queryTempTableRef,
         Boolean flattenResults,
@@ -1063,7 +1104,7 @@ public class BigQueryIO {
           "Dataset for BigQuery query job temporary table");
 
       // 3. Execute the query.
-      String queryJobId = jobIdToken + "-query";
+      String queryJobId = jobIdToken.get() + "-query";
       executeQuery(
           executingProject.get(),
           queryJobId,
@@ -1161,13 +1202,13 @@ public class BigQueryIO {
     // The initial backoff for verifying temp files.
     private static final Duration INITIAL_FILES_VERIFY_BACKOFF = Duration.standardSeconds(1);
 
-    protected final String jobIdToken;
+    protected final ValueProvider<String> jobIdToken;
     protected final String extractDestinationDir;
     protected final BigQueryServices bqServices;
     protected final ValueProvider<String> executingProject;
 
     private BigQuerySourceBase(
-        String jobIdToken,
+        ValueProvider<String> jobIdToken,
         String extractDestinationDir,
         BigQueryServices bqServices,
         ValueProvider<String> executingProject) {
@@ -1396,8 +1437,8 @@ public class BigQueryIO {
     }
   }
 
-  private static String getExtractJobId(String jobIdToken) {
-    return jobIdToken + "-extract";
+  private static String getExtractJobId(ValueProvider<String> jobIdToken) {
+    return jobIdToken.get() + "-extract";
   }
 
   private static String getExtractDestinationUri(String extractDestinationDir) {
@@ -1644,6 +1685,9 @@ public class BigQueryIO {
 
       @Nullable private BigQueryServices bigQueryServices;
 
+      @VisibleForTesting @Nullable String stepUuid;
+      @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
+
       private static class TranslateTableSpecFunction implements
           SerializableFunction<BoundedWindow, TableReference> {
         private SerializableFunction<BoundedWindow, String> tableSpecFunction;
@@ -1924,14 +1968,19 @@ public class BigQueryIO {
 
         ValueProvider<TableReference> table = getTableWithDefaultProject(options);
 
-        String jobIdToken = "beam_job_" + randomUUIDString();
+        stepUuid = randomUUIDString();
+        jobUuid = NestedValueProvider.of(
+           StaticValueProvider.of(options.getJobName()), new CreatePerBeamJobUuid(stepUuid));
+        ValueProvider<String> jobIdToken = NestedValueProvider.of(
+            jobUuid, new BeamJobUuidToBigQueryJobUuid());
+
         String tempLocation = options.getTempLocation();
         String tempFilePrefix;
         try {
           IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
           tempFilePrefix = factory.resolve(
                   factory.resolve(tempLocation, "BigQueryWriteTemp"),
-                  jobIdToken);
+                  stepUuid);
         } catch (IOException e) {
           throw new RuntimeException(
               String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
@@ -2248,7 +2297,7 @@ public class BigQueryIO {
     static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
       private final boolean singlePartition;
       private final BigQueryServices bqServices;
-      private final String jobIdToken;
+      private final ValueProvider<String> jobIdToken;
       private final String tempFilePrefix;
       private final ValueProvider<String> jsonTableRef;
       private final ValueProvider<String> jsonSchema;
@@ -2258,7 +2307,7 @@ public class BigQueryIO {
       public WriteTables(
           boolean singlePartition,
           BigQueryServices bqServices,
-          String jobIdToken,
+          ValueProvider<String> jobIdToken,
           String tempFilePrefix,
           ValueProvider<String> jsonTableRef,
           ValueProvider<String> jsonSchema,
@@ -2277,7 +2326,7 @@ public class BigQueryIO {
       @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
-        String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey());
+        String jobIdPrefix = String.format(jobIdToken.get() + "_%05d", c.element().getKey());
         TableReference ref = fromJsonString(jsonTableRef.get(), TableReference.class);
         if (!singlePartition) {
           ref.setTableId(jobIdPrefix);
@@ -2383,7 +2432,7 @@ public class BigQueryIO {
      */
     static class WriteRename extends DoFn<String, Void> {
       private final BigQueryServices bqServices;
-      private final String jobIdToken;
+      private final ValueProvider<String> jobIdToken;
       private final ValueProvider<String> jsonTableRef;
       private final WriteDisposition writeDisposition;
       private final CreateDisposition createDisposition;
@@ -2391,7 +2440,7 @@ public class BigQueryIO {
 
       public WriteRename(
           BigQueryServices bqServices,
-          String jobIdToken,
+          ValueProvider<String> jobIdToken,
           ValueProvider<String> jsonTableRef,
           WriteDisposition writeDisposition,
           CreateDisposition createDisposition,
@@ -2419,7 +2468,7 @@ public class BigQueryIO {
         }
         copy(
             bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
-            jobIdToken,
+            jobIdToken.get(),
             fromJsonString(jsonTableRef.get(), TableReference.class),
             tempTables,
             writeDisposition,

http://git-wip-us.apache.org/repos/asf/beam/blob/a4d2a5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 471b5e4..3e8c2c9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,6 +26,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -1646,7 +1647,8 @@ public class BigQueryIOTest implements Serializable {
     TableReference table = BigQueryIO.parseTableSpec("project.data_set.table_name");
     String extractDestinationDir = "mock://tempLocation";
     BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
-        jobIdToken, StaticValueProvider.of(table), extractDestinationDir, fakeBqServices,
+        StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
+        extractDestinationDir, fakeBqServices,
         StaticValueProvider.of("project"));
 
     List<TableRow> expected = ImmutableList.of(
@@ -1684,7 +1686,7 @@ public class BigQueryIOTest implements Serializable {
     TableReference table = BigQueryIO.parseTableSpec("project:data_set.table_name");
     String extractDestinationDir = "mock://tempLocation";
     BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
-        jobIdToken, StaticValueProvider.of(table),
+        StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
         extractDestinationDir, fakeBqServices, StaticValueProvider.of("project"));
 
     List<TableRow> expected = ImmutableList.of(
@@ -1750,7 +1752,8 @@ public class BigQueryIOTest implements Serializable {
     String extractDestinationDir = "mock://tempLocation";
     TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
-        jobIdToken, StaticValueProvider.of("query"), StaticValueProvider.of(destinationTable),
+        StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
+        StaticValueProvider.of(destinationTable),
         true /* flattenResults */, true /* useLegacySql */,
         extractDestinationDir, fakeBqServices);
 
@@ -1842,7 +1845,8 @@ public class BigQueryIOTest implements Serializable {
     String extractDestinationDir = "mock://tempLocation";
     TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
-        jobIdToken, StaticValueProvider.of("query"), StaticValueProvider.of(destinationTable),
+        StaticValueProvider.of(jobIdToken), StaticValueProvider.of("query"),
+        StaticValueProvider.of(destinationTable),
         true /* flattenResults */, true /* useLegacySql */,
         extractDestinationDir, fakeBqServices);
 
@@ -2117,7 +2121,7 @@ public class BigQueryIOTest implements Serializable {
     WriteTables writeTables = new WriteTables(
         false,
         fakeBqServices,
-        jobIdToken,
+        StaticValueProvider.of(jobIdToken),
         tempFilePrefix,
         StaticValueProvider.of(jsonTable),
         StaticValueProvider.of(jsonSchema),
@@ -2195,7 +2199,7 @@ public class BigQueryIOTest implements Serializable {
 
     WriteRename writeRename = new WriteRename(
         fakeBqServices,
-        jobIdToken,
+        StaticValueProvider.of(jobIdToken),
         StaticValueProvider.of(jsonTable),
         WriteDisposition.WRITE_EMPTY,
         CreateDisposition.CREATE_IF_NEEDED,
@@ -2358,4 +2362,46 @@ public class BigQueryIOTest implements Serializable {
   public void testShardedKeyCoderIsSerializableWithWellKnownCoderType() {
     CoderProperties.coderSerializable(BigQueryIO.ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE));
   }
+
+  @Test
+  public void testUniqueStepIdRead() {
+    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    Pipeline pipeline = TestPipeline.create(options);
+    bqOptions.setTempLocation("gs://testbucket/testdir");
+    BigQueryIO.Read.Bound read1 = BigQueryIO.Read.fromQuery(
+        options.getInputQuery()).withoutValidation();
+    pipeline.apply(read1);
+    BigQueryIO.Read.Bound read2 = BigQueryIO.Read.fromQuery(
+        options.getInputQuery()).withoutValidation();
+    pipeline.apply(read2);
+    assertNotEquals(read1.stepUuid, read2.stepUuid);
+    assertNotEquals(read1.jobUuid.get(), read2.jobUuid.get());
+  }
+
+  @Test
+  public void testUniqueStepIdWrite() {
+    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    bqOptions.setTempLocation("gs://testbucket/testdir");
+    Pipeline pipeline = TestPipeline.create(options);
+    BigQueryIO.Write.Bound write1 = BigQueryIO.Write
+        .to(options.getOutputTable())
+        .withSchema(NestedValueProvider.of(
+            options.getOutputSchema(), new JsonSchemaToTableSchema()))
+        .withoutValidation();
+    BigQueryIO.Write.Bound write2 = BigQueryIO.Write
+        .to(options.getOutputTable())
+        .withSchema(NestedValueProvider.of(
+            options.getOutputSchema(), new JsonSchemaToTableSchema()))
+        .withoutValidation();
+    pipeline
+        .apply(Create.<TableRow>of())
+        .apply(write1);
+    pipeline
+        .apply(Create.<TableRow>of())
+        .apply(write2);
+    assertNotEquals(write1.stepUuid, write2.stepUuid);
+    assertNotEquals(write1.jobUuid.get(), write2.jobUuid.get());
+  }
 }


[2/2] beam git commit: This closes #1697

Posted by dh...@apache.org.
This closes #1697


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9b5d55e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9b5d55e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9b5d55e

Branch: refs/heads/master
Commit: f9b5d55e53e08ff8c7f73e493c69f23900c9687b
Parents: a91571e a4d2a5d
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jan 17 10:07:00 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 17 10:07:00 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 111 +++++++++++++------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  58 +++++++++-
 2 files changed, 132 insertions(+), 37 deletions(-)
----------------------------------------------------------------------