You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/27 17:25:02 UTC

[1/2] beam git commit: [BEAM-59] Move IOChannelUtils.constructName to FileBasedSink

Repository: beam
Updated Branches:
  refs/heads/master 1babed250 -> 0c740f436


[BEAM-59] Move IOChannelUtils.constructName to FileBasedSink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/235a79a3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/235a79a3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/235a79a3

Branch: refs/heads/master
Commit: 235a79a314336e8876bf3f50e5efc952270b8404
Parents: 1babed2
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 24 15:30:56 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 27 10:24:48 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 46 +++++++++++++++++-
 .../apache/beam/sdk/util/IOChannelUtils.java    | 50 ++------------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  2 +-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 27 ++++++++++-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  2 +-
 .../beam/sdk/util/IOChannelUtilsTest.java       | 30 ------------
 6 files changed, 75 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 6ae950e..b8ad0a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -32,12 +32,16 @@ import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Path;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
@@ -104,6 +108,46 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
+  // Pattern that matches shard placeholders within a shard template.
+  private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
+
+  /**
+   * Constructs a fully qualified name from components.
+   *
+   * <p>The name is built from a prefix, shard template (with shard numbers
+   * applied), and a suffix.  All components are required, but may be empty
+   * strings.
+   *
+   * <p>Within a shard template, repeating sequences of the letters "S" or "N"
+   * are replaced with the shard number, or number of shards respectively.  The
+   * numbers are formatted with leading zeros to match the length of the
+   * repeated sequence of letters.
+   *
+   * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
+   * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
+   * produced:  "output-001-of-100.txt".
+   */
+  public static String constructName(String prefix,
+      String shardTemplate, String suffix, int shardNum, int numShards) {
+    // Matcher API works with StringBuffer, rather than StringBuilder.
+    StringBuffer sb = new StringBuffer();
+    sb.append(prefix);
+
+    Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
+    while (m.find()) {
+      boolean isShardNum = (m.group(1).charAt(0) == 'S');
+
+      char[] zeros = new char[m.end() - m.start()];
+      Arrays.fill(zeros, '0');
+      DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
+      String formatted = df.format(isShardNum ? shardNum : numShards);
+      m.appendReplacement(sb, formatted);
+    }
+    m.appendTail(sb);
+
+    sb.append(suffix);
+    return sb.toString();
+  }
 
   /**
    * Directly supported file output compression types.
@@ -301,7 +345,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       }
 
       String suffix = getFileExtension(extension);
-      String filename = IOChannelUtils.constructName(
+      String filename = constructName(
           baseOutputFilename.get(), fileNamingTemplate, suffix, context.getShardNumber(),
           context.getNumShards());
       return filename;

http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
index 9d3dd23..0d91bbc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
@@ -30,8 +30,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.text.DecimalFormat;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,6 +41,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
@@ -54,9 +53,6 @@ public class IOChannelUtils {
   private static final Map<String, IOChannelFactory> FACTORY_MAP =
       Collections.synchronizedMap(new HashMap<String, IOChannelFactory>());
 
-  // Pattern that matches shard placeholders within a shard template.
-  private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
-
   private static final ClassLoader CLASS_LOADER = ReflectHelpers.findClassLoader();
 
   /**
@@ -201,7 +197,7 @@ public class IOChannelUtils {
   public static WritableByteChannel create(String prefix, String shardTemplate,
       String suffix, int numShards, String mimeType) throws IOException {
     if (numShards == 1) {
-      return create(constructName(prefix, shardTemplate, suffix, 0, 1),
+      return create(FileBasedSink.constructName(prefix, shardTemplate, suffix, 0, 1),
                     mimeType);
     }
 
@@ -213,7 +209,7 @@ public class IOChannelUtils {
     Set<String> outputNames = new HashSet<>();
     for (int i = 0; i < numShards; i++) {
       String outputName =
-          constructName(prefix, shardTemplate, suffix, i, numShards);
+          FileBasedSink.constructName(prefix, shardTemplate, suffix, i, numShards);
       if (!outputNames.add(outputName)) {
         throw new IllegalArgumentException(
             "Shard name collision detected for: " + outputName);
@@ -236,46 +232,6 @@ public class IOChannelUtils {
     return getFactory(spec).getSizeBytes(spec);
   }
 
-  /**
-   * Constructs a fully qualified name from components.
-   *
-   * <p>The name is built from a prefix, shard template (with shard numbers
-   * applied), and a suffix.  All components are required, but may be empty
-   * strings.
-   *
-   * <p>Within a shard template, repeating sequences of the letters "S" or "N"
-   * are replaced with the shard number, or number of shards respectively.  The
-   * numbers are formatted with leading zeros to match the length of the
-   * repeated sequence of letters.
-   *
-   * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
-   * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
-   * produced:  "output-001-of-100.txt".
-   */
-  public static String constructName(String prefix,
-      String shardTemplate, String suffix, int shardNum, int numShards) {
-    // Matcher API works with StringBuffer, rather than StringBuilder.
-    StringBuffer sb = new StringBuffer();
-    sb.append(prefix);
-
-    Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
-    while (m.find()) {
-      boolean isShardNum = (m.group(1).charAt(0) == 'S');
-
-      char[] zeros = new char[m.end() - m.start()];
-      Arrays.fill(zeros, '0');
-      DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
-      String formatted = df.format(isShardNum
-                                   ? shardNum
-                                   : numShards);
-      m.appendReplacement(sb, formatted);
-    }
-    m.appendTail(sb);
-
-    sb.append(suffix);
-    return sb.toString();
-  }
-
   private static final Pattern URI_SCHEME_PATTERN = Pattern.compile(
       "(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*");
 

http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 3e1c4b8..ece7997 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -497,7 +497,7 @@ public class AvroIOTest {
     for (int i = 0; i < numShards; i++) {
       expectedFiles.add(
           new File(
-              IOChannelUtils.constructName(
+              FileBasedSink.constructName(
                   outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards)));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 65fb8ba..fe65a83 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.apache.beam.sdk.io.FileBasedSink.constructName;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -45,7 +45,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.zip.GZIPInputStream;
-
 import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
@@ -87,6 +86,30 @@ public class FileBasedSinkTest {
     return appendToTempFolder(tempDirectory);
   }
 
+  @Test
+  public void testConstructName() {
+    assertEquals("output-001-of-123.txt",
+        constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
+
+    assertEquals("out.txt/part-00042",
+        constructName("out.txt", "/part-SSSSS", "", 42, 100));
+
+    assertEquals("out.txt",
+        constructName("ou", "t.t", "xt", 1, 1));
+
+    assertEquals("out0102shard.txt",
+        constructName("out", "SSNNshard", ".txt", 1, 2));
+
+    assertEquals("out-2/1.part-1-of-2.txt",
+        constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
+  }
+
+  @Test
+  public void testConstructNameWithLargeShardCount() {
+    assertEquals("out-100-of-5000.txt",
+        constructName("out", "-SS-of-NN", ".txt", 100, 5000));
+  }
+
   /**
    * FileBasedWriter opens the correct file, writes the header, footer, and elements in the
    * correct order, and returns the correct filename.

http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 47b4963..b59938e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -339,7 +339,7 @@ public class TextIOTest {
         expectedFiles.add(
             new File(
                 rootLocation.toString(),
-                IOChannelUtils.constructName(outputName, shardNameTemplate, "", i, numShards)));
+                FileBasedSink.constructName(outputName, shardNameTemplate, "", i, numShards)));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
index 6dfa4c7..ea4ae87 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
@@ -28,7 +28,6 @@ import com.google.common.io.Files;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -47,28 +46,6 @@ public class IOChannelUtilsTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
-  @Test
-  public void testShardFormatExpansion() {
-    assertEquals("output-001-of-123.txt",
-        IOChannelUtils.constructName("output", "-SSS-of-NNN",
-            ".txt",
-            1, 123));
-
-    assertEquals("out.txt/part-00042",
-        IOChannelUtils.constructName("out.txt", "/part-SSSSS", "",
-            42, 100));
-
-    assertEquals("out.txt",
-        IOChannelUtils.constructName("ou", "t.t", "xt", 1, 1));
-
-    assertEquals("out0102shard.txt",
-        IOChannelUtils.constructName("out", "SSNNshard", ".txt", 1, 2));
-
-    assertEquals("out-2/1.part-1-of-2.txt",
-        IOChannelUtils.constructName("out", "-N/S.part-S-of-N",
-            ".txt", 1, 2));
-  }
-
   @Test(expected = IllegalArgumentException.class)
   public void testShardNameCollision() throws Exception {
     File outFolder = tmpFolder.newFolder();
@@ -80,13 +57,6 @@ public class IOChannelUtilsTest {
   }
 
   @Test
-  public void testLargeShardCount() {
-    Assert.assertEquals("out-100-of-5000.txt",
-        IOChannelUtils.constructName("out", "-SS-of-NN", ".txt",
-            100, 5000));
-  }
-
-  @Test
   public void testHandlerNoScheme() throws Exception {
     String pathToTempFolder = tmpFolder.getRoot().getAbsolutePath();
     assertThat(IOChannelUtils.getFactory(pathToTempFolder), instanceOf(FileIOChannelFactory.class));


[2/2] beam git commit: This closes #2724

Posted by dh...@apache.org.
This closes #2724


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c740f43
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c740f43
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c740f43

Branch: refs/heads/master
Commit: 0c740f436a081197a3917c30d4f7db2234976845
Parents: 1babed2 235a79a
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 27 10:24:51 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 27 10:24:51 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 46 +++++++++++++++++-
 .../apache/beam/sdk/util/IOChannelUtils.java    | 50 ++------------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  2 +-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 27 ++++++++++-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  2 +-
 .../beam/sdk/util/IOChannelUtilsTest.java       | 30 ------------
 6 files changed, 75 insertions(+), 82 deletions(-)
----------------------------------------------------------------------