You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/27 17:25:02 UTC
[1/2] beam git commit: [BEAM-59] Move IOChannelUtils.constructName to
FileBasedSink
Repository: beam
Updated Branches:
refs/heads/master 1babed250 -> 0c740f436
[BEAM-59] Move IOChannelUtils.constructName to FileBasedSink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/235a79a3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/235a79a3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/235a79a3
Branch: refs/heads/master
Commit: 235a79a314336e8876bf3f50e5efc952270b8404
Parents: 1babed2
Author: Dan Halperin <dh...@google.com>
Authored: Mon Apr 24 15:30:56 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 27 10:24:48 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 46 +++++++++++++++++-
.../apache/beam/sdk/util/IOChannelUtils.java | 50 ++------------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 2 +-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 27 ++++++++++-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 2 +-
.../beam/sdk/util/IOChannelUtilsTest.java | 30 ------------
6 files changed, 75 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 6ae950e..b8ad0a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -32,12 +32,16 @@ import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
+import java.text.DecimalFormat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
@@ -104,6 +108,46 @@ import org.slf4j.LoggerFactory;
*/
public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
+ // Matches one shard placeholder in a shard template: a maximal run of 'S' or of 'N' (group 1).
+ private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
+
+ /**
+  * Constructs a fully qualified name from components.
+  *
+  * <p>The name is built from a prefix, shard template (with shard numbers
+  * applied), and a suffix. All components are required, but may be empty
+  * strings.
+  *
+  * <p>Within a shard template, repeating sequences of the letters "S" or "N"
+  * are replaced with the shard number, or number of shards respectively. The
+  * numbers are formatted with leading zeros to match the length of the
+  * repeated sequence of letters; a number with more digits than the sequence
+  * is written out in full rather than truncated.
+  *
+  * <p>Digits are always formatted with {@code Locale.ROOT} symbols (ASCII '0'-'9'),
+  * so the result does not depend on the JVM default locale.
+  *
+  * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
+  * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
+  * produced: "output-001-of-100.txt".
+  *
+  * @param prefix text placed, unmodified, before the expanded shard template
+  * @param shardTemplate template whose runs of "S" and "N" are expanded
+  * @param suffix text placed, unmodified, after the expanded shard template
+  * @param shardNum value substituted for each run of "S"
+  * @param numShards value substituted for each run of "N"
+  * @return the prefix, the expanded shard template, and the suffix, concatenated
+  */
+ public static String constructName(String prefix,
+     String shardTemplate, String suffix, int shardNum, int numShards) {
+   // DecimalFormat(String) alone would take its digits from the default locale, which may
+   // not be ASCII; explicit root-locale symbols keep the generated names stable.
+   java.text.DecimalFormatSymbols symbols =
+       java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT);
+
+   // Matcher API works with StringBuffer, rather than StringBuilder.
+   StringBuffer sb = new StringBuffer();
+   sb.append(prefix);
+
+   Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
+   while (m.find()) {
+     boolean isShardNum = (m.group(1).charAt(0) == 'S');
+
+     // One '0' per placeholder letter yields a minimum width equal to the run length.
+     char[] zeros = new char[m.end() - m.start()];
+     Arrays.fill(zeros, '0');
+     DecimalFormat df = new DecimalFormat(String.valueOf(zeros), symbols);
+     String formatted = df.format(isShardNum ? shardNum : numShards);
+     // quoteReplacement guarantees the formatted text is inserted literally.
+     m.appendReplacement(sb, Matcher.quoteReplacement(formatted));
+   }
+   m.appendTail(sb);
+
+   sb.append(suffix);
+   return sb.toString();
+ }
/**
* Directly supported file output compression types.
@@ -301,7 +345,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
String suffix = getFileExtension(extension);
- String filename = IOChannelUtils.constructName(
+ String filename = constructName(
baseOutputFilename.get(), fileNamingTemplate, suffix, context.getShardNumber(),
context.getNumShards());
return filename;
http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
index 9d3dd23..0d91bbc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
@@ -30,8 +30,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
-import java.text.DecimalFormat;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -43,6 +41,7 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
+import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
@@ -54,9 +53,6 @@ public class IOChannelUtils {
private static final Map<String, IOChannelFactory> FACTORY_MAP =
Collections.synchronizedMap(new HashMap<String, IOChannelFactory>());
- // Pattern that matches shard placeholders within a shard template.
- private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
-
private static final ClassLoader CLASS_LOADER = ReflectHelpers.findClassLoader();
/**
@@ -201,7 +197,7 @@ public class IOChannelUtils {
public static WritableByteChannel create(String prefix, String shardTemplate,
String suffix, int numShards, String mimeType) throws IOException {
if (numShards == 1) {
- return create(constructName(prefix, shardTemplate, suffix, 0, 1),
+ return create(FileBasedSink.constructName(prefix, shardTemplate, suffix, 0, 1),
mimeType);
}
@@ -213,7 +209,7 @@ public class IOChannelUtils {
Set<String> outputNames = new HashSet<>();
for (int i = 0; i < numShards; i++) {
String outputName =
- constructName(prefix, shardTemplate, suffix, i, numShards);
+ FileBasedSink.constructName(prefix, shardTemplate, suffix, i, numShards);
if (!outputNames.add(outputName)) {
throw new IllegalArgumentException(
"Shard name collision detected for: " + outputName);
@@ -236,46 +232,6 @@ public class IOChannelUtils {
return getFactory(spec).getSizeBytes(spec);
}
- /**
- * Constructs a fully qualified name from components.
- *
- * <p>The name is built from a prefix, shard template (with shard numbers
- * applied), and a suffix. All components are required, but may be empty
- * strings.
- *
- * <p>Within a shard template, repeating sequences of the letters "S" or "N"
- * are replaced with the shard number, or number of shards respectively. The
- * numbers are formatted with leading zeros to match the length of the
- * repeated sequence of letters.
- *
- * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
- * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
- * produced: "output-001-of-100.txt".
- */
- public static String constructName(String prefix,
- String shardTemplate, String suffix, int shardNum, int numShards) {
- // Matcher API works with StringBuffer, rather than StringBuilder.
- StringBuffer sb = new StringBuffer();
- sb.append(prefix);
-
- Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
- while (m.find()) {
- boolean isShardNum = (m.group(1).charAt(0) == 'S');
-
- char[] zeros = new char[m.end() - m.start()];
- Arrays.fill(zeros, '0');
- DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
- String formatted = df.format(isShardNum
- ? shardNum
- : numShards);
- m.appendReplacement(sb, formatted);
- }
- m.appendTail(sb);
-
- sb.append(suffix);
- return sb.toString();
- }
-
private static final Pattern URI_SCHEME_PATTERN = Pattern.compile(
"(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*");
http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 3e1c4b8..ece7997 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -497,7 +497,7 @@ public class AvroIOTest {
for (int i = 0; i < numShards; i++) {
expectedFiles.add(
new File(
- IOChannelUtils.constructName(
+ FileBasedSink.constructName(
outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards)));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 65fb8ba..fe65a83 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -17,13 +17,13 @@
*/
package org.apache.beam.sdk.io;
+import static org.apache.beam.sdk.io.FileBasedSink.constructName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -45,7 +45,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;
-
import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
@@ -87,6 +86,30 @@ public class FileBasedSinkTest {
return appendToTempFolder(tempDirectory);
}
+ @Test
+ public void testConstructName() {
+ assertEquals("output-001-of-123.txt",
+ constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
+
+ assertEquals("out.txt/part-00042",
+ constructName("out.txt", "/part-SSSSS", "", 42, 100));
+
+ assertEquals("out.txt",
+ constructName("ou", "t.t", "xt", 1, 1));
+
+ assertEquals("out0102shard.txt",
+ constructName("out", "SSNNshard", ".txt", 1, 2));
+
+ assertEquals("out-2/1.part-1-of-2.txt",
+ constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
+ }
+
+ @Test
+ public void testConstructNameWithLargeShardCount() {
+ assertEquals("out-100-of-5000.txt",
+ constructName("out", "-SS-of-NN", ".txt", 100, 5000));
+ }
+
/**
* FileBasedWriter opens the correct file, writes the header, footer, and elements in the
* correct order, and returns the correct filename.
http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 47b4963..b59938e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -339,7 +339,7 @@ public class TextIOTest {
expectedFiles.add(
new File(
rootLocation.toString(),
- IOChannelUtils.constructName(outputName, shardNameTemplate, "", i, numShards)));
+ FileBasedSink.constructName(outputName, shardNameTemplate, "", i, numShards)));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/235a79a3/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
index 6dfa4c7..ea4ae87 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IOChannelUtilsTest.java
@@ -28,7 +28,6 @@ import com.google.common.io.Files;
import java.io.File;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -47,28 +46,6 @@ public class IOChannelUtilsTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
- @Test
- public void testShardFormatExpansion() {
- assertEquals("output-001-of-123.txt",
- IOChannelUtils.constructName("output", "-SSS-of-NNN",
- ".txt",
- 1, 123));
-
- assertEquals("out.txt/part-00042",
- IOChannelUtils.constructName("out.txt", "/part-SSSSS", "",
- 42, 100));
-
- assertEquals("out.txt",
- IOChannelUtils.constructName("ou", "t.t", "xt", 1, 1));
-
- assertEquals("out0102shard.txt",
- IOChannelUtils.constructName("out", "SSNNshard", ".txt", 1, 2));
-
- assertEquals("out-2/1.part-1-of-2.txt",
- IOChannelUtils.constructName("out", "-N/S.part-S-of-N",
- ".txt", 1, 2));
- }
-
@Test(expected = IllegalArgumentException.class)
public void testShardNameCollision() throws Exception {
File outFolder = tmpFolder.newFolder();
@@ -80,13 +57,6 @@ public class IOChannelUtilsTest {
}
@Test
- public void testLargeShardCount() {
- Assert.assertEquals("out-100-of-5000.txt",
- IOChannelUtils.constructName("out", "-SS-of-NN", ".txt",
- 100, 5000));
- }
-
- @Test
public void testHandlerNoScheme() throws Exception {
String pathToTempFolder = tmpFolder.getRoot().getAbsolutePath();
assertThat(IOChannelUtils.getFactory(pathToTempFolder), instanceOf(FileIOChannelFactory.class));
[2/2] beam git commit: This closes #2724
Posted by dh...@apache.org.
This closes #2724
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c740f43
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c740f43
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c740f43
Branch: refs/heads/master
Commit: 0c740f436a081197a3917c30d4f7db2234976845
Parents: 1babed2 235a79a
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 27 10:24:51 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 27 10:24:51 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 46 +++++++++++++++++-
.../apache/beam/sdk/util/IOChannelUtils.java | 50 ++------------------
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 2 +-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 27 ++++++++++-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 2 +-
.../beam/sdk/util/IOChannelUtilsTest.java | 30 ------------
6 files changed, 75 insertions(+), 82 deletions(-)
----------------------------------------------------------------------