You are viewing a plain text version of this content; the canonical HTML version is available at the original mailing-list archive.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/04 19:23:21 UTC
[1/2] beam git commit: Remove IOChannelUtil/Factory from BigQueryIO
Repository: beam
Updated Branches:
refs/heads/master 24c6ff44e -> 70e53e7dc
Remove IOChannelUtil/Factory from BigQueryIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40dc8443
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40dc8443
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40dc8443
Branch: refs/heads/master
Commit: 40dc844304a8432ee5c8e81b9cc806ef2ae3c1ea
Parents: 24c6ff4
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed May 3 22:41:06 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 4 12:23:14 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 2 -
.../sdk/io/gcp/bigquery/BigQueryHelpers.java | 17 +++-----
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 35 ++++++++--------
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 14 +++----
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 43 +++++---------------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 15 +++----
.../sdk/io/gcp/bigquery/FakeJobService.java | 29 ++++++++-----
7 files changed, 65 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 78d39b5..f422135 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -205,7 +205,6 @@ class BatchLoads<DestinationT>
bigQueryServices,
jobIdTokenView,
schemasView,
- stepUuid,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
dynamicDestinations))
@@ -243,7 +242,6 @@ class BatchLoads<DestinationT>
bigQueryServices,
jobIdTokenView,
schemasView,
- stepUuid,
writeDisposition,
createDisposition,
dynamicDestinations))
http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 6b4e518..318ea89 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -36,12 +36,12 @@ import java.util.UUID;
import java.util.regex.Matcher;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
/** A set of helper functions and classes used by {@link BigQueryIO}. */
public class BigQueryHelpers {
@@ -309,14 +309,9 @@ public class BigQueryHelpers {
static String resolveTempLocation(
String tempLocationDir, String bigQueryOperationName, String stepUuid) {
- try {
- IOChannelFactory factory = IOChannelUtils.getFactory(tempLocationDir);
- return factory.resolve(
- factory.resolve(tempLocationDir, bigQueryOperationName), stepUuid);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to resolve temp destination directory in %s",
- tempLocationDir), e);
- }
+ return FileSystems.matchNewResource(tempLocationDir, true)
+ .resolve(bigQueryOperationName, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+ .resolve(stepUuid, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+ .toString();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index c76ee86..0e36393 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -38,8 +38,8 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+
import java.io.IOException;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
@@ -50,6 +50,10 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
@@ -67,8 +71,6 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.Transport;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
@@ -520,19 +522,14 @@ public class BigQueryIO {
Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);
- Collection<String> extractFiles = null;
if (extractJob != null) {
- extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
- } else {
- IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
- Collection<String> dirMatch = factory.match(extractDestinationDir);
- if (!dirMatch.isEmpty()) {
- extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
+ List<ResourceId> extractFiles =
+ getExtractFilePaths(extractDestinationDir, extractJob);
+ if (extractFiles != null && !extractFiles.isEmpty()) {
+ FileSystems.delete(extractFiles,
+ MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
}
}
- if (extractFiles != null && !extractFiles.isEmpty()) {
- IOChannelUtils.getFactory(extractFiles.iterator().next()).remove(extractFiles);
- }
}
};
return input.getPipeline()
@@ -583,7 +580,7 @@ public class BigQueryIO {
return String.format("%s/%s", extractDestinationDir, "*.avro");
}
- static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
+ static List<ResourceId> getExtractFilePaths(String extractDestinationDir, Job extractJob)
throws IOException {
JobStatistics jobStats = extractJob.getStatistics();
List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
@@ -597,11 +594,13 @@ public class BigQueryIO {
}
long filesCount = counts.get(0);
- ImmutableList.Builder<String> paths = ImmutableList.builder();
- IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
+ ImmutableList.Builder<ResourceId> paths = ImmutableList.builder();
+ ResourceId extractDestinationDirResourceId =
+ FileSystems.matchNewResource(extractDestinationDir, true /* isDirectory */);
for (long i = 0; i < filesCount; ++i) {
- String filePath =
- factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro"));
+ ResourceId filePath = extractDestinationDirResourceId.resolve(
+ String.format("%012d%s", i, ".avro"),
+ ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
paths.add(filePath);
}
return paths.build();
http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 49000d6..945c7d4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -38,6 +38,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -90,7 +91,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
- List<String> tempFiles = executeExtract(
+ List<ResourceId> tempFiles = executeExtract(
extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir);
TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
@@ -116,7 +117,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
return TableRowJsonCoder.of();
}
- private List<String> executeExtract(
+ private List<ResourceId> executeExtract(
String jobId, TableReference table, JobService jobService, String executingProject,
String extractDestinationDir)
throws InterruptedException, IOException {
@@ -143,12 +144,11 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
LOG.info("BigQuery extract job completed: {}", jobId);
- List<String> tempFiles = BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
- return ImmutableList.copyOf(tempFiles);
+ return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
}
private List<BoundedSource<TableRow>> createSources(
- List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
+ List<ResourceId> files, TableSchema tableSchema) throws IOException, InterruptedException {
final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema);
SerializableFunction<GenericRecord, TableRow> function =
@@ -160,9 +160,9 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
}};
List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
- for (String fileName : files) {
+ for (ResourceId file : files) {
avroSources.add(new TransformingSource<>(
- AvroSource.from(fileName), function, getDefaultOutputCoder()));
+ AvroSource.from(file.toString()), function, getDefaultOutputCoder()));
}
return ImmutableList.copyOf(avroSources);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index c480b42..e7dba2a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -18,37 +18,30 @@
package org.apache.beam.sdk.io.gcp.bigquery;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
-
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.FileIOChannelFactory;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.slf4j.Logger;
@@ -74,7 +67,6 @@ class WriteTables<DestinationT>
private final BigQueryServices bqServices;
private final PCollectionView<String> jobIdToken;
private final PCollectionView<Map<DestinationT, String>> schemasView;
- private final String stepUuid;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
@@ -84,7 +76,6 @@ class WriteTables<DestinationT>
BigQueryServices bqServices,
PCollectionView<String> jobIdToken,
PCollectionView<Map<DestinationT, String>> schemasView,
- String stepUuid,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
DynamicDestinations<?, DestinationT> dynamicDestinations) {
@@ -92,7 +83,6 @@ class WriteTables<DestinationT>
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
this.schemasView = schemasView;
- this.stepUuid = stepUuid;
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.dynamicDestinations = dynamicDestinations;
@@ -114,8 +104,6 @@ class WriteTables<DestinationT>
tableReference, tableDestination.getTableDescription());
}
- String tempFilePrefix = resolveTempLocation(
- c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
Integer partition = c.element().getKey().getShardNumber();
List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
String jobIdPrefix =
@@ -137,7 +125,7 @@ class WriteTables<DestinationT>
tableDestination.getTableDescription());
c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference)));
- removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partitionFiles);
+ removeTemporaryFiles(partitionFiles);
}
private void load(
@@ -198,22 +186,11 @@ class WriteTables<DestinationT>
BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
}
- static void removeTemporaryFiles(
- PipelineOptions options, String tempFilePrefix, Collection<String> files) throws IOException {
- IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix);
- if (factory instanceof GcsIOChannelFactory) {
- GcsUtil gcsUtil = new GcsUtilFactory().create(options);
- gcsUtil.remove(files);
- } else if (factory instanceof FileIOChannelFactory) {
- for (String filename : files) {
- LOG.debug("Removing file {}", filename);
- boolean exists = Files.deleteIfExists(Paths.get(filename));
- if (!exists) {
- LOG.debug("{} does not exist.", filename);
- }
- }
- } else {
- throw new IOException("Unrecognized file system.");
+ static void removeTemporaryFiles(Collection<String> files) throws IOException {
+ ImmutableList.Builder<ResourceId> fileResources = ImmutableList.builder();
+ for (String file: files) {
+ fileResources.add(FileSystems.matchNewResource(file, false/* isDirectory */));
}
+ FileSystems.delete(fileResources.build(), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 026afce..aabae3e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -83,7 +83,9 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -118,7 +120,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowedValue;
@@ -1820,7 +1821,9 @@ public class BigQueryIOTest implements Serializable {
for (int k = 0; k < numFilesPerPartition; ++k) {
String filename = Paths.get(baseDir.toString(),
String.format("files0x%08x_%05d", tempTableId.hashCode(), k)).toString();
- try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.TEXT)) {
+ ResourceId fileResource =
+ FileSystems.matchNewResource(filename, false /* isDirectory */);
+ try (WritableByteChannel channel = FileSystems.create(fileResource, MimeTypes.TEXT)) {
try (OutputStream output = Channels.newOutputStream(channel)) {
TableRow tableRow = new TableRow().set("name", tableName);
TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER);
@@ -1858,7 +1861,6 @@ public class BigQueryIOTest implements Serializable {
fakeBqServices,
jobIdTokenView,
schemaMapView,
- stepUuid,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
new IdentityDynamicTables());
@@ -1904,14 +1906,9 @@ public class BigQueryIOTest implements Serializable {
File tempDir = new File(bqOptions.getTempLocation());
testNumFiles(tempDir, 10);
- WriteTables.removeTemporaryFiles(bqOptions, tempFilePrefix, fileNames);
+ WriteTables.removeTemporaryFiles(fileNames);
testNumFiles(tempDir, 0);
-
- for (String fileName : fileNames) {
- loggedWriteTables.verifyDebug("Removing file " + fileName);
- }
- loggedWriteTables.verifyDebug(fileNames.get(numFiles) + " does not exist.");
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 13d345e..ee3af0b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -40,6 +40,7 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
@@ -61,12 +62,14 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.Transport;
import org.joda.time.Duration;
@@ -95,7 +98,7 @@ class FakeJobService implements JobService, Serializable {
HashBasedTable.create();
private static int numExtractJobCalls = 0;
- private static final com.google.common.collect.Table<String, String, List<String>>
+ private static final com.google.common.collect.Table<String, String, List<ResourceId>>
filesForLoadJobs = HashBasedTable.create();
private static final com.google.common.collect.Table<String, String, JobStatistics>
dryRunQueryResults = HashBasedTable.create();
@@ -117,12 +120,17 @@ class FakeJobService implements JobService, Serializable {
// Copy the files to a new location for import, as the temporary files will be deleted by
// the caller.
if (loadConfig.getSourceUris().size() > 0) {
- List<String> loadFiles = Lists.newArrayList();
+ ImmutableList.Builder<ResourceId> sourceFiles = ImmutableList.builder();
+ ImmutableList.Builder<ResourceId> loadFiles = ImmutableList.builder();
for (String filename : loadConfig.getSourceUris()) {
- loadFiles.add(filename + ThreadLocalRandom.current().nextInt());
+ sourceFiles.add(FileSystems.matchNewResource(filename, false /* isDirectory */));
+ loadFiles.add(FileSystems.matchNewResource(
+ filename + ThreadLocalRandom.current().nextInt(), false /* isDirectory */));
}
- IOChannelUtils.getFactory(loadFiles.get(0)).copy(loadConfig.getSourceUris(), loadFiles);
- filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles);
+
+ FileSystems.copy(sourceFiles.build(), loadFiles.build(),
+ MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+ filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles.build());
}
allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job));
@@ -286,7 +294,7 @@ class FakeJobService implements JobService, Serializable {
throws InterruptedException, IOException {
TableReference destination = load.getDestinationTable();
TableSchema schema = load.getSchema();
- List<String> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
+ List<ResourceId> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
WriteDisposition writeDisposition = WriteDisposition.valueOf(load.getWriteDisposition());
CreateDisposition createDisposition = CreateDisposition.valueOf(load.getCreateDisposition());
checkArgument(load.getSourceFormat().equals("NEWLINE_DELIMITED_JSON"));
@@ -298,8 +306,8 @@ class FakeJobService implements JobService, Serializable {
datasetService.createTable(new Table().setTableReference(destination).setSchema(schema));
List<TableRow> rows = Lists.newArrayList();
- for (String filename : sourceFiles) {
- rows.addAll(readRows(filename));
+ for (ResourceId filename : sourceFiles) {
+ rows.addAll(readRows(filename.toString()));
}
datasetService.insertAll(destination, rows, null);
return new JobStatus().setState("DONE");
@@ -385,7 +393,8 @@ class FakeJobService implements JobService, Serializable {
private void writeRowsHelper(List<TableRow> rows, Schema avroSchema,
String destinationPattern, int shard) throws IOException {
String filename = destinationPattern.replace("*", String.format("%012d", shard));
- try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.BINARY);
+ try (WritableByteChannel channel = FileSystems.create(
+ FileSystems.matchNewResource(filename, false /* isDirectory */), MimeTypes.BINARY);
DataFileWriter<GenericRecord> tableRowWriter =
new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(avroSchema))
.create(avroSchema, Channels.newOutputStream(channel))) {
[2/2] beam git commit: This closes #2889
Posted by dh...@apache.org.
This closes #2889
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/70e53e7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/70e53e7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/70e53e7d
Branch: refs/heads/master
Commit: 70e53e7dc5d58e4d9f88c6d4f1cff036429429c1
Parents: 24c6ff4 40dc844
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 4 12:23:17 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 4 12:23:17 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 2 -
.../sdk/io/gcp/bigquery/BigQueryHelpers.java | 17 +++-----
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 35 ++++++++--------
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 14 +++----
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 43 +++++---------------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 15 +++----
.../sdk/io/gcp/bigquery/FakeJobService.java | 29 ++++++++-----
7 files changed, 65 insertions(+), 90 deletions(-)
----------------------------------------------------------------------