You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:58 UTC
[beam] 11/13: Fixes tests
This is an automated email from the ASF dual-hosted git repository.
jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 060f05c659920c3a48dbc67c38db770788802d06
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Mon Nov 27 11:41:25 2017 -0800
Fixes tests
---
.../core/construction/WriteFilesTranslation.java | 2 +-
.../java/org/apache/beam/sdk/io/FileBasedSink.java | 32 ++++++++---------
.../java/org/apache/beam/sdk/io/WriteFiles.java | 17 +++++++--
.../org/apache/beam/sdk/io/FileBasedSinkTest.java | 42 ++++++++++++----------
4 files changed, 55 insertions(+), 38 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index a6dd55c..90f6453 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -303,7 +303,7 @@ public class WriteFilesTranslation {
public Map<Class<? extends PTransform>, TransformPayloadTranslator>
getTransformPayloadTranslators() {
return Collections.<Class<? extends PTransform>, TransformPayloadTranslator>singletonMap(
- WriteFiles.class, new WriteFilesTranslator());
+ WriteFiles.CONCRETE_CLASS, new WriteFilesTranslator());
}
@Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 12c4555..48d7521 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -699,7 +699,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
// if set.
Set<Integer> missingShardNums;
if (numShards == null) {
- missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
+ missingShardNums =
+ existingResults.isEmpty()
+ ? ImmutableSet.of(UNKNOWN_SHARDNUM)
+ : ImmutableSet.<Integer>of();
} else {
missingShardNums = Sets.newHashSet();
for (int i = 0; i < numShards; ++i) {
@@ -726,8 +729,9 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
String uuid = UUID.randomUUID().toString();
LOG.info("Opening empty writer {} for destination {}", uuid, dest);
Writer<DestinationT, ?> writer = createWriter();
+ writer.setDestination(dest);
// Currently this code path is only called in the unwindowed case.
- writer.open(uuid, dest);
+ writer.open(uuid);
writer.close();
completeResults.add(
new FileResult<>(
@@ -760,8 +764,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException {
int numFiles = resultsToFinalFilenames.size();
LOG.debug("Copying {} files.", numFiles);
- List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
- List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
+ List<ResourceId> srcFiles = new ArrayList<>();
+ List<ResourceId> dstFiles = new ArrayList<>();
for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
srcFiles.add(entry.getKey().getTempFilename());
dstFiles.add(entry.getValue());
@@ -923,22 +927,14 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
protected void finishWrite() throws Exception {}
/**
- * Performs bundle initialization. For example, creates a temporary file for writing or
- * initializes any state that will be used across calls to {@link Writer#write}.
+ * Opens a uniquely named temporary file and initializes the writer using {@link #prepareWrite}.
*
* <p>The unique id that is given to open should be used to ensure that the writer's output does
* not interfere with the output of other Writers, as a bundle may be executed many times for
* fault tolerance.
- *
- * <p>The window and paneInfo arguments are populated when windowed writes are requested. shard
- * id populated for the case of static sharding. In cases where the runner is dynamically
- * picking sharding, shard might be set to -1.
*/
- public final void open(
- String uId, DestinationT destination)
- throws Exception {
+ public final void open(String uId) throws Exception {
this.id = uId;
- this.destination = destination;
ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
verifyNotNull(
@@ -1040,6 +1036,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
return writeOperation;
}
+ void setDestination(DestinationT destination) {
+ this.destination = destination;
+ }
+
/** Return the user destination object for this writer. */
public DestinationT getDestination() {
return destination;
@@ -1064,8 +1064,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
BoundedWindow window,
PaneInfo paneInfo,
DestinationT destination) {
- checkArgument(window != null);
- checkArgument(paneInfo != null);
+ checkArgument(window != null, "window can not be null");
+ checkArgument(paneInfo != null, "paneInfo can not be null");
this.tempFilename = tempFilename;
this.shard = shard;
this.window = window;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 12f5cce..54f055d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -38,6 +38,7 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
@@ -114,6 +115,10 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
+ /** For internal use by runners. */
+ @Internal
+ public static final Class<? extends WriteFiles> CONCRETE_CLASS = AutoValue_WriteFiles.class;
+
// The maximum number of file writers to keep open in a single bundle at a time, since file
// writers default to 64mb buffers. This comes into play when writing per-window files.
// The first 20 files from a single WriteFiles transform will write files inline in the
@@ -497,7 +502,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
paneInfo,
destination);
writer = writeOperation.createWriter();
- writer.open(uuid, destination);
+ writer.setDestination(destination);
+ writer.open(uuid);
writers.put(key, writer);
LOG.debug("Done opening writer");
} else {
@@ -623,7 +629,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
"ApplyShardingKey",
ParDo.of(new ApplyShardingKeyFn(numShardsView, destinationCoder))
.withSideInputs(
- numShardsView == null
+ (numShardsView == null)
? ImmutableList.<PCollectionView<Integer>>of()
: ImmutableList.of(numShardsView)))
.setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
@@ -706,7 +712,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
c.pane(),
destination);
writer = writeOperation.createWriter();
- writer.open(uuid, destination);
+ writer.setDestination(destination);
+ writer.open(uuid);
writers.put(destination, writer);
}
writeOrClose(writer, getDynamicDestinations().formatRecord(input));
@@ -724,6 +731,10 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
throw e;
}
int shard = c.element().getKey().getShardNumber();
+ checkArgument(
+ shard != UNKNOWN_SHARDNUM,
+ "Shard should have been set, but is unset for element %s",
+ c.element());
c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
}
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 561d036..0c9bdc1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -52,6 +52,8 @@ import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.Writer;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
@@ -100,7 +102,7 @@ public class FileBasedSinkTest {
SimpleSink.SimpleWriter<Void> writer =
buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
- writer.open(testUid, null);
+ writer.open(testUid);
for (String value : values) {
writer.write(value);
}
@@ -196,14 +198,14 @@ public class FileBasedSinkTest {
new FileResult<Void>(
LocalResources.fromFile(temporaryFiles.get(i), false),
UNKNOWN_SHARDNUM,
- null,
- null,
+ GlobalWindow.INSTANCE,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
null));
}
// TODO: test with null first argument?
List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames =
- writeOp.finalizeDestination(null, null, null, fileResults);
+ writeOp.finalizeDestination(null, GlobalWindow.INSTANCE, null, fileResults);
writeOp.moveToOutputFiles(resultsToFinalFilenames);
for (int i = 0; i < numFiles; i++) {
@@ -213,7 +215,7 @@ public class FileBasedSinkTest {
.getDynamicDestinations()
.getFilenamePolicy(null)
.unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED);
- assertTrue(new File(outputFilename.toString()).exists());
+ assertTrue(outputFilename.toString(), new File(outputFilename.toString()).exists());
assertFalse(temporaryFiles.get(i).exists());
}
@@ -292,7 +294,11 @@ public class FileBasedSinkTest {
resultsToFinalFilenames.add(
KV.of(
new FileResult<Void>(
- LocalResources.fromFile(inputTmpFile, false), UNKNOWN_SHARDNUM, null, null, null),
+ LocalResources.fromFile(inputTmpFile, false),
+ UNKNOWN_SHARDNUM,
+ GlobalWindow.INSTANCE,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ null),
finalFilename));
}
@@ -354,22 +360,22 @@ public class FileBasedSinkTest {
SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED);
SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink);
- ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
- ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
- ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE);
- // More than one shard does.
try {
- List<FileResult<Void>> results =
- Lists.newArrayList(
- new FileResult<Void>(temp1, 1 /* shard */, null, null, null),
- new FileResult<Void>(temp2, 1 /* shard */, null, null, null),
- new FileResult<Void>(temp3, 1 /* shard */, null, null, null));
- writeOp.finalizeDestination(null, null, 5 /* numShards */, results);
+ List<FileResult<Void>> results = Lists.newArrayList();
+ for (int i = 0; i < 3; ++i) {
+ results.add(new FileResult<Void>(
+ root.resolve("temp" + i, StandardResolveOptions.RESOLVE_FILE),
+ 1 /* shard - should be different, but is the same */,
+ GlobalWindow.INSTANCE,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ null));
+ }
+ writeOp.finalizeDestination(null, GlobalWindow.INSTANCE, 5 /* numShards */, results);
fail("Should have failed.");
} catch (IllegalArgumentException exn) {
assertThat(exn.getMessage(), containsString("generated the same name"));
+ assertThat(exn.getMessage(), containsString("temp0"));
assertThat(exn.getMessage(), containsString("temp1"));
- assertThat(exn.getMessage(), containsString("temp2"));
}
}
@@ -505,7 +511,7 @@ public class FileBasedSinkTest {
expected.add("footer");
expected.add("footer");
- writer.open(testUid, null);
+ writer.open(testUid);
writer.write("a");
writer.write("b");
writer.close();
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.