You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/10 19:20:26 UTC
[2/5] beam git commit: Simpler code for setting shard numbers on results in FileBasedSink
Simpler code for setting shard numbers on results in FileBasedSink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a09a3ec0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a09a3ec0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a09a3ec0
Branch: refs/heads/master
Commit: a09a3ec0645689f54a4fffc944cc531192e1f014
Parents: 2d5fbf6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue May 9 15:10:07 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed May 10 12:18:41 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 113 +++++++++++--------
1 file changed, 68 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a09a3ec0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 2117794..7f729a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -22,10 +22,12 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verifyNotNull;
+import static org.apache.beam.sdk.io.WriteFiles.UNKNOWN_SHARDNUM;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import java.io.IOException;
import java.io.InputStream;
@@ -530,44 +532,63 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
int numShards = Iterables.size(writerResults);
Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
- List<FileResult> unshardedFiles = new ArrayList<>();
FilenamePolicy policy = getSink().getFilenamePolicy();
+ ResourceId baseOutputDir = getSink().getBaseOutputDirectoryProvider().get();
+
+ // Either all results have a shard number set (if the sink is configured with a fixed
+ // number of shards), or they all don't (otherwise).
+ Boolean isShardNumberSetEverywhere = null;
for (FileResult result : writerResults) {
- if (result.getShard() != WriteFiles.UNKNOWN_SHARDNUM) {
- outputFilenames.put(result.getTempFilename(),
- result.getDestinationFile(policy, getSink().getBaseOutputDirectoryProvider().get(),
- numShards, getSink().getExtension()));
+ boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM);
+ if (isShardNumberSetEverywhere == null) {
+ isShardNumberSetEverywhere = isShardNumberSetHere;
} else {
- unshardedFiles.add(result);
+ checkArgument(
+ isShardNumberSetEverywhere == isShardNumberSetHere,
+ "Found a mix of files with and without shard number set: %s",
+ result);
}
}
- // writerResults won't contain destination filenames, so we dynamically generate them here.
- if (unshardedFiles.size() > 0) {
- checkArgument(outputFilenames.isEmpty());
+ if (isShardNumberSetEverywhere == null) {
+ isShardNumberSetEverywhere = true;
+ }
+ List<FileResult> resultsWithShardNumbers = Lists.newArrayList();
+ if (isShardNumberSetEverywhere) {
+ resultsWithShardNumbers = Lists.newArrayList(writerResults);
+ } else {
// Sort files for idempotence. Sort by temporary filename.
// Note that this codepath should not be used when processing triggered windows. In the
// case of triggers, the list of FileResult objects in the Finalize iterable is not
// deterministic, and might change over retries. This breaks the assumption below that
// sorting the FileResult objects provides idempotency.
- unshardedFiles = Ordering.from(new Comparator<FileResult>() {
- @Override
- public int compare(FileResult first, FileResult second) {
- return first.getTempFilename().toString().compareTo(
- second.getTempFilename().toString());
- }
- }).sortedCopy(unshardedFiles);
- for (int i = 0; i < unshardedFiles.size(); i++) {
- FileResult result = unshardedFiles.get(i);
- result.setShard(i);
- outputFilenames.put(result.getTempFilename(),
- result.getDestinationFile(policy, getSink().getBaseOutputDirectoryProvider().get(),
- numShards, getSink().getExtension()));
+ List<FileResult> sortedByTempFilename =
+ Ordering.from(
+ new Comparator<FileResult>() {
+ @Override
+ public int compare(FileResult first, FileResult second) {
+ String firstFilename = first.getTempFilename().toString();
+ String secondFilename = second.getTempFilename().toString();
+ return firstFilename.compareTo(secondFilename);
+ }
+ })
+ .sortedCopy(writerResults);
+ for (int i = 0; i < sortedByTempFilename.size(); i++) {
+ resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
}
}
- int numDistinctShards = new HashSet<ResourceId>(outputFilenames.values()).size();
+ for (FileResult result : resultsWithShardNumbers) {
+ checkArgument(
+ result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
+ outputFilenames.put(
+ result.getTempFilename(),
+ result.getDestinationFile(
+ policy, baseOutputDir, numShards, getSink().getExtension()));
+ }
+
+ int numDistinctShards = new HashSet<>(outputFilenames.values()).size();
checkState(numDistinctShards == outputFilenames.size(),
"Only generated %s distinct file names for %s files.",
numDistinctShards, outputFilenames.size());
@@ -920,12 +941,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
*/
public static final class FileResult {
private final ResourceId tempFilename;
- private int shard;
- private BoundedWindow window;
- private PaneInfo paneInfo;
+ private final int shard;
+ private final BoundedWindow window;
+ private final PaneInfo paneInfo;
- public FileResult(ResourceId tempFilename, int shard, BoundedWindow window,
- PaneInfo paneInfo) {
+ public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) {
this.tempFilename = tempFilename;
this.shard = shard;
this.window = window;
@@ -940,8 +960,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
return shard;
}
- public void setShard(int shard) {
- this.shard = shard;
+ public FileResult withShard(int shard) {
+ return new FileResult(tempFilename, shard, window, paneInfo);
}
public BoundedWindow getWindow() {
@@ -954,7 +974,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory,
int numShards, String extension) {
- checkArgument(getShard() != WriteFiles.UNKNOWN_SHARDNUM);
+ checkArgument(getShard() != UNKNOWN_SHARDNUM);
checkArgument(numShards > 0);
if (getWindow() != null) {
return policy.windowedFilename(outputDirectory, new WindowedContext(
@@ -966,8 +986,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
}
public String toString() {
- return MoreObjects.toStringHelper(FileBasedSink.FileResult.class)
+ return MoreObjects.toStringHelper(FileResult.class)
.add("tempFilename", tempFilename)
+ .add("shard", shard)
+ .add("window", window)
+ .add("paneInfo", paneInfo)
.toString();
}
}
@@ -976,9 +999,10 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* A coder for {@link FileResult} objects.
*/
public static final class FileResultCoder extends StructuredCoder<FileResult> {
- private final Coder<String> stringCoder = StringUtf8Coder.of();
- private final Coder<Integer> integerCoder = VarIntCoder.of();
- private final Coder<PaneInfo> paneInfoCoder = NullableCoder.of(PaneInfoCoder.INSTANCE);
+ private static final Coder<String> FILENAME_CODER = StringUtf8Coder.of();
+ private static final Coder<Integer> SHARD_CODER = VarIntCoder.of();
+ private static final Coder<PaneInfo> PANE_INFO_CODER = NullableCoder.of(PaneInfoCoder.INSTANCE);
+
private final Coder<BoundedWindow> windowCoder;
protected FileResultCoder(Coder<BoundedWindow> windowCoder) {
@@ -1000,30 +1024,29 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
if (value == null) {
throw new CoderException("cannot encode a null value");
}
- stringCoder.encode(value.getTempFilename().toString(), outStream);
+ FILENAME_CODER.encode(value.getTempFilename().toString(), outStream);
windowCoder.encode(value.getWindow(), outStream);
- paneInfoCoder.encode(value.getPaneInfo(), outStream);
- integerCoder.encode(value.getShard(), outStream);
+ PANE_INFO_CODER.encode(value.getPaneInfo(), outStream);
+ SHARD_CODER.encode(value.getShard(), outStream);
}
@Override
public FileResult decode(InputStream inStream)
throws IOException {
- String tempFilename = stringCoder.decode(inStream);
- assert tempFilename != null; // fixes a compiler warning
+ String tempFilename = FILENAME_CODER.decode(inStream);
BoundedWindow window = windowCoder.decode(inStream);
- PaneInfo paneInfo = paneInfoCoder.decode(inStream);
- int shard = integerCoder.decode(inStream);
+ PaneInfo paneInfo = PANE_INFO_CODER.decode(inStream);
+ int shard = SHARD_CODER.decode(inStream);
return new FileResult(FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
shard, window, paneInfo);
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
- stringCoder.verifyDeterministic();
+ FILENAME_CODER.verifyDeterministic();
windowCoder.verifyDeterministic();
- paneInfoCoder.verifyDeterministic();
- integerCoder.verifyDeterministic();
+ PANE_INFO_CODER.verifyDeterministic();
+ SHARD_CODER.verifyDeterministic();
}
}