You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/05/04 07:17:51 UTC
[45/50] [abbrv] beam git commit: BigQueryIO: Remove tempLocation usage at pipeline construction time
BigQueryIO: Remove tempLocation usage at pipeline construction time
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0a2249f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0a2249f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0a2249f
Branch: refs/heads/DSL_SQL
Commit: d0a2249f17446156d9ce35b7d0b559b51e62b0b8
Parents: 1bc50d6
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed May 3 15:42:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 3 20:50:53 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 42 +++++++--------
.../sdk/io/gcp/bigquery/BigQueryHelpers.java | 15 ++++++
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 +++----
.../io/gcp/bigquery/BigQueryQuerySource.java | 5 +-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 14 +++--
.../io/gcp/bigquery/BigQueryTableSource.java | 6 +--
.../io/gcp/bigquery/WriteBundlesToFiles.java | 19 +++----
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 19 +++----
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 57 ++++----------------
9 files changed, 75 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 4e14696..78d39b5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -19,11 +19,11 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -44,8 +45,6 @@ import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
@@ -106,26 +105,23 @@ class BatchLoads<DestinationT>
@Override
public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
Pipeline p = input.getPipeline();
- BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-
- validate(p.getOptions());
-
final String stepUuid = BigQueryHelpers.randomUUIDString();
- String tempLocation = options.getTempLocation();
- String tempFilePrefix;
- try {
- IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
- tempFilePrefix =
- factory.resolve(factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e);
- }
-
// Create a singleton job ID token at execution time. This will be used as the base for all
- // load jobs issued from this instance of the transfomr.
- PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+ // load jobs issued from this instance of the transform.
+ PCollection<String> singleton = p
+ .apply("Create", Create.of((Void) null))
+ .apply("GetTempFilePrefix", ParDo.of(new DoFn<Void, String>() {
+ @ProcessElement
+ public void getTempFilePrefix(ProcessContext c) {
+ c.output(
+ resolveTempLocation(
+ c.getPipelineOptions().getTempLocation(),
+ "BigQueryWriteTemp",
+ stepUuid));
+ }
+ }));
+
PCollectionView<String> jobIdTokenView =
p.apply("TriggerIdCreation", Create.of("ignored"))
.apply(
@@ -152,7 +148,7 @@ class BatchLoads<DestinationT>
PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
inputInGlobalWindow
.apply("WriteBundlesToFiles", ParDo.of(
- new WriteBundlesToFiles<DestinationT>(tempFilePrefix)))
+ new WriteBundlesToFiles<DestinationT>(stepUuid)))
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
@@ -209,7 +205,7 @@ class BatchLoads<DestinationT>
bigQueryServices,
jobIdTokenView,
schemasView,
- tempFilePrefix,
+ stepUuid,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
dynamicDestinations))
@@ -247,7 +243,7 @@ class BatchLoads<DestinationT>
bigQueryServices,
jobIdTokenView,
schemasView,
- tempFilePrefix,
+ stepUuid,
writeDisposition,
createDisposition,
dynamicDestinations))
http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 70e7a5f..6b4e518 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -40,6 +40,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
/** A set of helper functions and classes used by {@link BigQueryIO}. */
public class BigQueryHelpers {
@@ -304,4 +306,17 @@ public class BigQueryHelpers {
.setTableId(queryTempTableId);
return queryTempTableRef;
}
+
+ static String resolveTempLocation(
+ String tempLocationDir, String bigQueryOperationName, String stepUuid) {
+ try {
+ IOChannelFactory factory = IOChannelUtils.getFactory(tempLocationDir);
+ return factory.resolve(
+ factory.resolve(tempLocationDir, bigQueryOperationName), stepUuid);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to resolve temp destination directory in %s",
+ tempLocationDir), e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 29491d8..c76ee86 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.Job;
@@ -482,17 +483,7 @@ public class BigQueryIO {
@Override
public PCollection<TableRow> expand(PBegin input) {
final String stepUuid = BigQueryHelpers.randomUUIDString();
- BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
BoundedSource<TableRow> source;
- final String extractDestinationDir;
- String tempLocation = bqOptions.getTempLocation();
- try {
- IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
- extractDestinationDir = factory.resolve(tempLocation, stepUuid);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to resolve extract destination directory in %s", tempLocation));
- }
if (getQuery() != null
&& (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) {
@@ -502,14 +493,12 @@ public class BigQueryIO {
getQuery(),
getFlattenResults(),
getUseLegacySql(),
- extractDestinationDir,
getBigQueryServices());
} else {
source =
BigQueryTableSource.create(
stepUuid,
getTableProvider(),
- extractDestinationDir,
getBigQueryServices());
}
PassThroughThenCleanup.CleanupOperation cleanupOperation =
@@ -517,6 +506,11 @@ public class BigQueryIO {
@Override
void cleanup(PipelineOptions options) throws Exception {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ final String extractDestinationDir =
+ resolveTempLocation(
+ bqOptions.getTempLocation(),
+ "BigQueryExtractTemp",
+ stepUuid);
JobReference jobRef =
new JobReference()
http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 205f9cc..710c934 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -53,14 +53,12 @@ class BigQueryQuerySource extends BigQuerySourceBase {
ValueProvider<String> query,
Boolean flattenResults,
Boolean useLegacySql,
- String extractDestinationDir,
BigQueryServices bqServices) {
return new BigQueryQuerySource(
stepUuid,
query,
flattenResults,
useLegacySql,
- extractDestinationDir,
bqServices);
}
@@ -74,9 +72,8 @@ class BigQueryQuerySource extends BigQuerySourceBase {
ValueProvider<String> query,
Boolean flattenResults,
Boolean useLegacySql,
- String extractDestinationDir,
BigQueryServices bqServices) {
- super(stepUuid, extractDestinationDir, bqServices);
+ super(stepUuid, bqServices);
this.query = checkNotNull(query, "query");
this.flattenResults = checkNotNull(flattenResults, "flattenResults");
this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 0171046..41e298c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -64,14 +65,12 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
protected final String stepUuid;
- protected final String extractDestinationDir;
protected final BigQueryServices bqServices;
private transient List<BoundedSource<TableRow>> cachedSplitResult;
- BigQuerySourceBase(String stepUuid, String extractDestinationDir, BigQueryServices bqServices) {
+ BigQuerySourceBase(String stepUuid, BigQueryServices bqServices) {
this.stepUuid = checkNotNull(stepUuid, "stepUuid");
- this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
this.bqServices = checkNotNull(bqServices, "bqServices");
}
@@ -86,9 +85,13 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
TableReference tableToExtract = getTableToExtract(bqOptions);
JobService jobService = bqServices.getJobService(bqOptions);
+
+ final String extractDestinationDir =
+ resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
+
String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
List<String> tempFiles = executeExtract(
- extractJobId, tableToExtract, jobService, bqOptions.getProject());
+ extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir);
TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
.getTable(tableToExtract).getSchema();
@@ -114,7 +117,8 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
}
private List<String> executeExtract(
- String jobId, TableReference table, JobService jobService, String executingProject)
+ String jobId, TableReference table, JobService jobService, String executingProject,
+ String extractDestinationDir)
throws InterruptedException, IOException {
JobReference jobRef = new JobReference()
.setProjectId(executingProject)
http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index e754bd2..1d45641 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -45,9 +45,8 @@ class BigQueryTableSource extends BigQuerySourceBase {
static BigQueryTableSource create(
String stepUuid,
ValueProvider<TableReference> table,
- String extractDestinationDir,
BigQueryServices bqServices) {
- return new BigQueryTableSource(stepUuid, table, extractDestinationDir, bqServices);
+ return new BigQueryTableSource(stepUuid, table, bqServices);
}
private final ValueProvider<String> jsonTable;
@@ -56,9 +55,8 @@ class BigQueryTableSource extends BigQuerySourceBase {
private BigQueryTableSource(
String stepUuid,
ValueProvider<TableReference> table,
- String extractDestinationDir,
BigQueryServices bqServices) {
- super(stepUuid, extractDestinationDir, bqServices);
+ super(stepUuid, bqServices);
this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
this.tableSizeBytes = new AtomicReference<>();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 4f609b2..e90b974 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.io.gcp.bigquery;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
+
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.collect.Maps;
import java.io.IOException;
@@ -32,7 +34,6 @@ import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ class WriteBundlesToFiles<DestinationT>
// Map from tablespec to a writer for that table.
private transient Map<DestinationT, TableRowWriter> writers;
- private final String tempFilePrefix;
+ private final String stepUuid;
/**
* The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
@@ -104,8 +105,8 @@ class WriteBundlesToFiles<DestinationT>
public void verifyDeterministic() {}
}
- WriteBundlesToFiles(String tempFilePrefix) {
- this.tempFilePrefix = tempFilePrefix;
+ WriteBundlesToFiles(String stepUuid) {
+ this.stepUuid = stepUuid;
}
@StartBundle
@@ -117,6 +118,8 @@ class WriteBundlesToFiles<DestinationT>
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
+ String tempFilePrefix = resolveTempLocation(
+ c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
TableRowWriter writer = writers.get(c.element().getKey());
if (writer == null) {
writer = new TableRowWriter(tempFilePrefix);
@@ -147,12 +150,4 @@ class WriteBundlesToFiles<DestinationT>
}
writers.clear();
}
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder.addIfNotNull(
- DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix"));
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index b299244..c480b42 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.io.gcp.bigquery;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
+
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
@@ -41,7 +43,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.FileIOChannelFactory;
import org.apache.beam.sdk.util.GcsIOChannelFactory;
import org.apache.beam.sdk.util.GcsUtil;
@@ -73,7 +74,7 @@ class WriteTables<DestinationT>
private final BigQueryServices bqServices;
private final PCollectionView<String> jobIdToken;
private final PCollectionView<Map<DestinationT, String>> schemasView;
- private final String tempFilePrefix;
+ private final String stepUuid;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
@@ -83,7 +84,7 @@ class WriteTables<DestinationT>
BigQueryServices bqServices,
PCollectionView<String> jobIdToken,
PCollectionView<Map<DestinationT, String>> schemasView,
- String tempFilePrefix,
+ String stepUuid,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
DynamicDestinations<?, DestinationT> dynamicDestinations) {
@@ -91,7 +92,7 @@ class WriteTables<DestinationT>
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
this.schemasView = schemasView;
- this.tempFilePrefix = tempFilePrefix;
+ this.stepUuid = stepUuid;
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.dynamicDestinations = dynamicDestinations;
@@ -113,6 +114,8 @@ class WriteTables<DestinationT>
tableReference, tableDestination.getTableDescription());
}
+ String tempFilePrefix = resolveTempLocation(
+ c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
Integer partition = c.element().getKey().getShardNumber();
List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
String jobIdPrefix =
@@ -213,12 +216,4 @@ class WriteTables<DestinationT>
throw new IOException("Unrecognized file system.");
}
}
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder.addIfNotNull(
- DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix"));
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d0a2249f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index e267dab..026afce 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -91,7 +91,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperati
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -136,7 +135,6 @@ import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -946,7 +944,6 @@ public class BigQueryIOTest implements Serializable {
}
@Test
- @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient")
public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
BigQueryIO.Read read = BigQueryIO.read()
@@ -962,7 +959,6 @@ public class BigQueryIOTest implements Serializable {
}
@Test
- @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient")
public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
BigQueryIO.Read read = BigQueryIO.read()
@@ -988,40 +984,6 @@ public class BigQueryIOTest implements Serializable {
}
@Test
- @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient")
- public void testBatchWritePrimitiveDisplayData() throws IOException, InterruptedException {
- testWritePrimitiveDisplayData(/* streaming: */ false);
- }
-
- @Test
- @Ignore("[BEAM-436] DirectRunner tempLocation configuration insufficient")
- public void testStreamingWritePrimitiveDisplayData() throws IOException, InterruptedException {
- testWritePrimitiveDisplayData(/* streaming: */ true);
- }
-
- private void testWritePrimitiveDisplayData(boolean streaming) throws IOException,
- InterruptedException {
- PipelineOptions options = TestPipeline.testingPipelineOptions();
- options.as(StreamingOptions.class).setStreaming(streaming);
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
-
- BigQueryIO.Write write = BigQueryIO.writeTableRows()
- .to("project:dataset.table")
- .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
- .withTestServices(new FakeBigQueryServices()
- .withDatasetService(new FakeDatasetService())
- .withJobService(new FakeJobService()))
- .withoutValidation();
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat("BigQueryIO.Write should include the table spec in its primitive display data",
- displayData, hasItem(hasDisplayItem("tableSpec")));
-
- assertThat("BigQueryIO.Write should include the table schema in its primitive display data",
- displayData, hasItem(hasDisplayItem("schema")));
- }
-
- @Test
public void testBuildWriteWithoutValidation() {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
@@ -1360,9 +1322,10 @@ public class BigQueryIOTest implements Serializable {
Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceThroughJsonAPI");
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
- stepUuid, StaticValueProvider.of(table), baseDir.toString(), fakeBqServices);
+ stepUuid, StaticValueProvider.of(table), fakeBqServices);
PipelineOptions options = PipelineOptionsFactory.create();
+ options.setTempLocation(baseDir.toString());
Assert.assertThat(
SourceTestUtils.readFromSource(bqSource, options),
CoreMatchers.is(expected));
@@ -1399,9 +1362,8 @@ public class BigQueryIOTest implements Serializable {
Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceInitSplit");
String stepUuid = "testStepUuid";
- String extractDestinationDir = baseDir.toString();
BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
- stepUuid, StaticValueProvider.of(table), extractDestinationDir, fakeBqServices);
+ stepUuid, StaticValueProvider.of(table), fakeBqServices);
PipelineOptions options = PipelineOptionsFactory.create();
options.setTempLocation(baseDir.toString());
@@ -1479,12 +1441,10 @@ public class BigQueryIOTest implements Serializable {
String query = FakeBigQueryServices.encodeQuery(expected);
- String extractDestinationDir = baseDir.toString();
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
stepUuid, StaticValueProvider.of(query),
- true /* flattenResults */, true /* useLegacySql */,
- extractDestinationDir, fakeBqServices);
- options.setTempLocation(extractDestinationDir);
+ true /* flattenResults */, true /* useLegacySql */, fakeBqServices);
+ options.setTempLocation(baseDir.toString());
TableReference queryTable = new TableReference()
.setProjectId(bqOptions.getProject())
@@ -1571,7 +1531,7 @@ public class BigQueryIOTest implements Serializable {
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
stepUuid,
StaticValueProvider.of(query),
- true /* flattenResults */, true /* useLegacySql */, baseDir.toString(), fakeBqServices);
+ true /* flattenResults */, true /* useLegacySql */, fakeBqServices);
options.setTempLocation(baseDir.toString());
@@ -1845,7 +1805,7 @@ public class BigQueryIOTest implements Serializable {
long numPartitions = 3;
long numFilesPerPartition = 10;
String jobIdToken = "jobIdToken";
- String tempFilePrefix = "tempFilePrefix";
+ String stepUuid = "stepUuid";
Map<TableDestination, List<String>> expectedTempTables = Maps.newHashMap();
Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables");
@@ -1898,7 +1858,7 @@ public class BigQueryIOTest implements Serializable {
fakeBqServices,
jobIdTokenView,
schemaMapView,
- tempFilePrefix,
+ stepUuid,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
new IdentityDynamicTables());
@@ -1907,6 +1867,7 @@ public class BigQueryIOTest implements Serializable {
KV<TableDestination, String>> tester = DoFnTester.of(writeTables);
tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
tester.setSideInput(schemaMapView, GlobalWindow.INSTANCE, ImmutableMap.<String, String>of());
+ tester.getPipelineOptions().setTempLocation("tempLocation");
for (KV<ShardedKey<String>, List<String>> partition : partitions) {
tester.processElement(partition);
}