You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/05/26 05:57:49 UTC

[1/2] beam git commit: [BEAM-2276] Add windowing into default filename policy

Repository: beam
Updated Branches:
  refs/heads/master 9cd12907e -> deee5b3c2


[BEAM-2276] Add windowing into default filename policy


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e8c388a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e8c388a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e8c388a

Branch: refs/heads/master
Commit: 4e8c388a46636a8e5391517876ec1adb818b8d61
Parents: 9cd1290
Author: Borisa Zivkovic <bo...@huawei.com>
Authored: Mon May 15 08:56:19 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri May 26 07:35:07 2017 +0200

----------------------------------------------------------------------
 .../beam/sdk/io/DefaultFilenamePolicy.java      | 151 +++++++++++++++++--
 .../java/org/apache/beam/sdk/io/TextIO.java     |   8 +-
 .../beam/sdk/io/DefaultFilenamePolicyTest.java  |  57 +++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  10 --
 4 files changed, 196 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4e8c388a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 07bc2db..5073854 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -32,22 +32,58 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * A default {@link FilenamePolicy} for unwindowed files. This policy is constructed using three
- * parameters that together define the output name of a sharded file, in conjunction with the number
- * of shards and index of the particular file, using {@link #constructName}.
+ * A default {@link FilenamePolicy} for windowed and unwindowed files. This policy is constructed
+ * using three parameters that together define the output name of a sharded file, in conjunction
+ * with the number of shards, index of the particular file, current window and pane information,
+ * using {@link #constructName}.
  *
- * <p>Most users of unwindowed files will use this {@link DefaultFilenamePolicy}. For more advanced
+ * <p>Most users will use this {@link DefaultFilenamePolicy}. For more advanced
  * uses in generating different files for each window and other sharding controls, see the
  * {@code WriteOneFilePerWindow} example pipeline.
  */
 public final class DefaultFilenamePolicy extends FilenamePolicy {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class);
+
   /** The default sharding name template used in {@link #constructUsingStandardParameters}. */
   public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
 
+  /** The default shard template used when writing windowed files.
+   *  It is used when the configured shard template equals {@link #DEFAULT_SHARD_TEMPLATE}
+   *  (i.e. the user did not specify one), and prefixes that template with "P-W".
+   *  When the user does specify a shard template, that template is used for both
+   *  windowed and non-windowed file names.
+   */
+  private static final String DEFAULT_WINDOWED_SHARD_TEMPLATE =
+      "P-W" + DEFAULT_SHARD_TEMPLATE;
+
+  /*
+   * Placeholders carrying no windowing information: shard number (S+), shard count (N+).
+   */
+  private static final String NON_WINDOWED_ONLY_PATTERN = "S+|N+";
+
+  /*
+   * Placeholders carrying windowing information: pane (P) and window (W).
+   */
+  private static final String WINDOWED_ONLY_PATTERN = "P|W";
+
+  /*
+   * All placeholders recognized in a shard template, captured as group 1.
+   */
+  private static final String TEMPLATE_PATTERN = "(" + NON_WINDOWED_ONLY_PATTERN + "|"
+   + WINDOWED_ONLY_PATTERN + ")";
+
   // Pattern that matches shard placeholders within a shard template.
-  private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
+  private static final Pattern SHARD_FORMAT_RE = Pattern.compile(TEMPLATE_PATTERN);
+  private static final Pattern WINDOWED_FORMAT_RE = Pattern.compile(WINDOWED_ONLY_PATTERN);
 
   /**
    * Constructs a new {@link DefaultFilenamePolicy}.
@@ -69,7 +105,8 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
    * <p>Any filename component of the provided resource will be used as the filename prefix.
    *
    * <p>If provided, the shard name template will be used; otherwise {@link #DEFAULT_SHARD_TEMPLATE}
-   * will be used.
+   * will be used for non-windowed file names and that template prefixed with {@code "P-W"}
+   * (pane and window placeholders) will be used for windowed file names.
    *
    * <p>If provided, the suffix will be used; otherwise the files will have an empty suffix.
    */
@@ -87,6 +124,19 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
   private final String shardTemplate;
   private final String suffix;
 
+  /*
+   * Checks whether the given template references windowing information, i.e.
+   * whether it contains a pane ("P") or a window ("W") placeholder. A null
+   * template is never considered windowed.
+   */
+  static boolean isWindowedTemplate(String template){
+    if (template != null){
+      Matcher m = WINDOWED_FORMAT_RE.matcher(template);
+      return m.find();
+    }
+    return false;
+  }
+
   /**
    * Constructs a fully qualified name from components.
    *
@@ -95,29 +145,45 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
    * strings.
    *
    * <p>Within a shard template, repeating sequences of the letters "S" or "N"
-   * are replaced with the shard number, or number of shards respectively.  The
-   * numbers are formatted with leading zeros to match the length of the
+   * are replaced with the shard number, or number of shards respectively.
+   * "P" is replaced by {@code paneStr} and "W" by {@code windowStr}; if either is
+   * null, the corresponding letter is left in the name unchanged.
+   *
+   * <p>The numbers are formatted with leading zeros to match the length of the
    * repeated sequence of letters.
    *
    * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
    * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
    * produced:  "output-001-of-100.txt".
    */
-  public static String constructName(
-      String prefix, String shardTemplate, String suffix, int shardNum, int numShards) {
+  static String constructName(
+      String prefix, String shardTemplate, String suffix, int shardNum, int numShards,
+      String paneStr, String windowStr) {
     // Matcher API works with StringBuffer, rather than StringBuilder.
     StringBuffer sb = new StringBuffer();
     sb.append(prefix);

     Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
     while (m.find()) {
-      boolean isShardNum = (m.group(1).charAt(0) == 'S');
+      boolean isCurrentShardNum = (m.group(1).charAt(0) == 'S');
+      boolean isNumberOfShards = (m.group(1).charAt(0) == 'N');
+      boolean isPane = (m.group(1).charAt(0) == 'P') && paneStr != null;
+      boolean isWindow = (m.group(1).charAt(0) == 'W') && windowStr != null;

       char[] zeros = new char[m.end() - m.start()];
       Arrays.fill(zeros, '0');
       DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
-      String formatted = df.format(isShardNum ? shardNum : numShards);
-      m.appendReplacement(sb, formatted);
+      if (isCurrentShardNum) {
+        String formatted = df.format(shardNum);
+        m.appendReplacement(sb, formatted);
+      } else if (isNumberOfShards) {
+        String formatted = df.format(numShards);
+        m.appendReplacement(sb, formatted);
+      } else if (isPane) { // quoted: '$' and '\' are special in appendReplacement
+        m.appendReplacement(sb, Matcher.quoteReplacement(paneStr));
+      } else if (isWindow) {
+        m.appendReplacement(sb, Matcher.quoteReplacement(windowStr));
+      } // P/W with a null value: nothing is replaced, so the letter stays in the name
     }
     m.appendTail(sb);

@@ -125,6 +191,11 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
     return sb.toString();
   }
 
+  public static String constructName(String prefix, String shardTemplate, String suffix,
+      int shardNum, int numShards) {
+    return constructName(prefix, shardTemplate, suffix, shardNum, numShards, null, null);
+  }
+
   @Override
   @Nullable
   public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context,
@@ -138,9 +209,57 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
 
   @Override
   public ResourceId windowedFilename(ResourceId outputDirectory,
-      WindowedContext c, String extension) {
-    throw new UnsupportedOperationException("There is no default policy for windowed file"
-        + " output. Please provide an explicit FilenamePolicy to generate filenames.");
+      WindowedContext context, String extension) {
+    // A template equal to DEFAULT_SHARD_TEMPLATE is treated as "not provided by the user";
+    // windowed output then uses DEFAULT_WINDOWED_SHARD_TEMPLATE, which adds "P" and "W".
+    boolean shardTemplateProvidedByUser = !this.shardTemplate.equals(DEFAULT_SHARD_TEMPLATE);
+
+    String templateToUse = shardTemplate;
+    if (shardTemplateProvidedByUser) {
+      // Without "P" or "W", files from different windows/panes can get identical names.
+      if (!isWindowedTemplate(this.shardTemplate)) {
+        LOG.warn("Template you provided {} does not have enough information to create"
+            + " meaningful windowed file names. Consider using P and W in your template",
+            this.shardTemplate);
+      }
+    } else {
+      // This runs on every call, so it is logged at debug level.
+      LOG.debug("User did not provide shard template. For creating windowed file names "
+          + "default template {} will be used", DEFAULT_WINDOWED_SHARD_TEMPLATE);
+      templateToUse = DEFAULT_WINDOWED_SHARD_TEMPLATE;
+    }
+
+    final PaneInfo paneInfo = context.getPaneInfo();
+    String paneStr = paneInfoToString(paneInfo);
+    String windowStr = windowToString(context.getWindow());
+
+    String filename = constructName(prefix.get(), templateToUse, suffix,
+        context.getShardNumber(), context.getNumShards(), paneStr, windowStr)
+        + extension;
+    return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+  }
+
+  // Not every window's toString() is concise or safe to embed in a file name, so GlobalWindow
+  // and IntervalWindow get fixed formats; any other window falls back to its toString().
+  // NOTE(review): interval bounds are presumably ISO-8601 instants containing ':' - confirm.
+  private String windowToString(BoundedWindow window) {
+    if (window instanceof GlobalWindow) {
+      return "GlobalWindow";
+    }
+    if (window instanceof IntervalWindow) {
+      IntervalWindow iw = (IntervalWindow) window;
+      return String.format("IntervalWindow-%s-%s", iw.start().toString(),
+          iw.end().toString());
+    }
+    return window.toString();
+  }
+
+  private String paneInfoToString(PaneInfo paneInfo) { // "pane-<index>-<isFirst>-<isLast>"
+    boolean hasPane = paneInfo != null;
+    long paneIndex = hasPane ? paneInfo.getIndex() : -1L;
+    boolean isFirst = hasPane && paneInfo.isFirst();
+    boolean isLast = hasPane && paneInfo.isLast();
+    return String.format("pane-%s-%b-%b", paneIndex, isFirst, isLast);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/4e8c388a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 5c068ce..afb5849 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -70,8 +70,10 @@ import org.apache.beam.sdk.values.PDone;
  * {@link TextIO.Write#withWindowedWrites()} will cause windowing and triggering to be
  * preserved. When producing windowed writes, the number of output shards must be set explicitly
  * using {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a
- * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} must be
- * set, and unique windows and triggers must produce unique filenames.
+ * runner-chosen value, so you may not need to set it yourself. A {@link FilenamePolicy} can also
+ * be set if you need more control over how the files written for each window are named, since
+ * the names produced by {@link DefaultFilenamePolicy} might not be appropriate for your use
+ * case.
  *
  * <p>Any existing files with the same names as generated output files will be overwritten.
  *
@@ -434,8 +436,6 @@ public class TextIO {
           (getFilenamePolicy() == null)
               || (getShardTemplate() == null && getFilenameSuffix() == null),
           "Cannot set a filename policy and also a filename template or suffix.");
-      checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
-          "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
 
       FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
       if (usedFilenamePolicy == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4e8c388a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
index c895da8..787403b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
@@ -18,7 +18,10 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName;
+import static org.apache.beam.sdk.io.DefaultFilenamePolicy.isWindowedTemplate;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -29,6 +32,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class DefaultFilenamePolicyTest {
+
   @Test
   public void testConstructName() {
     assertEquals("output-001-of-123.txt",
@@ -52,4 +56,57 @@ public class DefaultFilenamePolicyTest {
     assertEquals("out-100-of-5000.txt",
         constructName("out", "-SS-of-NN", ".txt", 100, 5000));
   }
+
+  @Test
+  public void testIsWindowedTemplate() {
+    assertTrue(isWindowedTemplate("-SSS-of-NNN-P-W"));
+    assertTrue(isWindowedTemplate("-SSS-of-NNN-W"));
+    assertTrue(isWindowedTemplate("-SSS-of-NNN-P"));
+    assertTrue(isWindowedTemplate("W-SSS-of-NNN"));
+    assertFalse(isWindowedTemplate("-SSS-of-NNN"));
+    assertFalse(isWindowedTemplate("-SSS-of-lp"));
+    assertFalse(isWindowedTemplate(null));
+  }
+
+  @Test
+  public void testConstructWindowedName() {
+    assertEquals("output-001-of-123.txt",
+        constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
+    // With null pane/window strings, P and W are left in the name unchanged.
+    assertEquals("output-001-of-123-PPP-W.txt",
+        constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null));
+
+    assertEquals("out.txt/part-00042-myPaneStr-myWindowStr",
+        constructName("out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr",
+            "myWindowStr"));
+    // A template without placeholders is copied verbatim.
+    assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "myPaneStr2",
+        "anotherWindowStr"));
+
+    assertEquals("out0102shard-oneMoreWindowStr-anotherPaneStr.txt",
+        constructName("out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr",
+            "oneMoreWindowStr"));
+
+    assertEquals("out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-"
+        + "panemyPaneStr3.txt",
+        constructName("out", "-N/S.part-S-of-N-W-P-windowW-paneP", ".txt", 1, 2, "myPaneStr3",
+        "slidingWindow1"));
+
+    // Pane strings are inserted verbatim (e.g. "pane-<index>-<isFirst>-<isLast>").
+    assertEquals("out.txt/part-00042-myWindowStr-pane-11-true-false",
+        constructName("out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false",
+            "myWindowStr"));
+
+    assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "pane",
+        "anotherWindowStr"));
+    // Every occurrence of P (or W) is replaced, not just the first one.
+    assertEquals("out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt",
+        constructName("out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false",
+            "oneMoreWindowStr"));
+
+    assertEquals("out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt",
+        constructName("out",
+        "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1"));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4e8c388a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 0d8fbbd..6c7a53f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -1103,15 +1103,5 @@ public class TextIOTest {
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
   }
 
-  @Test
-  public void testWindowedWriteRequiresFilenamePolicy() {
-    PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
-    TextIO.Write write = TextIO.write().to("/tmp/some/file").withWindowedWrites();
-
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage(
-        "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
-    emptyInput.apply(write);
-  }
 }
 


[2/2] beam git commit: [BEAM-2276] This closes #3142

Posted by jb...@apache.org.
[BEAM-2276] This closes #3142


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/deee5b3c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/deee5b3c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/deee5b3c

Branch: refs/heads/master
Commit: deee5b3c29f16cc29db7ee30d4f482b360df53f0
Parents: 9cd1290 4e8c388
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri May 26 07:57:44 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri May 26 07:57:44 2017 +0200

----------------------------------------------------------------------
 .../beam/sdk/io/DefaultFilenamePolicy.java      | 151 +++++++++++++++++--
 .../java/org/apache/beam/sdk/io/TextIO.java     |   8 +-
 .../beam/sdk/io/DefaultFilenamePolicyTest.java  |  57 +++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  10 --
 4 files changed, 196 insertions(+), 30 deletions(-)
----------------------------------------------------------------------