You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2017/08/18 03:58:19 UTC
[1/3] beam git commit: Fix failing test.
Repository: beam
Updated Branches:
refs/heads/master 0aae7aa5f -> d03a1284c
Fix failing test.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ac94e7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ac94e7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ac94e7d
Branch: refs/heads/master
Commit: 9ac94e7d45b0c57ba16f48f129c595bbbf041c1f
Parents: 518c158
Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Authored: Thu Aug 17 12:57:47 2017 -0700
Committer: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Committed: Thu Aug 17 20:25:19 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9ac94e7d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 43a494e..0ece3ee 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1955,7 +1955,7 @@ public class BigQueryIOTest implements Serializable {
String tableName = String.format("project-id:dataset-id.table%05d", i);
TableDestination tableDestination = new TableDestination(tableName, tableName);
for (int j = 0; j < numPartitions; ++j) {
- String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j, -1);
+ String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j, 0);
List<String> filesPerPartition = Lists.newArrayList();
for (int k = 0; k < numFilesPerPartition; ++k) {
String filename = Paths.get(baseDir.toString(),
[2/3] beam git commit: Ensure that each triggered load generates a
different job id (for the case of streaming triggered file loads),
and add test coverage to catch this.
Posted by re...@apache.org.
Ensure that each triggered load generates a different job id (for the case of streaming triggered file loads), and add test coverage to catch this.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/518c158f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/518c158f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/518c158f
Branch: refs/heads/master
Commit: 518c158f82249091a54dca17ae348734f5abe633
Parents: 0aae7aa
Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Authored: Tue Aug 15 21:58:14 2017 -0700
Committer: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Committed: Thu Aug 17 20:25:19 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 12 ++++++++----
.../apache/beam/sdk/io/gcp/bigquery/WriteRename.java | 3 ++-
.../apache/beam/sdk/io/gcp/bigquery/WriteTables.java | 4 ++--
.../apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 +-
.../apache/beam/sdk/io/gcp/bigquery/FakeJobService.java | 9 +++++++++
5 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 09508e0..78dcdde 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -225,15 +225,19 @@ public class BigQueryHelpers {
}
// Create a unique job id for a table load.
- static String createJobId(String prefix, TableDestination tableDestination, int partition) {
+ static String createJobId(String prefix, TableDestination tableDestination, int partition,
+ long index) {
// Job ID must be different for each partition of each table.
String destinationHash =
Hashing.murmur3_128().hashUnencodedChars(tableDestination.toString()).toString();
+ String jobId = String.format("%s_%s", prefix, destinationHash);
if (partition >= 0) {
- return String.format("%s_%s_%05d", prefix, destinationHash, partition);
- } else {
- return String.format("%s_%s", prefix, destinationHash);
+ jobId += String.format("_%05d", partition);
+ }
+ if (index >= 0) {
+ jobId += String.format("_%05d", index);
}
+ return jobId;
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index eb1da5f..ff69476 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -101,7 +101,8 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> {
// Make sure each destination table gets a unique job id.
String jobIdPrefix =
- BigQueryHelpers.createJobId(c.sideInput(jobIdToken), finalTableDestination, -1);
+ BigQueryHelpers.createJobId(c.sideInput(jobIdToken), finalTableDestination, -1,
+ c.pane().getIndex());
copy(
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 24911a7..c8fab75 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -119,8 +119,8 @@ class WriteTables<DestinationT>
Integer partition = c.element().getKey().getShardNumber();
List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
- String jobIdPrefix =
- BigQueryHelpers.createJobId(c.sideInput(jobIdToken), tableDestination, partition);
+ String jobIdPrefix = BigQueryHelpers.createJobId(
+ c.sideInput(jobIdToken), tableDestination, partition, c.pane().getIndex());
if (!singlePartition) {
tableReference.setTableId(jobIdPrefix);
http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 3d53b7e..43a494e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1955,7 +1955,7 @@ public class BigQueryIOTest implements Serializable {
String tableName = String.format("project-id:dataset-id.table%05d", i);
TableDestination tableDestination = new TableDestination(tableName, tableName);
for (int j = 0; j < numPartitions; ++j) {
- String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j);
+ String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j, -1);
List<String> filesPerPartition = Lists.newArrayList();
for (int k = 0; k < numFilesPerPartition; ++k) {
String filename = Paths.get(baseDir.toString(),
http://git-wip-us.apache.org/repos/asf/beam/blob/518c158f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 2045bb7..7d5101d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -109,6 +109,7 @@ class FakeJobService implements JobService, Serializable {
public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
throws InterruptedException, IOException {
synchronized (allJobs) {
+ verifyUniqueJobId(jobRef.getJobId());
Job job = new Job();
job.setJobReference(jobRef);
job.setConfiguration(new JobConfiguration().setLoad(loadConfig));
@@ -141,6 +142,7 @@ class FakeJobService implements JobService, Serializable {
checkArgument(extractConfig.getDestinationFormat().equals("AVRO"),
"Only extract to AVRO is supported");
synchronized (allJobs) {
+ verifyUniqueJobId(jobRef.getJobId());
++numExtractJobCalls;
Job job = new Job();
@@ -175,6 +177,7 @@ class FakeJobService implements JobService, Serializable {
public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
throws IOException, InterruptedException {
synchronized (allJobs) {
+ verifyUniqueJobId(jobRef.getJobId());
Job job = new Job();
job.setJobReference(jobRef);
job.setConfiguration(new JobConfiguration().setCopy(copyConfig));
@@ -257,6 +260,12 @@ class FakeJobService implements JobService, Serializable {
}
}
+ private void verifyUniqueJobId(String jobId) throws IOException {
+ if (allJobs.containsColumn(jobId)) {
+ throw new IOException("Duplicate job id " + jobId);
+ }
+ }
+
private JobStatus runJob(Job job) throws InterruptedException, IOException {
if (job.getConfiguration().getLoad() != null) {
return runLoadJob(job.getJobReference(), job.getConfiguration().getLoad());
[3/3] beam git commit: This closes #3722
Posted by re...@apache.org.
This closes #3722
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d03a1284
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d03a1284
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d03a1284
Branch: refs/heads/master
Commit: d03a1284c51cbf39116f4cd91ebb405623cecfa2
Parents: 0aae7aa 9ac94e7
Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Authored: Thu Aug 17 20:33:04 2017 -0700
Committer: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Committed: Thu Aug 17 20:33:04 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 12 ++++++++----
.../apache/beam/sdk/io/gcp/bigquery/WriteRename.java | 3 ++-
.../apache/beam/sdk/io/gcp/bigquery/WriteTables.java | 4 ++--
.../apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 +-
.../apache/beam/sdk/io/gcp/bigquery/FakeJobService.java | 9 +++++++++
5 files changed, 22 insertions(+), 8 deletions(-)
----------------------------------------------------------------------