You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this copy.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:49 UTC
[beam] 02/13: Merges Writer.openWindowed/Unwindowed and removes result of close()
This is an automated email from the ASF dual-hosted git repository.
jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5795c32c91a6de02b6731eacb5eef8ae55f069f5
Author: Eugene Kirpichov <ek...@gmail.com>
AuthorDate: Tue Oct 17 17:06:43 2017 -0700
Merges Writer.openWindowed/Unwindowed and removes result of close()
---
.../java/org/apache/beam/sdk/io/FileBasedSink.java | 94 ++++++----------------
.../java/org/apache/beam/sdk/io/WriteFiles.java | 69 +++++++++-------
.../org/apache/beam/sdk/io/FileBasedSinkTest.java | 15 ++--
3 files changed, 71 insertions(+), 107 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index d4cb57d..2108253 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -103,7 +103,7 @@ import org.slf4j.LoggerFactory;
*
* <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
* event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the finalize method. Each call to {@link Writer#openWindowed} or {@link
+ * result passed to the finalize method. Each call to {@link Writer#open} or {@link
* Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles
* transform, so even redundant or retried bundles will have a unique way of identifying their
* output.
@@ -805,10 +805,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
/** Unique id for this output bundle. */
private @Nullable String id;
- private @Nullable BoundedWindow window;
- private @Nullable PaneInfo paneInfo;
- private int shard = -1;
- private @Nullable DestinationT destination;
+ private DestinationT destination;
/** The output file for this bundle. May be null if opening failed. */
private @Nullable ResourceId outputFile;
@@ -868,53 +865,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
* id populated for the case of static sharding. In cases where the runner is dynamically
* picking sharding, shard might be set to -1.
*/
- public final void openWindowed(
- String uId, BoundedWindow window, PaneInfo paneInfo, int shard, DestinationT destination)
- throws Exception {
- if (!getWriteOperation().windowedWrites) {
- throw new IllegalStateException("openWindowed called a non-windowed sink.");
- }
- open(uId, window, paneInfo, shard, destination);
- }
-
- /** Called for each value in the bundle. */
- public abstract void write(OutputT value) throws Exception;
-
- /**
- * Similar to {@link #openWindowed} however for the case where unwindowed writes were requested.
- */
- public final void openUnwindowed(String uId, int shard, DestinationT destination)
- throws Exception {
- if (getWriteOperation().windowedWrites) {
- throw new IllegalStateException("openUnwindowed called a windowed sink.");
- }
- open(uId, null, null, shard, destination);
- }
-
- // Helper function to close a channel, on exception cases.
- // Always throws prior exception, with any new closing exception suppressed.
- private static void closeChannelAndThrow(
- WritableByteChannel channel, ResourceId filename, Exception prior) throws Exception {
- try {
- channel.close();
- } catch (Exception e) {
- LOG.error("Closing channel for {} failed.", filename, e);
- prior.addSuppressed(e);
- throw prior;
- }
- }
-
- private void open(
- String uId,
- @Nullable BoundedWindow window,
- @Nullable PaneInfo paneInfo,
- int shard,
- DestinationT destination)
+ public final void open(
+ String uId, DestinationT destination)
throws Exception {
this.id = uId;
- this.window = window;
- this.paneInfo = paneInfo;
- this.shard = shard;
this.destination = destination;
ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
@@ -925,15 +879,6 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
getWriteOperation().getSink().writableByteChannelFactory;
// The factory may force a MIME type or it may return null, indicating to use the sink's MIME.
String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
- LOG.info(
- "Opening temporary file {} with MIME type {} "
- + "to write destination {} shard {} window {} pane {}",
- outputFile,
- channelMimeType,
- destination,
- shard,
- window,
- paneInfo);
WritableByteChannel tempChannel = FileSystems.create(outputFile, channelMimeType);
try {
channel = factory.create(tempChannel);
@@ -960,6 +905,26 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
LOG.debug("Starting write of bundle {} to {}.", this.id, outputFile);
}
+ /** Called for each value in the bundle. */
+ public abstract void write(OutputT value) throws Exception;
+
+ public ResourceId getOutputFile() {
+ return outputFile;
+ }
+
+ // Helper function to close a channel, on exception cases.
+ // Always throws prior exception, with any new closing exception suppressed.
+ private static void closeChannelAndThrow(
+ WritableByteChannel channel, ResourceId filename, Exception prior) throws Exception {
+ try {
+ channel.close();
+ } catch (Exception e) {
+ LOG.error("Closing channel for {} failed.", filename, e);
+ prior.addSuppressed(e);
+ throw prior;
+ }
+ }
+
public final void cleanup() throws Exception {
if (outputFile != null) {
LOG.info("Deleting temporary file {}", outputFile);
@@ -970,22 +935,19 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
}
/** Closes the channel and returns the bundle result. */
- public final FileResult<DestinationT> close() throws Exception {
+ public final void close() throws Exception {
checkState(outputFile != null, "FileResult.close cannot be called with a null outputFile");
+ LOG.debug("Closing {}", outputFile);
- LOG.debug("Writing footer to {}.", outputFile);
try {
writeFooter();
} catch (Exception e) {
- LOG.error("Writing footer to {} failed, closing channel.", outputFile, e);
closeChannelAndThrow(channel, outputFile, e);
}
- LOG.debug("Finishing write to {}.", outputFile);
try {
finishWrite();
} catch (Exception e) {
- LOG.error("Finishing write to {} failed, closing channel.", outputFile, e);
closeChannelAndThrow(channel, outputFile, e);
}
@@ -1001,11 +963,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
} catch (Exception e) {
throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
}
-
- FileResult<DestinationT> result =
- new FileResult<>(outputFile, shard, window, paneInfo, destination);
LOG.info("Successfully wrote temporary file {}", outputFile);
- return result;
}
/** Return the WriteOperation that this Writer belongs to. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index c99abce..35b28a1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -425,18 +425,13 @@ public class WriteFiles<UserT, DestinationT, OutputT>
if (writers.size() <= maxNumWritersPerBundle) {
String uuid = UUID.randomUUID().toString();
LOG.info(
- "Opening writer {} for write operation {}, window {} pane {} destination {}",
+ "Opening writer {} for window {} pane {} destination {}",
uuid,
- writeOperation,
window,
paneInfo,
destination);
writer = writeOperation.createWriter();
- if (windowedWrites) {
- writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM, destination);
- } else {
- writer.openUnwindowed(uuid, UNKNOWN_SHARDNUM, destination);
- }
+ writer.open(uuid, destination);
writers.put(key, writer);
LOG.debug("Done opening writer");
} else {
@@ -461,17 +456,23 @@ public class WriteFiles<UserT, DestinationT, OutputT>
public void finishBundle(FinishBundleContext c) throws Exception {
for (Map.Entry<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> entry :
writers.entrySet()) {
+ WriterKey<DestinationT> key = entry.getKey();
Writer<DestinationT, OutputT> writer = entry.getValue();
- FileResult<DestinationT> result;
try {
- result = writer.close();
+ writer.close();
} catch (Exception e) {
// If anything goes wrong, make sure to delete the temporary file.
writer.cleanup();
throw e;
}
- BoundedWindow window = entry.getKey().window;
- c.output(result, window.maxTimestamp(), window);
+ BoundedWindow window = key.window;
+ FileResult<DestinationT> res =
+ windowedWrites
+ ? new FileResult<>(
+ writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination)
+ : new FileResult<>(
+ writer.getOutputFile(), UNKNOWN_SHARDNUM, null, null, key.destination);
+ c.output(res, window.maxTimestamp(), window);
}
}
@@ -505,20 +506,15 @@ public class WriteFiles<UserT, DestinationT, OutputT>
DestinationT destination = sink.getDynamicDestinations().getDestination(input);
Writer<DestinationT, OutputT> writer = writers.get(destination);
if (writer == null) {
- LOG.debug("Opening writer for write operation {}", writeOperation);
+ String uuid = UUID.randomUUID().toString();
+ LOG.info(
+ "Opening writer {} for window {} pane {} destination {}",
+ uuid,
+ window,
+ c.pane(),
+ destination);
writer = writeOperation.createWriter();
- int shardNumber =
- shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
- ? c.element().getKey().getShardNumber()
- : UNKNOWN_SHARDNUM;
- if (windowedWrites) {
- writer.openWindowed(
- UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination);
- } else {
- writer.openUnwindowed(
- UUID.randomUUID().toString(), shardNumber, destination);
- }
- LOG.debug("Done opening writer");
+ writer.open(uuid, destination);
writers.put(destination, writer);
}
writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input));
@@ -527,16 +523,26 @@ public class WriteFiles<UserT, DestinationT, OutputT>
// Close all writers.
for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
Writer<DestinationT, OutputT> writer = entry.getValue();
- FileResult<DestinationT> result;
try {
// Close the writer; if this throws let the error propagate.
- result = writer.close();
- c.output(result);
+ writer.close();
} catch (Exception e) {
// If anything goes wrong, make sure to delete the temporary file.
writer.cleanup();
throw e;
}
+ int shardNumber =
+ shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
+ ? c.element().getKey().getShardNumber()
+ : UNKNOWN_SHARDNUM;
+ if (windowedWrites) {
+ c.output(
+ new FileResult<>(
+ writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
+ } else {
+ c.output(
+ new FileResult<>(writer.getOutputFile(), shardNumber, null, null, entry.getKey()));
+ }
}
}
@@ -998,11 +1004,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
existingResults.size(),
destination);
for (int shard : missingShardNums) {
+ String uuid = UUID.randomUUID().toString();
+ LOG.info("Opening empty writer {} for destination {}", uuid, writeOperation, destination);
Writer<DestinationT, ?> writer = writeOperation.createWriter();
// Currently this code path is only called in the unwindowed case.
- writer.openUnwindowed(UUID.randomUUID().toString(), shard, destination);
- FileResult<DestinationT> emptyWrite = writer.close();
- completeResults.add(emptyWrite);
+ writer.open(uuid, destination);
+ writer.close();
+ completeResults.add(
+ new FileResult<>(writer.getOutputFile(), shard, null, null, destination));
}
LOG.debug("Done creating extra shards for {}.", destination);
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 29f3c1b..f7988bb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -102,14 +102,12 @@ public class FileBasedSinkTest {
SimpleSink.SimpleWriter<Void> writer =
buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
- writer.openUnwindowed(testUid, -1, null);
+ writer.open(testUid, null);
for (String value : values) {
writer.write(value);
}
- FileResult result = writer.close();
-
- FileBasedSink sink = writer.getWriteOperation().getSink();
- assertEquals(expectedTempFile, result.getTempFilename());
+ writer.close();
+ assertEquals(expectedTempFile, writer.getOutputFile());
assertFileContains(expected, expectedTempFile);
}
@@ -514,12 +512,11 @@ public class FileBasedSinkTest {
expected.add("footer");
expected.add("footer");
- writer.openUnwindowed(testUid, -1, null);
+ writer.open(testUid, null);
writer.write("a");
writer.write("b");
- final FileResult result = writer.close();
-
- assertEquals(expectedFile, result.getTempFilename());
+ writer.close();
+ assertEquals(expectedFile, writer.getOutputFile());
assertFileContains(expected, expectedFile);
}
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.