You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:49 UTC

[beam] 02/13: Merges Writer.openWindowed/Unwindowed and removes result of close()

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5795c32c91a6de02b6731eacb5eef8ae55f069f5
Author: Eugene Kirpichov <ek...@gmail.com>
AuthorDate: Tue Oct 17 17:06:43 2017 -0700

    Merges Writer.openWindowed/Unwindowed and removes result of close()
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 94 ++++++----------------
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 69 +++++++++-------
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  | 15 ++--
 3 files changed, 71 insertions(+), 107 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index d4cb57d..2108253 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -103,7 +103,7 @@ import org.slf4j.LoggerFactory;
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
  * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the finalize method. Each call to {@link Writer#openWindowed} or {@link
- * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles
+ * result passed to the finalize method. Each call to {@link Writer#open} is passed a unique
+ * <i>bundle id</i> when it is called by the WriteFiles
  * transform, so even redundant or retried bundles will have a unique way of identifying their
  * output.
@@ -805,10 +805,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     /** Unique id for this output bundle. */
     private @Nullable String id;
 
-    private @Nullable BoundedWindow window;
-    private @Nullable PaneInfo paneInfo;
-    private int shard = -1;
-    private @Nullable DestinationT destination;
+    private DestinationT destination;
 
     /** The output file for this bundle. May be null if opening failed. */
     private @Nullable ResourceId outputFile;
@@ -868,53 +865,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      * id populated for the case of static sharding. In cases where the runner is dynamically
      * picking sharding, shard might be set to -1.
      */
-    public final void openWindowed(
-        String uId, BoundedWindow window, PaneInfo paneInfo, int shard, DestinationT destination)
-        throws Exception {
-      if (!getWriteOperation().windowedWrites) {
-        throw new IllegalStateException("openWindowed called a non-windowed sink.");
-      }
-      open(uId, window, paneInfo, shard, destination);
-    }
-
-    /** Called for each value in the bundle. */
-    public abstract void write(OutputT value) throws Exception;
-
-    /**
-     * Similar to {@link #openWindowed} however for the case where unwindowed writes were requested.
-     */
-    public final void openUnwindowed(String uId, int shard, DestinationT destination)
-        throws Exception {
-      if (getWriteOperation().windowedWrites) {
-        throw new IllegalStateException("openUnwindowed called a windowed sink.");
-      }
-      open(uId, null, null, shard, destination);
-    }
-
-    // Helper function to close a channel, on exception cases.
-    // Always throws prior exception, with any new closing exception suppressed.
-    private static void closeChannelAndThrow(
-        WritableByteChannel channel, ResourceId filename, Exception prior) throws Exception {
-      try {
-        channel.close();
-      } catch (Exception e) {
-        LOG.error("Closing channel for {} failed.", filename, e);
-        prior.addSuppressed(e);
-        throw prior;
-      }
-    }
-
-    private void open(
-        String uId,
-        @Nullable BoundedWindow window,
-        @Nullable PaneInfo paneInfo,
-        int shard,
-        DestinationT destination)
+    public final void open(
+        String uId, DestinationT destination)
         throws Exception {
       this.id = uId;
-      this.window = window;
-      this.paneInfo = paneInfo;
-      this.shard = shard;
       this.destination = destination;
       ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
       outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
@@ -925,15 +879,6 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
           getWriteOperation().getSink().writableByteChannelFactory;
       // The factory may force a MIME type or it may return null, indicating to use the sink's MIME.
       String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
-      LOG.info(
-          "Opening temporary file {} with MIME type {} "
-              + "to write destination {} shard {} window {} pane {}",
-          outputFile,
-          channelMimeType,
-          destination,
-          shard,
-          window,
-          paneInfo);
       WritableByteChannel tempChannel = FileSystems.create(outputFile, channelMimeType);
       try {
         channel = factory.create(tempChannel);
@@ -960,6 +905,26 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       LOG.debug("Starting write of bundle {} to {}.", this.id, outputFile);
     }
 
+    /** Called for each value in the bundle. */
+    public abstract void write(OutputT value) throws Exception;
+
+    public ResourceId getOutputFile() {
+      return outputFile;
+    }
+
+    // Closes the channel after a failure and then always rethrows the prior exception; an
+    // exception raised while closing is logged and attached to it as a suppressed exception.
+    private static void closeChannelAndThrow(
+        WritableByteChannel channel, ResourceId filename, Exception prior) throws Exception {
+      try {
+        channel.close();
+      } catch (Exception e) {
+        LOG.error("Closing channel for {} failed.", filename, e);
+        prior.addSuppressed(e);
+      }
+      throw prior;
+    }
+
     public final void cleanup() throws Exception {
       if (outputFile != null) {
         LOG.info("Deleting temporary file {}", outputFile);
@@ -970,22 +935,19 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     }
 
-    /** Closes the channel and returns the bundle result. */
-    public final FileResult<DestinationT> close() throws Exception {
+    /** Writes the footer, finishes the write, and closes the channel to the output file. */
+    public final void close() throws Exception {
       checkState(outputFile != null, "FileResult.close cannot be called with a null outputFile");
+      LOG.debug("Closing {}", outputFile);
 
-      LOG.debug("Writing footer to {}.", outputFile);
       try {
         writeFooter();
       } catch (Exception e) {
-        LOG.error("Writing footer to {} failed, closing channel.", outputFile, e);
         closeChannelAndThrow(channel, outputFile, e);
       }
 
-      LOG.debug("Finishing write to {}.", outputFile);
       try {
         finishWrite();
       } catch (Exception e) {
-        LOG.error("Finishing write to {} failed, closing channel.", outputFile, e);
         closeChannelAndThrow(channel, outputFile, e);
       }
 
@@ -1001,11 +963,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       } catch (Exception e) {
         throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
       }
-
-      FileResult<DestinationT> result =
-          new FileResult<>(outputFile, shard, window, paneInfo, destination);
       LOG.info("Successfully wrote temporary file {}", outputFile);
-      return result;
     }
 
     /** Return the WriteOperation that this Writer belongs to. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index c99abce..35b28a1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -425,18 +425,13 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         if (writers.size() <= maxNumWritersPerBundle) {
           String uuid = UUID.randomUUID().toString();
           LOG.info(
-              "Opening writer {} for write operation {}, window {} pane {} destination {}",
+              "Opening writer {} for window {} pane {} destination {}",
               uuid,
-              writeOperation,
               window,
               paneInfo,
               destination);
           writer = writeOperation.createWriter();
-          if (windowedWrites) {
-            writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM, destination);
-          } else {
-            writer.openUnwindowed(uuid, UNKNOWN_SHARDNUM, destination);
-          }
+          writer.open(uuid, destination);
           writers.put(key, writer);
           LOG.debug("Done opening writer");
         } else {
@@ -461,17 +456,23 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     public void finishBundle(FinishBundleContext c) throws Exception {
       for (Map.Entry<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> entry :
           writers.entrySet()) {
+        WriterKey<DestinationT> key = entry.getKey();
         Writer<DestinationT, OutputT> writer = entry.getValue();
-        FileResult<DestinationT> result;
         try {
-          result = writer.close();
+          writer.close();
         } catch (Exception e) {
           // If anything goes wrong, make sure to delete the temporary file.
           writer.cleanup();
           throw e;
         }
-        BoundedWindow window = entry.getKey().window;
-        c.output(result, window.maxTimestamp(), window);
+        BoundedWindow window = key.window;
+        FileResult<DestinationT> res =
+            windowedWrites
+                ? new FileResult<>(
+                    writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination)
+                : new FileResult<>(
+                    writer.getOutputFile(), UNKNOWN_SHARDNUM, null, null, key.destination);
+        c.output(res, window.maxTimestamp(), window);
       }
     }
 
@@ -505,20 +506,15 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         DestinationT destination = sink.getDynamicDestinations().getDestination(input);
         Writer<DestinationT, OutputT> writer = writers.get(destination);
         if (writer == null) {
-          LOG.debug("Opening writer for write operation {}", writeOperation);
+          String uuid = UUID.randomUUID().toString();
+          LOG.info(
+              "Opening writer {} for window {} pane {} destination {}",
+              uuid,
+              window,
+              c.pane(),
+              destination);
           writer = writeOperation.createWriter();
-          int shardNumber =
-              shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
-                  ? c.element().getKey().getShardNumber()
-                  : UNKNOWN_SHARDNUM;
-          if (windowedWrites) {
-            writer.openWindowed(
-                UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination);
-          } else {
-            writer.openUnwindowed(
-                UUID.randomUUID().toString(), shardNumber, destination);
-          }
-          LOG.debug("Done opening writer");
+          writer.open(uuid, destination);
           writers.put(destination, writer);
         }
         writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input));
@@ -527,16 +523,26 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       // Close all writers.
       for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
         Writer<DestinationT, OutputT> writer = entry.getValue();
-        FileResult<DestinationT> result;
         try {
           // Close the writer; if this throws let the error propagate.
-          result = writer.close();
-          c.output(result);
+          writer.close();
         } catch (Exception e) {
           // If anything goes wrong, make sure to delete the temporary file.
           writer.cleanup();
           throw e;
         }
+        int shardNumber =
+            shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
+                ? c.element().getKey().getShardNumber()
+                : UNKNOWN_SHARDNUM;
+        if (windowedWrites) {
+          c.output(
+              new FileResult<>(
+                  writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
+        } else {
+          c.output(
+              new FileResult<>(writer.getOutputFile(), shardNumber, null, null, entry.getKey()));
+        }
       }
     }
 
@@ -998,11 +1004,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
             existingResults.size(),
             destination);
         for (int shard : missingShardNums) {
+          String uuid = UUID.randomUUID().toString();
+          LOG.info("Opening empty writer {} for destination {}", uuid, destination);
           Writer<DestinationT, ?> writer = writeOperation.createWriter();
           // Currently this code path is only called in the unwindowed case.
-          writer.openUnwindowed(UUID.randomUUID().toString(), shard, destination);
-          FileResult<DestinationT> emptyWrite = writer.close();
-          completeResults.add(emptyWrite);
+          writer.open(uuid, destination);
+          writer.close();
+          completeResults.add(
+              new FileResult<>(writer.getOutputFile(), shard, null, null, destination));
         }
         LOG.debug("Done creating extra shards for {}.", destination);
       }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 29f3c1b..f7988bb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -102,14 +102,12 @@ public class FileBasedSinkTest {
 
     SimpleSink.SimpleWriter<Void> writer =
         buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
-    writer.openUnwindowed(testUid, -1, null);
+    writer.open(testUid, null);
     for (String value : values) {
       writer.write(value);
     }
-    FileResult result = writer.close();
-
-    FileBasedSink sink = writer.getWriteOperation().getSink();
-    assertEquals(expectedTempFile, result.getTempFilename());
+    writer.close();
+    assertEquals(expectedTempFile, writer.getOutputFile());
     assertFileContains(expected, expectedTempFile);
   }
 
@@ -514,12 +512,11 @@ public class FileBasedSinkTest {
     expected.add("footer");
     expected.add("footer");
 
-    writer.openUnwindowed(testUid, -1, null);
+    writer.open(testUid, null);
     writer.write("a");
     writer.write("b");
-    final FileResult result = writer.close();
-
-    assertEquals(expectedFile, result.getTempFilename());
+    writer.close();
+    assertEquals(expectedFile, writer.getOutputFile());
     assertFileContains(expected, expectedFile);
   }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.