You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:34:55 UTC
[06/50] beam git commit: [BEAM-2276] Cleanups on the windowed
DefaultFilenamePolicy
[BEAM-2276] Cleanups on the windowed DefaultFilenamePolicy
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e764167f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e764167f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e764167f
Branch: refs/heads/DSL_SQL
Commit: e764167f40e603ac00ac80758cac0108bcc49769
Parents: 6d64c6e
Author: Reuven Lax <re...@google.com>
Authored: Thu May 25 23:42:17 2017 -0700
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Jun 6 11:08:35 2017 +0200
----------------------------------------------------------------------
.../construction/PTransformMatchersTest.java | 8 +-
.../direct/WriteWithShardingFactoryTest.java | 23 +++--
.../java/org/apache/beam/sdk/io/AvroIO.java | 2 +-
.../beam/sdk/io/DefaultFilenamePolicy.java | 103 +++++--------------
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 2 +-
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 11 +-
.../beam/sdk/io/DefaultFilenamePolicyTest.java | 26 ++---
.../java/org/apache/beam/sdk/io/TextIOTest.java | 5 +-
.../org/apache/beam/sdk/io/xml/XmlSink.java | 2 +-
10 files changed, 69 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index cfea62f..2497598 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -504,8 +504,12 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void writeWithRunnerDeterminedSharding() {
ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */);
- FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
- StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
+ FilenamePolicy policy =
+ DefaultFilenamePolicy.constructUsingStandardParameters(
+ StaticValueProvider.of(outputDirectory),
+ DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
+ "",
+ false);
WriteFiles<Integer> write =
WriteFiles.to(
new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) {
http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 5c4fea1..a88d95e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -129,15 +129,20 @@ public class WriteWithShardingFactoryTest {
@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
- FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
- StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
- WriteFiles<Object> original = WriteFiles.to(
- new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
- @Override
- public WriteOperation<Object> createWriteOperation() {
- throw new IllegalArgumentException("Should not be used");
- }
- });
+ FilenamePolicy policy =
+ DefaultFilenamePolicy.constructUsingStandardParameters(
+ StaticValueProvider.of(outputDirectory),
+ DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
+ "",
+ false);
+ WriteFiles<Object> original =
+ WriteFiles.to(
+ new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
+ @Override
+ public WriteOperation<Object> createWriteOperation() {
+ throw new IllegalArgumentException("Should not be used");
+ }
+ });
@SuppressWarnings("unchecked")
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 6af0e79..4143db2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -451,7 +451,7 @@ public class AvroIO {
FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
if (usedFilenamePolicy == null) {
usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
- getFilenamePrefix(), getShardTemplate(), getFilenameSuffix());
+ getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
}
WriteFiles<T> write = WriteFiles.to(
http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 5073854..f9e4ac4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class);
/** The default sharding name template used in {@link #constructUsingStandardParameters}. */
- public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+ public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
/** The default windowed sharding name template used when writing windowed files.
* This is used as default in cases when user did not specify shard template to
@@ -63,27 +64,12 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
* windowed and non-windowed file names.
*/
private static final String DEFAULT_WINDOWED_SHARD_TEMPLATE =
- "P-W" + DEFAULT_SHARD_TEMPLATE;
-
- /*
- * pattern for only non-windowed file names
- */
- private static final String NON_WINDOWED_ONLY_PATTERN = "S+|N+";
-
- /*
- * pattern for only windowed file names
- */
- private static final String WINDOWED_ONLY_PATTERN = "P|W";
+ "W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
/*
* pattern for both windowed and non-windowed file names
*/
- private static final String TEMPLATE_PATTERN = "(" + NON_WINDOWED_ONLY_PATTERN + "|"
- + WINDOWED_ONLY_PATTERN + ")";
-
- // Pattern that matches shard placeholders within a shard template.
- private static final Pattern SHARD_FORMAT_RE = Pattern.compile(TEMPLATE_PATTERN);
- private static final Pattern WINDOWED_FORMAT_RE = Pattern.compile(WINDOWED_ONLY_PATTERN);
+ private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|W|P)");
/**
* Constructs a new {@link DefaultFilenamePolicy}.
@@ -104,19 +90,23 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
*
* <p>Any filename component of the provided resource will be used as the filename prefix.
*
- * <p>If provided, the shard name template will be used; otherwise {@link #DEFAULT_SHARD_TEMPLATE}
- * will be used for non-windowed file names and {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will
- * be used for windowed file names.
+ * <p>If provided, the shard name template will be used; otherwise
+ * {@link #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} will be used for non-windowed file names and
+ * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will be used for windowed file names.
*
* <p>If provided, the suffix will be used; otherwise the files will have an empty suffix.
*/
public static DefaultFilenamePolicy constructUsingStandardParameters(
ValueProvider<ResourceId> outputPrefix,
@Nullable String shardTemplate,
- @Nullable String filenameSuffix) {
+ @Nullable String filenameSuffix,
+ boolean windowedWrites) {
+ // Pick the appropriate default policy based on whether windowed writes are being performed.
+ String defaultTemplate =
+ windowedWrites ? DEFAULT_WINDOWED_SHARD_TEMPLATE : DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
return new DefaultFilenamePolicy(
NestedValueProvider.of(outputPrefix, new ExtractFilename()),
- firstNonNull(shardTemplate, DEFAULT_SHARD_TEMPLATE),
+ firstNonNull(shardTemplate, defaultTemplate),
firstNonNull(filenameSuffix, ""));
}
@@ -124,19 +114,6 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
private final String shardTemplate;
private final String suffix;
- /*
- * Checks whether given template contains enough information to form
- * meaningful windowed file names - ie whether it uses pane and window
- * info.
- */
- static boolean isWindowedTemplate(String template){
- if (template != null){
- Matcher m = WINDOWED_FORMAT_RE.matcher(template);
- return m.find();
- }
- return false;
- }
-
/**
* Constructs a fully qualified name from components.
*
@@ -191,51 +168,23 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
return sb.toString();
}
- static String constructName(String prefix, String shardTemplate, String suffix, int shardNum,
- int numShards) {
- return constructName(prefix, shardTemplate, suffix, shardNum, numShards, null, null);
- }
-
@Override
@Nullable
public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context,
String extension) {
- String filename =
- constructName(
- prefix.get(), shardTemplate, suffix, context.getShardNumber(), context.getNumShards())
- + extension;
+ String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
+ context.getNumShards(), null, null) + extension;
return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
@Override
public ResourceId windowedFilename(ResourceId outputDirectory,
WindowedContext context, String extension) {
-
- boolean shardTemplateProvidedByUser = !this.shardTemplate.equals(DEFAULT_SHARD_TEMPLATE);
-
- if (shardTemplateProvidedByUser){
- boolean isWindowed = isWindowedTemplate(this.shardTemplate);
- if (!isWindowed){
- LOG.info("Template you provided {} does not have enough information to create"
- + "meaningful windowed file names. Consider using P and W in your template",
- this.shardTemplate);
- }
- }
-
final PaneInfo paneInfo = context.getPaneInfo();
String paneStr = paneInfoToString(paneInfo);
String windowStr = windowToString(context.getWindow());
-
- String templateToUse = shardTemplate;
- if (!shardTemplateProvidedByUser){
- LOG.info("User did not provide shard template. For creating windowed file names "
- + "default template {} will be used", DEFAULT_WINDOWED_SHARD_TEMPLATE);
- templateToUse = DEFAULT_WINDOWED_SHARD_TEMPLATE;
- }
-
- String filename = constructName(prefix.get(), templateToUse, suffix,
- context.getShardNumber(), context.getNumShards(), paneStr, windowStr)
- + extension;
+ String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
+ context.getNumShards(), paneStr, windowStr) + extension;
return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
@@ -248,18 +197,20 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
}
if (window instanceof IntervalWindow) {
IntervalWindow iw = (IntervalWindow) window;
- return String.format("IntervalWindow-%s-%s", iw.start().toString(),
- iw.end().toString());
+ return String.format("%s-%s", iw.start().toString(), iw.end().toString());
}
return window.toString();
}
- private String paneInfoToString(PaneInfo paneInfo){
- long currentPaneIndex = (paneInfo == null ? -1L
- : paneInfo.getIndex());
- boolean firstPane = (paneInfo == null ? false : paneInfo.isFirst());
- boolean lastPane = (paneInfo == null ? false : paneInfo.isLast());
- return String.format("pane-%s-%b-%b", currentPaneIndex, firstPane, lastPane);
+ private String paneInfoToString(PaneInfo paneInfo) {
+ String paneString = String.format("pane-%d", paneInfo.getIndex());
+ if (paneInfo.getTiming() == Timing.LATE) {
+ paneString = String.format("%s-late", paneString);
+ }
+ if (paneInfo.isLast()) {
+ paneString = String.format("%s-last", paneString);
+ }
+ return paneString;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index c274595..e288075 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -559,7 +559,7 @@ public class TFRecordIO {
super(
outputPrefix,
DefaultFilenamePolicy.constructUsingStandardParameters(
- outputPrefix, shardTemplate, suffix),
+ outputPrefix, shardTemplate, suffix, false),
writableByteChannelFactory(compressionType));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index afb5849..f1eb7c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -440,7 +440,7 @@ public class TextIO {
FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
if (usedFilenamePolicy == null) {
usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
- getFilenamePrefix(), getShardTemplate(), getFilenameSuffix());
+ getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
}
WriteFiles<String> write =
WriteFiles.to(
http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index d71f2f7..6d01d32 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -479,7 +479,8 @@ public class AvroIOTest {
p.run();
String shardNameTemplate =
- firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE);
+ firstNonNull(write.getShardTemplate(),
+ DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
}
@@ -493,7 +494,13 @@ public class AvroIOTest {
expectedFiles.add(
new File(
DefaultFilenamePolicy.constructName(
- outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards)));
+ outputFilePrefix,
+ shardNameTemplate,
+ "" /* no suffix */,
+ i,
+ numShards,
+ null,
+ null)));
}
List<String> actualElements = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
index 787403b..217420c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
@@ -18,10 +18,7 @@
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName;
-import static org.apache.beam.sdk.io.DefaultFilenamePolicy.isWindowedTemplate;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,36 +33,25 @@ public class DefaultFilenamePolicyTest {
@Test
public void testConstructName() {
assertEquals("output-001-of-123.txt",
- constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
+ constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
assertEquals("out.txt/part-00042",
- constructName("out.txt", "/part-SSSSS", "", 42, 100));
+ constructName("out.txt", "/part-SSSSS", "", 42, 100, null, null));
assertEquals("out.txt",
- constructName("ou", "t.t", "xt", 1, 1));
+ constructName("ou", "t.t", "xt", 1, 1, null, null));
assertEquals("out0102shard.txt",
- constructName("out", "SSNNshard", ".txt", 1, 2));
+ constructName("out", "SSNNshard", ".txt", 1, 2, null, null));
assertEquals("out-2/1.part-1-of-2.txt",
- constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
+ constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null));
}
@Test
public void testConstructNameWithLargeShardCount() {
assertEquals("out-100-of-5000.txt",
- constructName("out", "-SS-of-NN", ".txt", 100, 5000));
- }
-
- @Test
- public void testIsWindowedTemplate(){
- assertTrue(isWindowedTemplate("-SSS-of-NNN-P-W"));
- assertTrue(isWindowedTemplate("-SSS-of-NNN-W"));
- assertTrue(isWindowedTemplate("-SSS-of-NNN-P"));
- assertTrue(isWindowedTemplate("W-SSS-of-NNN"));
-
- assertFalse(isWindowedTemplate("-SSS-of-NNN"));
- assertFalse(isWindowedTemplate("-SSS-of-lp"));
+ constructName("out", "-SS-of-NN", ".txt", 100, 5000, null, null));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 6c7a53f..9468893 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -312,7 +312,8 @@ public class TextIOTest {
p.run();
assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
- firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE));
+ firstNonNull(write.getShardTemplate(),
+ DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE));
}
public static void assertOutputFiles(
@@ -337,7 +338,7 @@ public class TextIOTest {
new File(
rootLocation.toString(),
DefaultFilenamePolicy.constructName(
- outputName, shardNameTemplate, "", i, numShards)));
+ outputName, shardNameTemplate, "", i, numShards, null, null)));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e764167f/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 60075a7..6ae83f2 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -41,7 +41,7 @@ class XmlSink<T> extends FileBasedSink<T> {
private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) {
return DefaultFilenamePolicy.constructUsingStandardParameters(
- spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION);
+ spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION, false);
}
XmlSink(XmlIO.Write<T> spec) {