You are viewing a plain-text version of this content. The canonical link to the original page is not preserved in this version.
Posted to commits@beam.apache.org by da...@apache.org on 2017/05/04 07:17:14 UTC
[08/50] [abbrv] beam git commit: Remove IOChannelUtils from PackageUtil
Remove IOChannelUtils from PackageUtil
* Resolve the staging location as a directory (FileSystems.matchNewResource(stagingPath, true))
* Add GcsCreateOptions to override the default upload buffer size value
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a7d6ddc2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a7d6ddc2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a7d6ddc2
Branch: refs/heads/DSL_SQL
Commit: a7d6ddc2a669392fd808a24e31f7cd45742eaa43
Parents: 3b61f6a
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Fri Apr 28 18:07:31 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 3 08:09:13 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/util/GcsStager.java | 26 ++--
.../beam/runners/dataflow/util/PackageUtil.java | 38 +++--
.../runners/dataflow/DataflowRunnerTest.java | 20 ++-
.../runners/dataflow/util/PackageUtilTest.java | 143 ++++++++++++-------
.../org/apache/beam/sdk/io/FileSystems.java | 28 ++--
.../gcp/storage/GcsCreateOptions.java | 56 ++++++++
.../extensions/gcp/storage/GcsFileSystem.java | 7 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 12 +-
8 files changed, 224 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index 53822e3..d18e306 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -22,14 +22,12 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.api.services.storage.Storage;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
-import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.util.MimeTypes;
/**
* Utility class for staging files to GCS.
@@ -49,22 +47,24 @@ public class GcsStager implements Stager {
@Override
public List<DataflowPackage> stageFiles() {
checkNotNull(options.getStagingLocation());
- List<String> filesToStage = options.getFilesToStage();
String windmillBinary =
options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
if (windmillBinary != null) {
- filesToStage.add("windmill_main=" + windmillBinary);
+ options.getFilesToStage().add("windmill_main=" + windmillBinary);
}
+
int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024);
checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);
- Storage.Builder storageBuilder = Transport.newStorageClient(options);
- GcsUtil util = GcsUtilFactory.create(
- storageBuilder.build(),
- storageBuilder.getHttpRequestInitializer(),
- options.getExecutorService(),
- uploadSizeBytes);
+
+ GcsCreateOptions createOptions = GcsCreateOptions.builder()
+ .setGcsUploadBufferSizeBytes(uploadSizeBytes)
+ .setMimeType(MimeTypes.BINARY)
+ .build();
+
return PackageUtil.stageClasspathElements(
- options.getFilesToStage(), options.getStagingLocation(), util);
+ options.getFilesToStage(),
+ options.getStagingLocation(),
+ createOptions);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 0d52c5d..5ddcd29 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -51,14 +51,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.ZipFiles;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,7 +107,8 @@ class PackageUtil {
// Create the DataflowPackage with staging name and location.
String uniqueName = getUniqueContentName(source, hash);
- String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
+ String resourcePath = FileSystems.matchNewResource(stagingPath, true)
+ .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE).toString();
DataflowPackage target = new DataflowPackage();
target.setName(overridePackageName != null ? overridePackageName : uniqueName);
target.setLocation(resourcePath);
@@ -181,14 +179,9 @@ class PackageUtil {
}
}
- private static WritableByteChannel makeWriter(String target, GcsUtil gcsUtil)
+ private static WritableByteChannel makeWriter(String target, CreateOptions createOptions)
throws IOException {
- IOChannelFactory factory = IOChannelUtils.getFactory(target);
- if (factory instanceof GcsIOChannelFactory) {
- return gcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY);
- } else {
- return factory.create(target, MimeTypes.BINARY);
- }
+ return FileSystems.create(FileSystems.matchNewResource(target, false), createOptions);
}
/**
@@ -197,7 +190,7 @@ class PackageUtil {
*/
private static void stageOnePackage(
PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached,
- Sleeper retrySleeper, GcsUtil gcsUtil) {
+ Sleeper retrySleeper, CreateOptions createOptions) {
String source = attributes.getSourcePath();
String target = attributes.getDataflowPackage().getLocation();
@@ -205,7 +198,7 @@ class PackageUtil {
// always using MimeTypes.BINARY?
try {
try {
- long remoteLength = IOChannelUtils.getSizeBytes(target);
+ long remoteLength = FileSystems.matchSingleFileSpec(target).sizeBytes();
if (remoteLength == attributes.getSize()) {
LOG.debug("Skipping classpath element already staged: {} at {}",
attributes.getSourcePath(), target);
@@ -221,7 +214,7 @@ class PackageUtil {
while (true) {
try {
LOG.debug("Uploading classpath element {} to {}", source, target);
- try (WritableByteChannel writer = makeWriter(target, gcsUtil)) {
+ try (WritableByteChannel writer = makeWriter(target, createOptions)) {
copyContent(source, writer);
}
numUploaded.incrementAndGet();
@@ -262,12 +255,12 @@ class PackageUtil {
* @return A list of cloud workflow packages, each representing a classpath element.
*/
static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, String stagingPath, GcsUtil gcsUtil) {
+ Collection<String> classpathElements, String stagingPath, CreateOptions createOptions) {
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32));
try {
- return stageClasspathElements(
- classpathElements, stagingPath, Sleeper.DEFAULT, executorService, gcsUtil);
+ return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT,
+ executorService, createOptions);
} finally {
executorService.shutdown();
}
@@ -276,7 +269,8 @@ class PackageUtil {
// Visible for testing.
static List<DataflowPackage> stageClasspathElements(
Collection<String> classpathElements, final String stagingPath,
- final Sleeper retrySleeper, ListeningExecutorService executorService, final GcsUtil gcsUtil) {
+ final Sleeper retrySleeper, ListeningExecutorService executorService,
+ final CreateOptions createOptions) {
LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
+ "prepare for execution.", classpathElements.size());
@@ -314,7 +308,7 @@ class PackageUtil {
futures.add(executorService.submit(new Runnable() {
@Override
public void run() {
- stageOnePackage(attributes, numUploaded, numCached, retrySleeper, gcsUtil);
+ stageOnePackage(attributes, numUploaded, numCached, retrySleeper, createOptions);
}
}));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d011994..fa106ac 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -33,6 +33,8 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
@@ -46,6 +48,7 @@ import com.google.api.services.dataflow.model.ListJobsResponse;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -449,8 +452,7 @@ public class DataflowRunnerTest {
@Test
public void testRunWithFiles() throws IOException {
- // Test that the function DataflowRunner.stageFiles works as
- // expected.
+ // Test that the function DataflowRunner.stageFiles works as expected.
final String cloudDataflowDataset = "somedataset";
// Create some temporary files.
@@ -461,6 +463,10 @@ public class DataflowRunnerTest {
String overridePackageName = "alias.txt";
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+ .thenReturn(ImmutableList.of(GcsUtil.StorageObjectOrIOException.create(
+ new FileNotFoundException("some/path"))));
+
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setFilesToStage(ImmutableList.of(
temp1.getAbsolutePath(),
@@ -475,6 +481,16 @@ public class DataflowRunnerTest {
options.setGcsUtil(mockGcsUtil);
options.setGcpCredential(new TestCredential());
+ when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
+ .then(new Answer<SeekableByteChannel>() {
+ @Override
+ public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+ // Open with WRITE (temp file already exists); without it the channel is read-only.
+ return FileChannel.open(Files.createTempFile("channel-", ".tmp"),
+ StandardOpenOption.WRITE, StandardOpenOption.DELETE_ON_CLOSE);
+ }
+ });
+
Pipeline p = buildDataflowPipeline(options);
DataflowPipelineJob job = (DataflowPipelineJob) p.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 877832c..4ae3a77 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -49,6 +50,7 @@ import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -58,6 +60,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.math.BigInteger;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.Pipe.SinkChannel;
@@ -70,12 +73,17 @@ import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.apache.beam.sdk.testing.RegexMatcher;
import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
+import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.hamcrest.Matchers;
import org.junit.Before;
@@ -104,6 +112,7 @@ public class PackageUtilTest {
// 128 bits, base64 encoded is 171 bits, rounds to 22 bytes
private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}";
+ private CreateOptions createOptions;
@Before
public void setUp() {
@@ -111,8 +120,8 @@ public class PackageUtilTest {
GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
pipelineOptions.setGcsUtil(mockGcsUtil);
-
- IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+ FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+ createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build();
}
private File makeFileWithContents(String name, String contents) throws Exception {
@@ -122,7 +131,8 @@ public class PackageUtilTest {
return tmpFile;
}
- static final String STAGING_PATH = GcsPath.fromComponents("somebucket", "base/path").toString();
+ static final GcsPath STAGING_GCS_PATH = GcsPath.fromComponents("somebucket", "base/path/");
+ static final String STAGING_PATH = STAGING_GCS_PATH.toString();
private static PackageAttributes makePackageAttributes(File file, String overridePackageName) {
return PackageUtil.createPackageAttributes(file, STAGING_PATH, overridePackageName);
}
@@ -135,7 +145,7 @@ public class PackageUtilTest {
DataflowPackage target = attr.getDataflowPackage();
assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
- assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
assertThat(attr.getSize(), equalTo((long) contents.length()));
}
@@ -145,7 +155,7 @@ public class PackageUtilTest {
DataflowPackage target = makePackageAttributes(tmpFile, null).getDataflowPackage();
assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN));
- assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
}
@Test
@@ -154,7 +164,7 @@ public class PackageUtilTest {
DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDataflowPackage();
assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
- assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
}
@Test
@@ -203,8 +213,10 @@ public class PackageUtilTest {
@Test
public void testPackageUploadWithLargeClasspathLogsWarning() throws Exception {
File tmpFile = makeFileWithContents("file.txt", "This is a test!");
- // all files will be present and cached so no upload needed.
- when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))).thenReturn(
+ ImmutableList.of(StorageObjectOrIOException.create(
+ createStorageObject(STAGING_PATH, tmpFile.length())))
+ );
List<String> classpathElements = Lists.newLinkedList();
for (int i = 0; i < 1005; ++i) {
@@ -212,8 +224,7 @@ public class PackageUtilTest {
classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
}
- PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, mockGcsUtil);
-
+ PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, createOptions);
logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow");
}
@@ -222,20 +233,22 @@ public class PackageUtilTest {
Pipe pipe = Pipe.open();
String contents = "This is a test!";
File tmpFile = makeFileWithContents("file.txt", contents);
- when(mockGcsUtil.fileSize(any(GcsPath.class)))
- .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+ .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+ new FileNotFoundException("some/path"))));
+
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+ ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, createOptions);
DataflowPackage target = Iterables.getOnlyElement(targets);
- verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
verifyNoMoreInteractions(mockGcsUtil);
assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
- assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
assertThat(new LineReader(Channels.newReader(pipe.source(), "UTF-8")).readLine(),
equalTo(contents));
}
@@ -244,8 +257,10 @@ public class PackageUtilTest {
public void testStagingPreservesClasspath() throws Exception {
File smallFile = makeFileWithContents("small.txt", "small");
File largeFile = makeFileWithContents("large.txt", "large contents");
- when(mockGcsUtil.fileSize(any(GcsPath.class)))
- .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+ .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+ new FileNotFoundException("some/path"))));
+
when(mockGcsUtil.create(any(GcsPath.class), anyString()))
.thenAnswer(new Answer<SinkChannel>() {
@Override
@@ -256,7 +271,7 @@ public class PackageUtilTest {
List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()),
- STAGING_PATH, mockGcsUtil);
+ STAGING_PATH, createOptions);
// Verify that the packages are returned small, then large, matching input order even though
// the large file would be uploaded first.
assertThat(targets.get(0).getName(), startsWith("small"));
@@ -272,14 +287,15 @@ public class PackageUtilTest {
makeFileWithContents("folder/file.txt", "This is a test!");
makeFileWithContents("folder/directory/file.txt", "This is also a test!");
- when(mockGcsUtil.fileSize(any(GcsPath.class)))
- .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+ .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+ new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
- verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
verifyNoMoreInteractions(mockGcsUtil);
@@ -299,28 +315,30 @@ public class PackageUtilTest {
Pipe pipe = Pipe.open();
File tmpDirectory = tmpFolder.newFolder("folder");
- when(mockGcsUtil.fileSize(any(GcsPath.class)))
- .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+ .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+ new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
DataflowPackage target = Iterables.getOnlyElement(targets);
- verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
verifyNoMoreInteractions(mockGcsUtil);
assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar"));
- assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName()));
+ assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
assertNull(new ZipInputStream(Channels.newInputStream(pipe.source())).getNextEntry());
}
@Test(expected = RuntimeException.class)
public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception {
File tmpFile = makeFileWithContents("file.txt", "This is a test!");
- when(mockGcsUtil.fileSize(any(GcsPath.class)))
- .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+ .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+ new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString()))
.thenThrow(new IOException("Fake Exception: Upload error"));
@@ -328,9 +346,9 @@ public class PackageUtilTest {
PackageUtil.stageClasspathElements(
ImmutableList.of(tmpFile.getAbsolutePath()),
STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
- mockGcsUtil);
+ createOptions);
} finally {
- verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
verifyNoMoreInteractions(mockGcsUtil);
}
@@ -339,8 +357,9 @@ public class PackageUtilTest {
@Test
public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() throws Exception {
File tmpFile = makeFileWithContents("file.txt", "This is a test!");
- when(mockGcsUtil.fileSize(any(GcsPath.class)))
- .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+ .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+ new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString()))
.thenThrow(new IOException("Failed to write to GCS path " + STAGING_PATH,
googleJsonResponseException(
@@ -350,7 +369,7 @@ public class PackageUtilTest {
PackageUtil.stageClasspathElements(
ImmutableList.of(tmpFile.getAbsolutePath()),
STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(),
- mockGcsUtil);
+ createOptions);
fail("Expected RuntimeException");
} catch (RuntimeException e) {
assertThat("Expected RuntimeException wrapping IOException.",
@@ -364,7 +383,7 @@ public class PackageUtilTest {
"Stale credentials can be resolved by executing 'gcloud auth application-default "
+ "login'")));
} finally {
- verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
verifyNoMoreInteractions(mockGcsUtil);
}
@@ -374,8 +393,9 @@ public class PackageUtilTest {
public void testPackageUploadEventuallySucceeds() throws Exception {
Pipe pipe = Pipe.open();
File tmpFile = makeFileWithContents("file.txt", "This is a test!");
- when(mockGcsUtil.fileSize(any(GcsPath.class)))
- .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+ .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+ new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString()))
.thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails
.thenReturn(pipe.sink()); // second attempt succeeds
@@ -383,9 +403,9 @@ public class PackageUtilTest {
try {
PackageUtil.stageClasspathElements(
ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper,
- MoreExecutors.newDirectExecutorService(), mockGcsUtil);
+ MoreExecutors.newDirectExecutorService(), createOptions);
} finally {
- verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
verifyNoMoreInteractions(mockGcsUtil);
}
@@ -394,12 +414,14 @@ public class PackageUtilTest {
@Test
public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception {
File tmpFile = makeFileWithContents("file.txt", "This is a test!");
- when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+ .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+ createStorageObject(STAGING_PATH, tmpFile.length()))));
- PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+ PackageUtil.stageClasspathElements(ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH,
+ createOptions);
- verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verifyNoMoreInteractions(mockGcsUtil);
}
@@ -411,13 +433,15 @@ public class PackageUtilTest {
tmpFolder.newFolder("folder", "directory");
makeFileWithContents("folder/file.txt", "This is a test!");
makeFileWithContents("folder/directory/file.txt", "This is also a test!");
- when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(Long.MAX_VALUE);
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+ .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+ createStorageObject(STAGING_PATH, Long.MAX_VALUE))));
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
PackageUtil.stageClasspathElements(
- ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil);
+ ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions);
- verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
verifyNoMoreInteractions(mockGcsUtil);
}
@@ -428,30 +452,31 @@ public class PackageUtilTest {
File tmpFile = makeFileWithContents("file.txt", "This is a test!");
final String overriddenName = "alias.txt";
- when(mockGcsUtil.fileSize(any(GcsPath.class)))
- .thenThrow(new FileNotFoundException("some/path"));
+ when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+ .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+ new FileNotFoundException("some/path"))));
when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink());
List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH,
- mockGcsUtil);
+ createOptions);
DataflowPackage target = Iterables.getOnlyElement(targets);
- verify(mockGcsUtil).fileSize(any(GcsPath.class));
+ verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
verify(mockGcsUtil).create(any(GcsPath.class), anyString());
verifyNoMoreInteractions(mockGcsUtil);
assertThat(target.getName(), equalTo(overriddenName));
assertThat(target.getLocation(),
- RegexMatcher.matches(STAGING_PATH + "/file-" + HASH_PATTERN + ".txt"));
+ RegexMatcher.matches(STAGING_PATH + "file-" + HASH_PATTERN + ".txt"));
}
@Test
public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception {
- String nonExistentFile =
- IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file");
+ String nonExistentFile = FileSystems.matchNewResource(tmpFolder.getRoot().getPath(), true)
+ .resolve("non-existent-file", StandardResolveOptions.RESOLVE_FILE).toString();
assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
- ImmutableList.of(nonExistentFile), STAGING_PATH, mockGcsUtil));
+ ImmutableList.of(nonExistentFile), STAGING_PATH, createOptions));
}
/**
@@ -485,4 +510,12 @@ public class PackageUtilTest {
HttpResponse response = request.execute();
return GoogleJsonResponseException.from(jsonFactory, response);
}
+
+ private StorageObject createStorageObject(String gcsFilename, long fileSize) {
+ GcsPath gcsPath = GcsPath.fromUri(gcsFilename);
+ return new StorageObject()
+ .setBucket(gcsPath.getBucket())
+ .setName(gcsPath.getObject())
+ .setSize(BigInteger.valueOf(fileSize));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index e4f00ea..0110a0c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -32,6 +32,8 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
+
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
@@ -111,26 +113,30 @@ public class FileSystems {
*
* @param spec a resource specification that matches exactly one result.
* @return the {@link Metadata} for the specified resource.
+ * @throws FileNotFoundException if the file resource is not found.
* @throws IOException in the event of an error in the inner call to {@link #match},
* or if the given spec does not match exactly 1 result.
*/
public static Metadata matchSingleFileSpec(String spec) throws IOException {
List<MatchResult> matches = FileSystems.match(Collections.singletonList(spec));
MatchResult matchResult = Iterables.getOnlyElement(matches);
- if (matchResult.status() != Status.OK) {
+ // Report NOT_FOUND as FileNotFoundException so callers can tell a missing resource apart
+ // from other match failures, which are reported as a plain IOException below.
+ if (matchResult.status() == Status.NOT_FOUND) {
+ throw new FileNotFoundException(String.format("File spec %s not found", spec));
+ } else if (matchResult.status() != Status.OK) {
throw new IOException(
String.format("Error matching file spec %s: status %s", spec, matchResult.status()));
+ } else {
+ List<Metadata> metadata = matchResult.metadata();
+ // The spec must resolve to exactly one resource; zero or several matches are errors.
+ if (metadata.size() != 1) {
+ throw new IOException(
+ String.format(
+ "Expecting spec %s to match exactly one file, but matched %s: %s",
+ spec,
+ metadata.size(),
+ metadata));
+ }
+ return metadata.get(0);
}
- List<Metadata> metadata = matchResult.metadata();
- if (metadata.size() != 1) {
- throw new IOException(
- String.format(
- "Expecting spec %s to match exactly one file, but matched %s: %s",
- spec,
- metadata.size(),
- metadata));
- }
- return metadata.get(0);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
new file mode 100644
index 0000000..dbfe960
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+
+/**
+ * {@link CreateOptions} that additionally carry GCS-specific settings, such as the upload buffer size.
+ */
+@AutoValue
+public abstract class GcsCreateOptions extends CreateOptions {
+
+ /**
+ * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for
+ * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the
+ * restrictions and performance implications of this value.
+ */
+ @Nullable
+ public abstract Integer gcsUploadBufferSizeBytes();
+
+ // TODO: Add other GCS options when needed.
+
+ /**
+ * Returns a {@link GcsCreateOptions.Builder}.
+ */
+ public static GcsCreateOptions.Builder builder() {
+ return new AutoValue_GcsCreateOptions.Builder();
+ }
+
+ /**
+ * A builder for {@link GcsCreateOptions}.
+ */
+ @AutoValue.Builder
+ public abstract static class Builder extends CreateOptions.Builder<GcsCreateOptions.Builder> {
+ /** Builds the {@link GcsCreateOptions} from the values set on this builder. */
+ public abstract GcsCreateOptions build();
+ /** Sets {@link GcsCreateOptions#gcsUploadBufferSizeBytes}; the value may be {@code null}. */
+ public abstract GcsCreateOptions.Builder setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
index 69dd8fc..38b8347 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -102,7 +102,12 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
@Override
protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions createOptions)
throws IOException {
- return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType());
+ // Only GcsCreateOptions carries an upload buffer size. Any other CreateOptions falls back to
+ // GcsUtil#create(GcsPath, String), which uses GcsUtil's own configured buffer size.
+ if (createOptions instanceof GcsCreateOptions) {
+ return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType(),
+ ((GcsCreateOptions) createOptions).gcsUploadBufferSizeBytes());
+ } else {
+ return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index c8e6839..ee2e231 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -436,8 +436,16 @@ public class GcsUtil {
* @param type the type of object, eg "text/plain".
* @return a Callable object that encloses the operation.
*/
- public WritableByteChannel create(GcsPath path,
- String type) throws IOException {
+ public WritableByteChannel create(GcsPath path, String type) throws IOException {
+ return create(path, type, uploadBufferSizeBytes);
+ }
+
+ /**
+ * Same as {@link GcsUtil#create(GcsPath, String)} but allows overriding
+ * {@code uploadBufferSizeBytes}.
+ */
+ public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes)
+ throws IOException {
GoogleCloudStorageWriteChannel channel = new GoogleCloudStorageWriteChannel(
executorService,
storageClient,