You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/08 15:15:48 UTC
[flink] 02/02: fixup! StringWriter support custom row delimiter
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e30f3ad63c614db14bf94e988c40b02532e7f8e8
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Aug 7 16:41:47 2019 +0200
fixup! StringWriter support custom row delimiter
---
.../streaming/connectors/fs/StringWriter.java | 2 +-
.../streaming/connectors/fs/StringWriterTest.java | 100 ++++++++++-----------
2 files changed, 46 insertions(+), 56 deletions(-)
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index ddea224..5f5c9b8 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -45,7 +45,7 @@ public class StringWriter<T> extends StreamWriterBase<T> {
private static final String DEFAULT_ROW_DELIMITER = "\n";
- private byte[] rowDelimiterBytes;
+ private byte[] rowDelimiterBytes;
/**
* Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
index 0574070..6122a2a 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
@@ -18,46 +18,47 @@
package org.apache.flink.streaming.connectors.fs;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.NetUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.Assert;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
-import java.util.StringTokenizer;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-
/**
* Tests for {@link StringWriter}.
*/
public class StringWriterTest {
+ private static final String CHARSET_NAME = StandardCharsets.UTF_8.name();
+
@ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
private static MiniDFSCluster hdfsCluster;
private static org.apache.hadoop.fs.FileSystem dfs;
private static String outputDir;
- @Before
- public void createHDFS() throws IOException {
+ @BeforeClass
+ public static void createHDFS() throws IOException {
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
- File dataDir = tempFolder.newFolder();
+ File dataDir = TEMPORARY_FOLDER.newFolder();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
@@ -69,6 +70,14 @@ public class StringWriterTest {
+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort());
}
+ @AfterClass
+ public static void destroyHDFS() {
+ if (hdfsCluster != null) {
+ hdfsCluster.shutdown();
+ hdfsCluster = null;
+ }
+ }
+
@Test
public void testDuplicate() {
StringWriter<String> writer = new StringWriter(StandardCharsets.UTF_16.name());
@@ -80,62 +89,43 @@ public class StringWriterTest {
writer.setSyncOnFlush(false);
assertFalse(StreamWriterBaseComparator.equals(writer, other));
assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>()));
-
}
@Test
public void testMultiRowdelimiters() throws IOException {
- String rowDelimiter1 = "\n";
- String testDat1 = "A" + rowDelimiter1 + "B" + rowDelimiter1 + "C" + rowDelimiter1 + "D" + rowDelimiter1 + "E";
- Path testFile1 = new Path(outputDir + "/test01");
- testRowdelimiter(rowDelimiter1, testDat1, StandardCharsets.UTF_8.name(), testFile1);
-
- String rowDelimiter2 = "\r\n";
- String testDat2 = "A" + rowDelimiter2 + "B" + rowDelimiter2 + "C" + rowDelimiter2 + "D" + rowDelimiter2 + "E";
- Path testFile2 = new Path(outputDir + "/test02");
- testRowdelimiter(rowDelimiter2, testDat2, StandardCharsets.UTF_8.name(), testFile2);
-
- String rowDelimiter3 = "*";
- String testDat3 = "A" + rowDelimiter3 + "B" + rowDelimiter3 + "C" + rowDelimiter3 + "D" + rowDelimiter3 + "E";
- Path testFile3 = new Path(outputDir + "/test03");
- testRowdelimiter(rowDelimiter3, testDat3, StandardCharsets.UTF_8.name(), testFile3);
-
- String rowDelimiter4 = "##";
- String testDat4 = "A" + rowDelimiter4 + "B" + rowDelimiter4 + "C" + rowDelimiter4 + "D" + rowDelimiter4 + "E";
- Path testFile4 = new Path(outputDir + "/test04");
- testRowdelimiter(rowDelimiter4, testDat4, StandardCharsets.UTF_8.name(), testFile4);
-
+ testRowDelimiter("\n");
+ testRowDelimiter("\r\n");
+ testRowDelimiter("*");
+ testRowDelimiter("##");
}
- private void testRowdelimiter(String rowDelimiter, String inputData, String charset, Path outputFile) throws IOException {
- StringWriter<String> writer = new StringWriter(charset, rowDelimiter);
- writer.open(dfs, outputFile);
- StringTokenizer lineTokenizer = new StringTokenizer(inputData, rowDelimiter);
- while (lineTokenizer.hasMoreTokens()){
- writer.write(lineTokenizer.nextToken());
- }
- writer.close();
- FSDataInputStream inStream = dfs.open(outputFile);
- byte[] buffer = new byte[inputData.getBytes(charset).length];
- readFully(inStream, buffer);
- inStream.close();
- String outputData = new String(buffer, charset);
- Assert.assertEquals(inputData, outputData);
+ private void testRowDelimiter(String rowDelimiter) throws IOException {
+ String[] inputData = new String[] {"A", "B", "C", "D", "E"};
- }
+ Path outputPath = new Path(TEMPORARY_FOLDER.newFile().getAbsolutePath());
- private void readFully(InputStream in, byte[] buffer) throws IOException {
- int pos = 0;
- int remaining = buffer.length;
-
- while (remaining > 0) {
- int read = in.read(buffer, pos, remaining);
- if (read == -1) {
- throw new EOFException();
+ StringWriter<String> writer = new StringWriter(CHARSET_NAME, rowDelimiter);
+ try {
+ writer.open(dfs, outputPath);
+ for (String input: inputData) {
+ writer.write(input);
}
+ }
+ finally {
+ writer.close();
+ }
- pos += read;
- remaining -= read;
+ try (FSDataInputStream inStream = dfs.open(outputPath)) {
+ String expectedOutput = String.join(rowDelimiter, inputData);
+ byte[] buffer = new byte[expectedOutput.getBytes(CHARSET_NAME).length];
+ readFully(inStream, buffer);
+
+ String outputData = new String(buffer, CHARSET_NAME);
+ assertEquals(expectedOutput, outputData);
}
}
+
+ private void readFully(InputStream in, byte[] buffer) throws IOException {
+ IOUtils.readFully(in, buffer, 0, buffer.length);
+ }
}