You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ar...@apache.org on 2021/04/19 09:55:48 UTC
[beam] branch master updated: [BEAM-8611] Move TextSourceTest into TextIOReadTest (#14560)
This is an automated email from the ASF dual-hosted git repository.
aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a89879f [BEAM-8611] Move TextSourceTest into TextIOReadTest (#14560)
a89879f is described below
commit a89879f81e5d2e7a365ad030fc909dbd50abe68e
Author: brucearctor <50...@users.noreply.github.com>
AuthorDate: Mon Apr 19 02:54:28 2021 -0700
[BEAM-8611] Move TextSourceTest into TextIOReadTest (#14560)
* [BEAM-8611] Move TextSourceTest into TextIOReadTest
* fixed checks and imports
---
.../org/apache/beam/sdk/io/TextIOReadTest.java | 122 ++++++++++++++++
.../org/apache/beam/sdk/io/TextSourceTest.java | 158 ---------------------
2 files changed, 122 insertions(+), 158 deletions(-)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index f51938b..b113dac 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -39,12 +39,14 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Writer;
+import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -66,6 +68,9 @@ import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -851,4 +856,121 @@ public class TextIOReadTest {
p.run();
}
}
+
+ /** Tests for {@link TextSource} handling of a UTF-8 byte order mark (BOM). */
+ @RunWith(JUnit4.class)
+ public static class TextSourceTest {
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testRemoveUtf8BOM() throws Exception {
+ Path p1 = createTestFile("test_txt_ascii", Charset.forName("US-ASCII"), "1,p1", "2,p1");
+ Path p2 =
+ createTestFile(
+ "test_txt_utf8_no_bom",
+ Charset.forName("UTF-8"),
+ "1,p2-Japanese:テスト",
+ "2,p2-Japanese:テスト");
+ Path p3 =
+ createTestFile(
+ "test_txt_utf8_bom",
+ Charset.forName("UTF-8"),
+ "\uFEFF1,p3-テストBOM",
+ "\uFEFF2,p3-テストBOM");
+ PCollection<String> contents =
+ pipeline
+ .apply("Create", Create.of(p1.toString(), p2.toString(), p3.toString()))
+ .setCoder(StringUtf8Coder.of())
+ // PCollection<String>: file paths
+ .apply("Read file", new TextFileReadTransform());
+ // PCollection<String>: one element per line of the three files
+
+ // Only a BOM at the start of a file is removed; the BOM on p3's second line is kept.
+ PAssert.that(contents)
+ .containsInAnyOrder(
+ "1,p1",
+ "2,p1",
+ "1,p2-Japanese:テスト",
+ "2,p2-Japanese:テスト",
+ "1,p3-テストBOM",
+ "\uFEFF2,p3-テストBOM");
+
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testPreserveNonBOMBytes() throws Exception {
+ // U+FEFE resembles the BOM (U+FEFF) but is not one, so it must be kept on every line.
+ Path p1 =
+ createTestFile(
+ "test_txt_utf_bom", Charset.forName("UTF-8"), "\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
+ PCollection<String> contents =
+ pipeline
+ .apply("Create", Create.of(p1.toString()))
+ .setCoder(StringUtf8Coder.of())
+ // PCollection<String>: file paths
+ .apply("Read file", new TextFileReadTransform());
+
+ PAssert.that(contents).containsInAnyOrder("\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
+
+ pipeline.run();
+ }
+
+ /** Emits each line of a file, read through a {@link TextSource} with default delimiters. */
+ private static class FileReadDoFn extends DoFn<FileIO.ReadableFile, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ FileIO.ReadableFile file = c.element();
+ ValueProvider<String> filenameProvider =
+ ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename());
+ // Create a TextSource, passing null as the delimiter to use the default
+ // delimiters ('\n', '\r', or '\r\n').
+ TextSource textSource = new TextSource(filenameProvider, null, null);
+ // try-with-resources closes the reader even if start(), advance() or output() throws.
+ try (BoundedSource.BoundedReader<String> reader =
+ textSource
+ .createForSubrangeOfFile(file.getMetadata(), 0, file.getMetadata().sizeBytes())
+ .createReader(c.getPipelineOptions())) {
+ for (boolean more = reader.start(); more; more = reader.advance()) {
+ c.output(reader.getCurrent());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Unable to readFile: " + file.getMetadata().resourceId().toString(), e);
+ }
+ }
+ }
+
+ /** Expands file patterns into the lines of the matched files; empty matches are an error. */
+ private static class TextFileReadTransform
+ extends PTransform<PCollection<String>, PCollection<String>> {
+ @Override
+ public PCollection<String> expand(PCollection<String> files) {
+ return files
+ // PCollection<String>: file patterns
+ .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
+ // PCollection<MatchResult.Metadata>
+ .apply(FileIO.readMatches())
+ // PCollection<FileIO.ReadableFile>
+ .apply("Read lines", ParDo.of(new FileReadDoFn()));
+ // PCollection<String>: line
+ }
+ }
+
+ /** Writes each of {@code lines} plus '\n' to a new temp file in {@code charset}; deleted on exit. */
+ private Path createTestFile(String filename, Charset charset, String... lines)
+ throws IOException {
+ Path path = Files.createTempFile(filename, ".csv");
+ path.toFile().deleteOnExit();
+ try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) {
+ for (String line : lines) {
+ writer.write(line);
+ writer.write('\n');
+ }
+ }
+ return path;
+ }
+ }
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java
deleted file mode 100644
index 36a3f68..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.FileIO.ReadableFile;
-import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for TextSource class. */
-@RunWith(JUnit4.class)
-public class TextSourceTest {
- @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
- @Test
- @Category(NeedsRunner.class)
- public void testRemoveUtf8BOM() throws Exception {
- Path p1 = createTestFile("test_txt_ascii", Charset.forName("US-ASCII"), "1,p1", "2,p1");
- Path p2 =
- createTestFile(
- "test_txt_utf8_no_bom",
- Charset.forName("UTF-8"),
- "1,p2-Japanese:テスト",
- "2,p2-Japanese:テスト");
- Path p3 =
- createTestFile(
- "test_txt_utf8_bom",
- Charset.forName("UTF-8"),
- "\uFEFF1,p3-テストBOM",
- "\uFEFF2,p3-テストBOM");
- PCollection<String> contents =
- pipeline
- .apply("Create", Create.of(p1.toString(), p2.toString(), p3.toString()))
- .setCoder(StringUtf8Coder.of())
- // PCollection<String>
- .apply("Read file", new TextFileReadTransform());
- // PCollection<KV<String, String>>: tableName, line
-
- // Validate that the BOM bytes (\uFEFF) at the beginning of the first line have been removed.
- PAssert.that(contents)
- .containsInAnyOrder(
- "1,p1",
- "2,p1",
- "1,p2-Japanese:テスト",
- "2,p2-Japanese:テスト",
- "1,p3-テストBOM",
- "\uFEFF2,p3-テストBOM");
-
- pipeline.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testPreserveNonBOMBytes() throws Exception {
- // Contains \uFEFE, not UTF BOM.
- Path p1 =
- createTestFile(
- "test_txt_utf_bom", Charset.forName("UTF-8"), "\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
- PCollection<String> contents =
- pipeline
- .apply("Create", Create.of(p1.toString()))
- .setCoder(StringUtf8Coder.of())
- // PCollection<String>
- .apply("Read file", new TextFileReadTransform());
-
- PAssert.that(contents).containsInAnyOrder("\uFEFE1,p1テスト", "\uFEFE2,p1テスト");
-
- pipeline.run();
- }
-
- private static class FileReadDoFn extends DoFn<ReadableFile, String> {
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- ReadableFile file = c.element();
- ValueProvider<String> filenameProvider =
- ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename());
- // Create a TextSource, passing null as the delimiter to use the default
- // delimiters ('\n', '\r', or '\r\n').
- TextSource textSource = new TextSource(filenameProvider, null, null);
- try {
- BoundedSource.BoundedReader<String> reader =
- textSource
- .createForSubrangeOfFile(file.getMetadata(), 0, file.getMetadata().sizeBytes())
- .createReader(c.getPipelineOptions());
- for (boolean more = reader.start(); more; more = reader.advance()) {
- c.output(reader.getCurrent());
- }
- } catch (IOException e) {
- throw new RuntimeException(
- "Unable to readFile: " + file.getMetadata().resourceId().toString());
- }
- }
- }
-
- /** A transform that reads CSV file records. */
- private static class TextFileReadTransform
- extends PTransform<PCollection<String>, PCollection<String>> {
- public TextFileReadTransform() {}
-
- @Override
- public PCollection<String> expand(PCollection<String> files) {
- return files
- // PCollection<String>
- .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW))
- // PCollection<Match.Metadata>
- .apply(FileIO.readMatches())
- // PCollection<FileIO.ReadableFile>
- .apply("Read lines", ParDo.of(new FileReadDoFn()));
- // PCollection<String>: line
- }
- }
-
- private Path createTestFile(String filename, Charset charset, String... lines)
- throws IOException {
- Path path = Files.createTempFile(filename, ".csv");
- try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) {
- for (String line : lines) {
- writer.write(line);
- writer.write('\n');
- }
- }
- return path;
- }
-}