You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:58 UTC

[beam] 11/13: Fixes tests

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 060f05c659920c3a48dbc67c38db770788802d06
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Mon Nov 27 11:41:25 2017 -0800

    Fixes tests
---
 .../core/construction/WriteFilesTranslation.java   |  2 +-
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 32 ++++++++---------
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 17 +++++++--
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  | 42 ++++++++++++----------
 4 files changed, 55 insertions(+), 38 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index a6dd55c..90f6453 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -303,7 +303,7 @@ public class WriteFilesTranslation {
     public Map<Class<? extends PTransform>, TransformPayloadTranslator>
         getTransformPayloadTranslators() {
       return Collections.<Class<? extends PTransform>, TransformPayloadTranslator>singletonMap(
-          WriteFiles.class, new WriteFilesTranslator());
+          WriteFiles.CONCRETE_CLASS, new WriteFilesTranslator());
     }
 
     @Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 12c4555..48d7521 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -699,7 +699,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       // if set.
       Set<Integer> missingShardNums;
       if (numShards == null) {
-        missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
+        missingShardNums =
+            existingResults.isEmpty()
+                ? ImmutableSet.of(UNKNOWN_SHARDNUM)
+                : ImmutableSet.<Integer>of();
       } else {
         missingShardNums = Sets.newHashSet();
         for (int i = 0; i < numShards; ++i) {
@@ -726,8 +729,9 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
           String uuid = UUID.randomUUID().toString();
           LOG.info("Opening empty writer {} for destination {}", uuid, dest);
           Writer<DestinationT, ?> writer = createWriter();
+          writer.setDestination(dest);
           // Currently this code path is only called in the unwindowed case.
-          writer.open(uuid, dest);
+          writer.open(uuid);
           writer.close();
           completeResults.add(
               new FileResult<>(
@@ -760,8 +764,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
         List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException {
       int numFiles = resultsToFinalFilenames.size();
       LOG.debug("Copying {} files.", numFiles);
-      List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
-      List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
+      List<ResourceId> srcFiles = new ArrayList<>();
+      List<ResourceId> dstFiles = new ArrayList<>();
       for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
         srcFiles.add(entry.getKey().getTempFilename());
         dstFiles.add(entry.getValue());
@@ -923,22 +927,14 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     protected void finishWrite() throws Exception {}
 
     /**
-     * Performs bundle initialization. For example, creates a temporary file for writing or
-     * initializes any state that will be used across calls to {@link Writer#write}.
+     * Opens a uniquely named temporary file and initializes the writer using {@link #prepareWrite}.
      *
      * <p>The unique id that is given to open should be used to ensure that the writer's output does
      * not interfere with the output of other Writers, as a bundle may be executed many times for
      * fault tolerance.
-     *
-     * <p>The window and paneInfo arguments are populated when windowed writes are requested. shard
-     * id populated for the case of static sharding. In cases where the runner is dynamically
-     * picking sharding, shard might be set to -1.
      */
-    public final void open(
-        String uId, DestinationT destination)
-        throws Exception {
+    public final void open(String uId) throws Exception {
       this.id = uId;
-      this.destination = destination;
       ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
       outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
       verifyNotNull(
@@ -1040,6 +1036,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       return writeOperation;
     }
 
+    void setDestination(DestinationT destination) {
+      this.destination = destination;
+    }
+
     /** Return the user destination object for this writer. */
     public DestinationT getDestination() {
       return destination;
@@ -1064,8 +1064,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
         BoundedWindow window,
         PaneInfo paneInfo,
         DestinationT destination) {
-      checkArgument(window != null);
-      checkArgument(paneInfo != null);
+      checkArgument(window != null, "window can not be null");
+      checkArgument(paneInfo != null, "paneInfo can not be null");
       this.tempFilename = tempFilename;
       this.shard = shard;
       this.window = window;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 12f5cce..54f055d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -38,6 +38,7 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
@@ -114,6 +115,10 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
     extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
 
+  /** For internal use by runners. */
+  @Internal
+  public static final Class<? extends WriteFiles> CONCRETE_CLASS = AutoValue_WriteFiles.class;
+
   // The maximum number of file writers to keep open in a single bundle at a time, since file
   // writers default to 64mb buffers. This comes into play when writing per-window files.
   // The first 20 files from a single WriteFiles transform will write files inline in the
@@ -497,7 +502,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
               paneInfo,
               destination);
           writer = writeOperation.createWriter();
-          writer.open(uuid, destination);
+          writer.setDestination(destination);
+          writer.open(uuid);
           writers.put(key, writer);
           LOG.debug("Done opening writer");
         } else {
@@ -623,7 +629,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
               "ApplyShardingKey",
               ParDo.of(new ApplyShardingKeyFn(numShardsView, destinationCoder))
                   .withSideInputs(
-                      numShardsView == null
+                      (numShardsView == null)
                           ? ImmutableList.<PCollectionView<Integer>>of()
                           : ImmutableList.of(numShardsView)))
           .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
@@ -706,7 +712,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
               c.pane(),
               destination);
           writer = writeOperation.createWriter();
-          writer.open(uuid, destination);
+          writer.setDestination(destination);
+          writer.open(uuid);
           writers.put(destination, writer);
         }
         writeOrClose(writer, getDynamicDestinations().formatRecord(input));
@@ -724,6 +731,10 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
           throw e;
         }
         int shard = c.element().getKey().getShardNumber();
+        checkArgument(
+            shard != UNKNOWN_SHARDNUM,
+            "Shard should have been set, but is unset for element %s",
+            c.element());
         c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
       }
     }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 561d036..0c9bdc1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -52,6 +52,8 @@ import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.Writer;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
@@ -100,7 +102,7 @@ public class FileBasedSinkTest {
 
     SimpleSink.SimpleWriter<Void> writer =
         buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
-    writer.open(testUid, null);
+    writer.open(testUid);
     for (String value : values) {
       writer.write(value);
     }
@@ -196,14 +198,14 @@ public class FileBasedSinkTest {
           new FileResult<Void>(
               LocalResources.fromFile(temporaryFiles.get(i), false),
               UNKNOWN_SHARDNUM,
-              null,
-              null,
+              GlobalWindow.INSTANCE,
+              PaneInfo.ON_TIME_AND_ONLY_FIRING,
               null));
     }
 
     // TODO: test with null first argument?
     List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames =
-        writeOp.finalizeDestination(null, null, null, fileResults);
+        writeOp.finalizeDestination(null, GlobalWindow.INSTANCE, null, fileResults);
     writeOp.moveToOutputFiles(resultsToFinalFilenames);
 
     for (int i = 0; i < numFiles; i++) {
@@ -213,7 +215,7 @@ public class FileBasedSinkTest {
               .getDynamicDestinations()
               .getFilenamePolicy(null)
               .unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED);
-      assertTrue(new File(outputFilename.toString()).exists());
+      assertTrue(outputFilename.toString(), new File(outputFilename.toString()).exists());
       assertFalse(temporaryFiles.get(i).exists());
     }
 
@@ -292,7 +294,11 @@ public class FileBasedSinkTest {
       resultsToFinalFilenames.add(
           KV.of(
               new FileResult<Void>(
-                  LocalResources.fromFile(inputTmpFile, false), UNKNOWN_SHARDNUM, null, null, null),
+                  LocalResources.fromFile(inputTmpFile, false),
+                  UNKNOWN_SHARDNUM,
+                  GlobalWindow.INSTANCE,
+                  PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                  null),
               finalFilename));
     }
 
@@ -354,22 +360,22 @@ public class FileBasedSinkTest {
         SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED);
     SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink);
 
-    ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
-    ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
-    ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE);
-    // More than one shard does.
     try {
-      List<FileResult<Void>> results =
-          Lists.newArrayList(
-              new FileResult<Void>(temp1, 1 /* shard */, null, null, null),
-              new FileResult<Void>(temp2, 1 /* shard */, null, null, null),
-              new FileResult<Void>(temp3, 1 /* shard */, null, null, null));
-      writeOp.finalizeDestination(null, null, 5 /* numShards */, results);
+      List<FileResult<Void>> results = Lists.newArrayList();
+      for (int i = 0; i < 3; ++i) {
+        results.add(new FileResult<Void>(
+            root.resolve("temp" + i, StandardResolveOptions.RESOLVE_FILE),
+            1 /* shard - should be different, but is the same */,
+            GlobalWindow.INSTANCE,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING,
+            null));
+      }
+      writeOp.finalizeDestination(null, GlobalWindow.INSTANCE, 5 /* numShards */, results);
       fail("Should have failed.");
     } catch (IllegalArgumentException exn) {
       assertThat(exn.getMessage(), containsString("generated the same name"));
+      assertThat(exn.getMessage(), containsString("temp0"));
       assertThat(exn.getMessage(), containsString("temp1"));
-      assertThat(exn.getMessage(), containsString("temp2"));
     }
   }
 
@@ -505,7 +511,7 @@ public class FileBasedSinkTest {
     expected.add("footer");
     expected.add("footer");
 
-    writer.open(testUid, null);
+    writer.open(testUid);
     writer.write("a");
     writer.write("b");
     writer.close();

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.