You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ar...@apache.org on 2021/04/19 09:55:48 UTC

[beam] branch master updated: [BEAM-8611] Move TextSourceTest into TextIOReadTest (#14560)

This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a89879f  [BEAM-8611] Move TextSourceTest into TextIOReadTest (#14560)
a89879f is described below

commit a89879f81e5d2e7a365ad030fc909dbd50abe68e
Author: brucearctor <50...@users.noreply.github.com>
AuthorDate: Mon Apr 19 02:54:28 2021 -0700

    [BEAM-8611] Move TextSourceTest into TextIOReadTest (#14560)
    
    * [BEAM-8611] Move TextSourceTest into TextIOReadTest
    
    * fixed checks and imports
---
 .../org/apache/beam/sdk/io/TextIOReadTest.java     | 122 ++++++++++++++++
 .../org/apache/beam/sdk/io/TextSourceTest.java     | 158 ---------------------
 2 files changed, 122 insertions(+), 158 deletions(-)

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index f51938b..b113dac 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -39,12 +39,14 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeFalse;
 
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.Writer;
+import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -66,6 +68,9 @@ import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ToString;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -851,4 +856,121 @@ public class TextIOReadTest {
       p.run();
     }
   }
+
+  /** Tests for TextSource class. */
+  @RunWith(JUnit4.class)
+  public static class TextSourceTest {
+    @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testRemoveUtf8BOM() throws Exception {
+      Charset utf8 = Charset.forName("UTF-8");
+      Path asciiFile =
+          createTestFile("test_txt_ascii", Charset.forName("US-ASCII"), "1,p1", "2,p1");
+      Path utf8File =
+          createTestFile(
+              "test_txt_utf8_no_bom", utf8, "1,p2-Japanese:テスト", "2,p2-Japanese:テスト");
+      // Both lines start with U+FEFF, but only the one at the very start of the file is a BOM.
+      Path utf8BomFile =
+          createTestFile(
+              "test_txt_utf8_bom", utf8, "\uFEFF1,p3-テストBOM", "\uFEFF2,p3-テストBOM");
+
+      // Read all three files back through the TextSource-based transform.
+      PCollection<String> filePaths =
+          pipeline
+              .apply(
+                  "Create",
+                  Create.of(asciiFile.toString(), utf8File.toString(), utf8BomFile.toString()))
+              .setCoder(StringUtf8Coder.of());
+      PCollection<String> contents =
+          filePaths.apply("Read file", new TextIOReadTest.TextSourceTest.TextFileReadTransform());
+
+      // The U+FEFF that opens the file is dropped; the one opening the second line is kept.
+      PAssert.that(contents)
+          .containsInAnyOrder(
+              "1,p1",
+              "2,p1",
+              "1,p2-Japanese:テスト",
+              "2,p2-Japanese:テスト",
+              "1,p3-テストBOM",
+              "\uFEFF2,p3-テストBOM");
+
+      pipeline.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testPreserveNonBOMBytes() throws Exception {
+      // U+FEFE is not the BOM (U+FEFF), so it is expected back unchanged on both lines.
+      String firstLine = "\uFEFE1,p1テスト";
+      String secondLine = "\uFEFE2,p1テスト";
+      Path testFile =
+          createTestFile("test_txt_utf_bom", Charset.forName("UTF-8"), firstLine, secondLine);
+
+      PCollection<String> filePaths =
+          pipeline.apply("Create", Create.of(testFile.toString())).setCoder(StringUtf8Coder.of());
+      PCollection<String> contents =
+          filePaths.apply("Read file", new TextIOReadTest.TextSourceTest.TextFileReadTransform());
+
+      PAssert.that(contents).containsInAnyOrder(firstLine, secondLine);
+
+      pipeline.run();
+    }
+
+    /** Outputs every record a {@link TextSource} reads from the full byte range of each file. */
+    private static class FileReadDoFn extends DoFn<FileIO.ReadableFile, String> {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        FileIO.ReadableFile file = c.element();
+        ValueProvider<String> filenameProvider =
+            ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename());
+        // Create a TextSource, passing null as the delimiter to use the default
+        // delimiters ('\n', '\r', or '\r\n').
+        TextSource textSource = new TextSource(filenameProvider, null, null);
+        // try-with-resources closes the reader on success and on failure alike.
+        try (BoundedSource.BoundedReader<String> reader =
+            textSource
+                .createForSubrangeOfFile(file.getMetadata(), 0, file.getMetadata().sizeBytes())
+                .createReader(c.getPipelineOptions())) {
+          for (boolean more = reader.start(); more; more = reader.advance()) {
+            c.output(reader.getCurrent());
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(
+              "Unable to readFile: " + file.getMetadata().resourceId().toString(), e);
+        }
+      }
+    }
+
+    /** Resolves file patterns and emits the records that {@code FileReadDoFn} reads from them. */
+    private static class TextFileReadTransform
+        extends PTransform<PCollection<String>, PCollection<String>> {
+      public TextFileReadTransform() {}
+
+      @Override
+      public PCollection<String> expand(PCollection<String> files) {
+        // Match the incoming patterns, then turn every match into a readable file handle.
+        PCollection<FileIO.ReadableFile> matchedFiles =
+            files
+                .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
+                .apply(FileIO.readMatches());
+        // Each readable file is expanded into its individual records.
+        return matchedFiles.apply(
+            "Read lines", ParDo.of(new TextIOReadTest.TextSourceTest.FileReadDoFn()));
+      }
+    }
+
+    private Path createTestFile(String filename, Charset charset, String... lines)
+        throws IOException {
+      Path path = Files.createTempFile(filename, ".csv");
+      path.toFile().deleteOnExit(); // Do not leave test fixtures behind in the temp directory.
+      try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) {
+        for (String line : lines) {
+          writer.write(line + "\n"); // Always '\n', independent of the platform line separator.
+        }
+      }
+      return path;
+    }
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java
deleted file mode 100644
index 36a3f68..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.FileIO.ReadableFile;
-import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for TextSource class. */
-@RunWith(JUnit4.class)
-public class TextSourceTest {
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testRemoveUtf8BOM() throws Exception {
-    Path p1 = createTestFile("test_txt_ascii", Charset.forName("US-ASCII"), "1,p1", "2,p1");
-    Path p2 =
-        createTestFile(
-            "test_txt_utf8_no_bom",
-            Charset.forName("UTF-8"),
-            "1,p2-Japanese:テスト",
-            "2,p2-Japanese:テスト");
-    Path p3 =
-        createTestFile(
-            "test_txt_utf8_bom",
-            Charset.forName("UTF-8"),
-            "\uFEFF1,p3-テストBOM",
-            "\uFEFF2,p3-テストBOM");
-    PCollection<String> contents =
-        pipeline
-            .apply("Create", Create.of(p1.toString(), p2.toString(), p3.toString()))
-            .setCoder(StringUtf8Coder.of())
-            // PCollection<String>
-            .apply("Read file", new TextFileReadTransform());
-    // PCollection<KV<String, String>>: tableName, line
-
-    // Validate that the BOM bytes (\uFEFF) at the beginning of the first line have been removed.
-    PAssert.that(contents)
-        .containsInAnyOrder(
-            "1,p1",
-            "2,p1",
-            "1,p2-Japanese:テスト",
-            "2,p2-Japanese:テスト",
-            "1,p3-テストBOM",
-            "\uFEFF2,p3-テストBOM");
-
-    pipeline.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testPreserveNonBOMBytes() throws Exception {
-    // Contains \uFEFE, not UTF BOM.
-    Path p1 =
-        createTestFile(
-            "test_txt_utf_bom", Charset.forName("UTF-8"), "\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
-    PCollection<String> contents =
-        pipeline
-            .apply("Create", Create.of(p1.toString()))
-            .setCoder(StringUtf8Coder.of())
-            // PCollection<String>
-            .apply("Read file", new TextFileReadTransform());
-
-    PAssert.that(contents).containsInAnyOrder("\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
-
-    pipeline.run();
-  }
-
-  private static class FileReadDoFn extends DoFn<ReadableFile, String> {
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      ReadableFile file = c.element();
-      ValueProvider<String> filenameProvider =
-          ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename());
-      // Create a TextSource, passing null as the delimiter to use the default
-      // delimiters ('\n', '\r', or '\r\n').
-      TextSource textSource = new TextSource(filenameProvider, null, null);
-      try {
-        BoundedSource.BoundedReader<String> reader =
-            textSource
-                .createForSubrangeOfFile(file.getMetadata(), 0, file.getMetadata().sizeBytes())
-                .createReader(c.getPipelineOptions());
-        for (boolean more = reader.start(); more; more = reader.advance()) {
-          c.output(reader.getCurrent());
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(
-            "Unable to readFile: " + file.getMetadata().resourceId().toString());
-      }
-    }
-  }
-
-  /** A transform that reads CSV file records. */
-  private static class TextFileReadTransform
-      extends PTransform<PCollection<String>, PCollection<String>> {
-    public TextFileReadTransform() {}
-
-    @Override
-    public PCollection<String> expand(PCollection<String> files) {
-      return files
-          // PCollection<String>
-          .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
-          // PCollection<Match.Metadata>
-          .apply(FileIO.readMatches())
-          // PCollection<FileIO.ReadableFile>
-          .apply("Read lines", ParDo.of(new FileReadDoFn()));
-      // PCollection<String>: line
-    }
-  }
-
-  private Path createTestFile(String filename, Charset charset, String... lines)
-      throws IOException {
-    Path path = Files.createTempFile(filename, ".csv");
-    try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) {
-      for (String line : lines) {
-        writer.write(line);
-        writer.write('\n');
-      }
-    }
-    return path;
-  }
-}