You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2017/08/18 03:58:19 UTC

[1/3] beam git commit: Fix failing test.

Repository: beam
Updated Branches:
  refs/heads/master 0aae7aa5f -> d03a1284c


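Fix failing test: BigQueryIOTest now passes index 0 (instead of -1) to BigQueryHelpers.createJobId, so the expected temp table ids include the pane-index suffix that WriteTables appends.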


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ac94e7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ac94e7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ac94e7d

Branch: refs/heads/master
Commit: 9ac94e7d45b0c57ba16f48f129c595bbbf041c1f
Parents: 518c158
Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Authored: Thu Aug 17 12:57:47 2017 -0700
Committer: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Committed: Thu Aug 17 20:25:19 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9ac94e7d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 43a494e..0ece3ee 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1955,7 +1955,7 @@ public class BigQueryIOTest implements Serializable {
       String tableName = String.format("project-id:dataset-id.table%05d", i);
       TableDestination tableDestination = new TableDestination(tableName, tableName);
       for (int j = 0; j < numPartitions; ++j) {
-        String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j, -1);
+        String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j, 0);
         List<String> filesPerPartition = Lists.newArrayList();
         for (int k = 0; k < numFilesPerPartition; ++k) {
           String filename = Paths.get(baseDir.toString(),


[2/3] beam git commit: Ensure that each triggered load generates a different job id (for the case of streaming triggered file loads), and add test coverage to catch this.

Posted by re...@apache.org.
Ensure that each triggered load generates a different job id (for the case of streaming triggered file loads), and add test coverage to catch this.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/518c158f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/518c158f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/518c158f

Branch: refs/heads/master
Commit: 518c158f82249091a54dca17ae348734f5abe633
Parents: 0aae7aa
Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Authored: Tue Aug 15 21:58:14 2017 -0700
Committer: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Committed: Thu Aug 17 20:25:19 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java       | 12 ++++++++----
 .../apache/beam/sdk/io/gcp/bigquery/WriteRename.java    |  3 ++-
 .../apache/beam/sdk/io/gcp/bigquery/WriteTables.java    |  4 ++--
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java |  2 +-
 .../apache/beam/sdk/io/gcp/bigquery/FakeJobService.java |  9 +++++++++
 5 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 09508e0..78dcdde 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -225,15 +225,19 @@ public class BigQueryHelpers {
   }
 
   // Create a unique job id for a table load.
-  static String createJobId(String prefix, TableDestination tableDestination, int partition) {
+  static String createJobId(String prefix, TableDestination tableDestination, int partition,
+      long index) {
     // Job ID must be different for each partition of each table.
     String destinationHash =
         Hashing.murmur3_128().hashUnencodedChars(tableDestination.toString()).toString();
+    String jobId = String.format("%s_%s", prefix, destinationHash);
     if (partition >= 0) {
-      return String.format("%s_%s_%05d", prefix, destinationHash, partition);
-    } else {
-      return String.format("%s_%s", prefix, destinationHash);
+      jobId += String.format("_%05d", partition);
+    }
+    if (index >= 0) {
+      jobId += String.format("_%05d", index);
     }
+    return jobId;
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index eb1da5f..ff69476 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -101,7 +101,8 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> {
 
     // Make sure each destination table gets a unique job id.
     String jobIdPrefix =
-        BigQueryHelpers.createJobId(c.sideInput(jobIdToken), finalTableDestination, -1);
+        BigQueryHelpers.createJobId(c.sideInput(jobIdToken), finalTableDestination, -1,
+    c.pane().getIndex());
 
     copy(
         bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),

http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 24911a7..c8fab75 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -119,8 +119,8 @@ class WriteTables<DestinationT>
 
     Integer partition = c.element().getKey().getShardNumber();
     List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
-    String jobIdPrefix =
-        BigQueryHelpers.createJobId(c.sideInput(jobIdToken), tableDestination, partition);
+    String jobIdPrefix = BigQueryHelpers.createJobId(
+            c.sideInput(jobIdToken), tableDestination, partition, c.pane().getIndex());
 
     if (!singlePartition) {
       tableReference.setTableId(jobIdPrefix);

http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 3d53b7e..43a494e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1955,7 +1955,7 @@ public class BigQueryIOTest implements Serializable {
       String tableName = String.format("project-id:dataset-id.table%05d", i);
       TableDestination tableDestination = new TableDestination(tableName, tableName);
       for (int j = 0; j < numPartitions; ++j) {
-        String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j);
+        String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j, -1);
         List<String> filesPerPartition = Lists.newArrayList();
         for (int k = 0; k < numFilesPerPartition; ++k) {
           String filename = Paths.get(baseDir.toString(),

http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 2045bb7..7d5101d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -109,6 +109,7 @@ class FakeJobService implements JobService, Serializable {
   public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
       throws InterruptedException, IOException {
     synchronized (allJobs) {
+      verifyUniqueJobId(jobRef.getJobId());
       Job job = new Job();
       job.setJobReference(jobRef);
       job.setConfiguration(new JobConfiguration().setLoad(loadConfig));
@@ -141,6 +142,7 @@ class FakeJobService implements JobService, Serializable {
     checkArgument(extractConfig.getDestinationFormat().equals("AVRO"),
         "Only extract to AVRO is supported");
     synchronized (allJobs) {
+      verifyUniqueJobId(jobRef.getJobId());
       ++numExtractJobCalls;
 
       Job job = new Job();
@@ -175,6 +177,7 @@ class FakeJobService implements JobService, Serializable {
   public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
       throws IOException, InterruptedException {
     synchronized (allJobs) {
+      verifyUniqueJobId(jobRef.getJobId());
       Job job = new Job();
       job.setJobReference(jobRef);
       job.setConfiguration(new JobConfiguration().setCopy(copyConfig));
@@ -257,6 +260,12 @@ class FakeJobService implements JobService, Serializable {
     }
   }
 
+  private void verifyUniqueJobId(String jobId) throws IOException {
+    if (allJobs.containsColumn(jobId)) {
+      throw new IOException("Duplicate job id " + jobId);
+    }
+  }
+
   private JobStatus runJob(Job job) throws InterruptedException, IOException {
     if (job.getConfiguration().getLoad() != null) {
       return runLoadJob(job.getJobReference(), job.getConfiguration().getLoad());


[3/3] beam git commit: This closes #3722

Posted by re...@apache.org.
This closes #3722


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d03a1284
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d03a1284
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d03a1284

Branch: refs/heads/master
Commit: d03a1284c51cbf39116f4cd91ebb405623cecfa2
Parents: 0aae7aa 9ac94e7
Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Authored: Thu Aug 17 20:33:04 2017 -0700
Committer: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Committed: Thu Aug 17 20:33:04 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java       | 12 ++++++++----
 .../apache/beam/sdk/io/gcp/bigquery/WriteRename.java    |  3 ++-
 .../apache/beam/sdk/io/gcp/bigquery/WriteTables.java    |  4 ++--
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java |  2 +-
 .../apache/beam/sdk/io/gcp/bigquery/FakeJobService.java |  9 +++++++++
 5 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------