You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/04 19:23:21 UTC

[1/2] beam git commit: Remove IOChannelUtil/Factory from BigQueryIO

Repository: beam
Updated Branches:
  refs/heads/master 24c6ff44e -> 70e53e7dc


Remove IOChannelUtil/Factory from BigQueryIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40dc8443
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40dc8443
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40dc8443

Branch: refs/heads/master
Commit: 40dc844304a8432ee5c8e81b9cc806ef2ae3c1ea
Parents: 24c6ff4
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed May 3 22:41:06 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 4 12:23:14 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  2 -
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    | 17 +++-----
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 35 ++++++++--------
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 14 +++----
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   | 43 +++++---------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 15 +++----
 .../sdk/io/gcp/bigquery/FakeJobService.java     | 29 ++++++++-----
 7 files changed, 65 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 78d39b5..f422135 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -205,7 +205,6 @@ class BatchLoads<DestinationT>
                             bigQueryServices,
                             jobIdTokenView,
                             schemasView,
-                            stepUuid,
                             WriteDisposition.WRITE_EMPTY,
                             CreateDisposition.CREATE_IF_NEEDED,
                             dynamicDestinations))
@@ -243,7 +242,6 @@ class BatchLoads<DestinationT>
                         bigQueryServices,
                         jobIdTokenView,
                         schemasView,
-                        stepUuid,
                         writeDisposition,
                         createDisposition,
                         dynamicDestinations))

http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 6b4e518..318ea89 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -36,12 +36,12 @@ import java.util.UUID;
 import java.util.regex.Matcher;
 import javax.annotation.Nullable;
 
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 
 /** A set of helper functions and classes used by {@link BigQueryIO}. */
 public class BigQueryHelpers {
@@ -309,14 +309,9 @@ public class BigQueryHelpers {
 
   static String resolveTempLocation(
       String tempLocationDir, String bigQueryOperationName, String stepUuid) {
-    try {
-      IOChannelFactory factory = IOChannelUtils.getFactory(tempLocationDir);
-      return factory.resolve(
-          factory.resolve(tempLocationDir, bigQueryOperationName),  stepUuid);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Failed to resolve temp destination directory in %s",
-              tempLocationDir), e);
-    }
+    return FileSystems.matchNewResource(tempLocationDir, true)
+        .resolve(bigQueryOperationName, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+        .resolve(stepUuid, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
+        .toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index c76ee86..0e36393 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -38,8 +38,8 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -50,6 +50,10 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
@@ -67,8 +71,6 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
@@ -520,19 +522,14 @@ public class BigQueryIO {
 
               Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);
 
-              Collection<String> extractFiles = null;
               if (extractJob != null) {
-                extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
-              } else {
-                IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
-                Collection<String> dirMatch = factory.match(extractDestinationDir);
-                if (!dirMatch.isEmpty()) {
-                  extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
+                List<ResourceId> extractFiles =
+                    getExtractFilePaths(extractDestinationDir, extractJob);
+                if (extractFiles != null && !extractFiles.isEmpty()) {
+                  FileSystems.delete(extractFiles,
+                      MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
                 }
               }
-              if (extractFiles != null && !extractFiles.isEmpty()) {
-                IOChannelUtils.getFactory(extractFiles.iterator().next()).remove(extractFiles);
-              }
             }
           };
       return input.getPipeline()
@@ -583,7 +580,7 @@ public class BigQueryIO {
     return String.format("%s/%s", extractDestinationDir, "*.avro");
   }
 
-  static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
+  static List<ResourceId> getExtractFilePaths(String extractDestinationDir, Job extractJob)
       throws IOException {
     JobStatistics jobStats = extractJob.getStatistics();
     List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
@@ -597,11 +594,13 @@ public class BigQueryIO {
     }
     long filesCount = counts.get(0);
 
-    ImmutableList.Builder<String> paths = ImmutableList.builder();
-    IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
+    ImmutableList.Builder<ResourceId> paths = ImmutableList.builder();
+    ResourceId extractDestinationDirResourceId =
+        FileSystems.matchNewResource(extractDestinationDir, true /* isDirectory */);
     for (long i = 0; i < filesCount; ++i) {
-      String filePath =
-          factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro"));
+      ResourceId filePath = extractDestinationDirResourceId.resolve(
+          String.format("%012d%s", i, ".avro"),
+          ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
       paths.add(filePath);
     }
     return paths.build();

http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 49000d6..945c7d4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -38,6 +38,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -90,7 +91,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
           resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
 
       String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
-      List<String> tempFiles = executeExtract(
+      List<ResourceId> tempFiles = executeExtract(
           extractJobId, tableToExtract, jobService, bqOptions.getProject(), extractDestinationDir);
 
       TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
@@ -116,7 +117,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
     return TableRowJsonCoder.of();
   }
 
-  private List<String> executeExtract(
+  private List<ResourceId> executeExtract(
       String jobId, TableReference table, JobService jobService, String executingProject,
       String extractDestinationDir)
           throws InterruptedException, IOException {
@@ -143,12 +144,11 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
 
     LOG.info("BigQuery extract job completed: {}", jobId);
 
-    List<String> tempFiles = BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
-    return ImmutableList.copyOf(tempFiles);
+    return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
   }
 
   private List<BoundedSource<TableRow>> createSources(
-      List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
+      List<ResourceId> files, TableSchema tableSchema) throws IOException, InterruptedException {
     final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema);
 
     SerializableFunction<GenericRecord, TableRow> function =
@@ -160,9 +160,9 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
           }};
 
     List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
-    for (String fileName : files) {
+    for (ResourceId file : files) {
       avroSources.add(new TransformingSource<>(
-          AvroSource.from(fileName), function, getDefaultOutputCoder()));
+          AvroSource.from(file.toString()), function, getDefaultOutputCoder()));
     }
     return ImmutableList.copyOf(avroSources);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index c480b42..e7dba2a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -18,37 +18,30 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
-
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.FileIOChannelFactory;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.slf4j.Logger;
@@ -74,7 +67,6 @@ class WriteTables<DestinationT>
   private final BigQueryServices bqServices;
   private final PCollectionView<String> jobIdToken;
   private final PCollectionView<Map<DestinationT, String>> schemasView;
-  private final String stepUuid;
   private final WriteDisposition writeDisposition;
   private final CreateDisposition createDisposition;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
@@ -84,7 +76,6 @@ class WriteTables<DestinationT>
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
       PCollectionView<Map<DestinationT, String>> schemasView,
-      String stepUuid,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
       DynamicDestinations<?, DestinationT> dynamicDestinations) {
@@ -92,7 +83,6 @@ class WriteTables<DestinationT>
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
     this.schemasView = schemasView;
-    this.stepUuid = stepUuid;
     this.writeDisposition = writeDisposition;
     this.createDisposition = createDisposition;
     this.dynamicDestinations = dynamicDestinations;
@@ -114,8 +104,6 @@ class WriteTables<DestinationT>
           tableReference, tableDestination.getTableDescription());
     }
 
-    String tempFilePrefix = resolveTempLocation(
-        c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
     Integer partition = c.element().getKey().getShardNumber();
     List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
     String jobIdPrefix =
@@ -137,7 +125,7 @@ class WriteTables<DestinationT>
         tableDestination.getTableDescription());
     c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(tableReference)));
 
-    removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partitionFiles);
+    removeTemporaryFiles(partitionFiles);
   }
 
   private void load(
@@ -198,22 +186,11 @@ class WriteTables<DestinationT>
             BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
   }
 
-  static void removeTemporaryFiles(
-      PipelineOptions options, String tempFilePrefix, Collection<String> files) throws IOException {
-    IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix);
-    if (factory instanceof GcsIOChannelFactory) {
-      GcsUtil gcsUtil = new GcsUtilFactory().create(options);
-      gcsUtil.remove(files);
-    } else if (factory instanceof FileIOChannelFactory) {
-      for (String filename : files) {
-        LOG.debug("Removing file {}", filename);
-        boolean exists = Files.deleteIfExists(Paths.get(filename));
-        if (!exists) {
-          LOG.debug("{} does not exist.", filename);
-        }
-      }
-    } else {
-      throw new IOException("Unrecognized file system.");
+  static void removeTemporaryFiles(Collection<String> files) throws IOException {
+    ImmutableList.Builder<ResourceId> fileResources = ImmutableList.builder();
+    for (String file: files) {
+      fileResources.add(FileSystems.matchNewResource(file, false/* isDirectory */));
     }
+    FileSystems.delete(fileResources.build(), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 026afce..aabae3e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -83,7 +83,9 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -118,7 +120,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -1820,7 +1821,9 @@ public class BigQueryIOTest implements Serializable {
         for (int k = 0; k < numFilesPerPartition; ++k) {
           String filename = Paths.get(baseDir.toString(),
               String.format("files0x%08x_%05d", tempTableId.hashCode(), k)).toString();
-          try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.TEXT)) {
+          ResourceId fileResource =
+              FileSystems.matchNewResource(filename, false /* isDirectory */);
+          try (WritableByteChannel channel = FileSystems.create(fileResource, MimeTypes.TEXT)) {
             try (OutputStream output = Channels.newOutputStream(channel)) {
               TableRow tableRow = new TableRow().set("name", tableName);
               TableRowJsonCoder.of().encode(tableRow, output, Context.OUTER);
@@ -1858,7 +1861,6 @@ public class BigQueryIOTest implements Serializable {
             fakeBqServices,
             jobIdTokenView,
             schemaMapView,
-            stepUuid,
             WriteDisposition.WRITE_EMPTY,
             CreateDisposition.CREATE_IF_NEEDED,
             new IdentityDynamicTables());
@@ -1904,14 +1906,9 @@ public class BigQueryIOTest implements Serializable {
     File tempDir = new File(bqOptions.getTempLocation());
     testNumFiles(tempDir, 10);
 
-    WriteTables.removeTemporaryFiles(bqOptions, tempFilePrefix, fileNames);
+    WriteTables.removeTemporaryFiles(fileNames);
 
     testNumFiles(tempDir, 0);
-
-    for (String fileName : fileNames) {
-      loggedWriteTables.verifyDebug("Removing file " + fileName);
-    }
-    loggedWriteTables.verifyDebug(fileNames.get(numFiles) + " does not exist.");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/40dc8443/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 13d345e..ee3af0b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -40,6 +40,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 import java.io.BufferedReader;
@@ -61,12 +62,14 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.util.FluentBackoff;
 
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.Transport;
 import org.joda.time.Duration;
@@ -95,7 +98,7 @@ class FakeJobService implements JobService, Serializable {
       HashBasedTable.create();
   private static int numExtractJobCalls = 0;
 
-  private static final com.google.common.collect.Table<String, String, List<String>>
+  private static final com.google.common.collect.Table<String, String, List<ResourceId>>
       filesForLoadJobs = HashBasedTable.create();
   private static final com.google.common.collect.Table<String, String, JobStatistics>
       dryRunQueryResults = HashBasedTable.create();
@@ -117,12 +120,17 @@ class FakeJobService implements JobService, Serializable {
       // Copy the files to a new location for import, as the temporary files will be deleted by
       // the caller.
       if (loadConfig.getSourceUris().size() > 0) {
-        List<String> loadFiles = Lists.newArrayList();
+        ImmutableList.Builder<ResourceId> sourceFiles = ImmutableList.builder();
+        ImmutableList.Builder<ResourceId> loadFiles = ImmutableList.builder();
         for (String filename : loadConfig.getSourceUris()) {
-          loadFiles.add(filename + ThreadLocalRandom.current().nextInt());
+          sourceFiles.add(FileSystems.matchNewResource(filename, false /* isDirectory */));
+          loadFiles.add(FileSystems.matchNewResource(
+              filename + ThreadLocalRandom.current().nextInt(), false /* isDirectory */));
         }
-        IOChannelUtils.getFactory(loadFiles.get(0)).copy(loadConfig.getSourceUris(), loadFiles);
-        filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles);
+
+        FileSystems.copy(sourceFiles.build(), loadFiles.build(),
+            MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+        filesForLoadJobs.put(jobRef.getProjectId(), jobRef.getJobId(), loadFiles.build());
       }
 
       allJobs.put(jobRef.getProjectId(), jobRef.getJobId(), new JobInfo(job));
@@ -286,7 +294,7 @@ class FakeJobService implements JobService, Serializable {
       throws InterruptedException, IOException {
     TableReference destination = load.getDestinationTable();
     TableSchema schema = load.getSchema();
-    List<String> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
+    List<ResourceId> sourceFiles = filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
     WriteDisposition writeDisposition = WriteDisposition.valueOf(load.getWriteDisposition());
     CreateDisposition createDisposition = CreateDisposition.valueOf(load.getCreateDisposition());
     checkArgument(load.getSourceFormat().equals("NEWLINE_DELIMITED_JSON"));
@@ -298,8 +306,8 @@ class FakeJobService implements JobService, Serializable {
     datasetService.createTable(new Table().setTableReference(destination).setSchema(schema));
 
     List<TableRow> rows = Lists.newArrayList();
-    for (String filename : sourceFiles) {
-      rows.addAll(readRows(filename));
+    for (ResourceId filename : sourceFiles) {
+      rows.addAll(readRows(filename.toString()));
     }
     datasetService.insertAll(destination, rows, null);
     return new JobStatus().setState("DONE");
@@ -385,7 +393,8 @@ class FakeJobService implements JobService, Serializable {
   private void writeRowsHelper(List<TableRow> rows, Schema avroSchema,
                                String destinationPattern, int shard) throws IOException {
     String filename = destinationPattern.replace("*", String.format("%012d", shard));
-    try (WritableByteChannel channel = IOChannelUtils.create(filename, MimeTypes.BINARY);
+    try (WritableByteChannel channel = FileSystems.create(
+        FileSystems.matchNewResource(filename, false /* isDirectory */), MimeTypes.BINARY);
          DataFileWriter<GenericRecord> tableRowWriter =
              new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(avroSchema))
                  .create(avroSchema, Channels.newOutputStream(channel))) {


[2/2] beam git commit: This closes #2889

Posted by dh...@apache.org.
This closes #2889


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/70e53e7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/70e53e7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/70e53e7d

Branch: refs/heads/master
Commit: 70e53e7dc5d58e4d9f88c6d4f1cff036429429c1
Parents: 24c6ff4 40dc844
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 4 12:23:17 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu May 4 12:23:17 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  2 -
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    | 17 +++-----
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 35 ++++++++--------
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 14 +++----
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   | 43 +++++---------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 15 +++----
 .../sdk/io/gcp/bigquery/FakeJobService.java     | 29 ++++++++-----
 7 files changed, 65 insertions(+), 90 deletions(-)
----------------------------------------------------------------------