Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/08 15:15:48 UTC

[flink] 02/02: fixup! StringWriter support custom row delimiter

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e30f3ad63c614db14bf94e988c40b02532e7f8e8
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed Aug 7 16:41:47 2019 +0200

    fixup! StringWriter support custom row delimiter
---
 .../streaming/connectors/fs/StringWriter.java      |   2 +-
 .../streaming/connectors/fs/StringWriterTest.java  | 100 ++++++++++-----------
 2 files changed, 46 insertions(+), 56 deletions(-)

diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index ddea224..5f5c9b8 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -45,7 +45,7 @@ public class StringWriter<T> extends StreamWriterBase<T> {
 
 	private static final String DEFAULT_ROW_DELIMITER = "\n";
 
-	private  byte[] rowDelimiterBytes;
+	private byte[] rowDelimiterBytes;
 
 	/**
 	 * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
index 0574070..6122a2a 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
@@ -18,46 +18,47 @@
 
 package org.apache.flink.streaming.connectors.fs;
 
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.Assert;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
-import java.util.StringTokenizer;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-
 /**
  * Tests for {@link StringWriter}.
  */
 public class StringWriterTest {
 
+	private static final String CHARSET_NAME = StandardCharsets.UTF_8.name();
+
 	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
 	private static MiniDFSCluster hdfsCluster;
 	private static org.apache.hadoop.fs.FileSystem dfs;
 
 	private static String outputDir;
 
-	@Before
-	public void createHDFS() throws IOException {
+	@BeforeClass
+	public static void createHDFS() throws IOException {
 		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
 
-		File dataDir = tempFolder.newFolder();
+		File dataDir = TEMPORARY_FOLDER.newFolder();
 
 		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
 		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
@@ -69,6 +70,14 @@ public class StringWriterTest {
 			+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort());
 	}
 
+	@AfterClass
+	public static void destroyHDFS() {
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+			hdfsCluster = null;
+		}
+	}
+
 	@Test
 	public void testDuplicate() {
 		StringWriter<String> writer = new StringWriter(StandardCharsets.UTF_16.name());
@@ -80,62 +89,43 @@ public class StringWriterTest {
 		writer.setSyncOnFlush(false);
 		assertFalse(StreamWriterBaseComparator.equals(writer, other));
 		assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>()));
-
 	}
 
 	@Test
 	public void testMultiRowdelimiters() throws IOException {
-		String rowDelimiter1 = "\n";
-		String testDat1 = "A" + rowDelimiter1 + "B" + rowDelimiter1 + "C" + rowDelimiter1 + "D" + rowDelimiter1 + "E";
-		Path testFile1 = new Path(outputDir + "/test01");
-		testRowdelimiter(rowDelimiter1, testDat1, StandardCharsets.UTF_8.name(), testFile1);
-
-		String rowDelimiter2 = "\r\n";
-		String testDat2 = "A" + rowDelimiter2 + "B" + rowDelimiter2 + "C" + rowDelimiter2 + "D" + rowDelimiter2 + "E";
-		Path testFile2 = new Path(outputDir + "/test02");
-		testRowdelimiter(rowDelimiter2, testDat2, StandardCharsets.UTF_8.name(), testFile2);
-
-		String rowDelimiter3 = "*";
-		String testDat3 = "A" + rowDelimiter3 + "B" + rowDelimiter3 + "C" + rowDelimiter3 + "D" + rowDelimiter3 + "E";
-		Path testFile3 = new Path(outputDir + "/test03");
-		testRowdelimiter(rowDelimiter3, testDat3, StandardCharsets.UTF_8.name(), testFile3);
-
-		String rowDelimiter4 = "##";
-		String testDat4 = "A" + rowDelimiter4 + "B" + rowDelimiter4 + "C" + rowDelimiter4 + "D" + rowDelimiter4 + "E";
-		Path testFile4 = new Path(outputDir + "/test04");
-		testRowdelimiter(rowDelimiter4, testDat4, StandardCharsets.UTF_8.name(), testFile4);
-
+		testRowDelimiter("\n");
+		testRowDelimiter("\r\n");
+		testRowDelimiter("*");
+		testRowDelimiter("##");
 	}
 
-	private void testRowdelimiter(String rowDelimiter, String inputData, String charset, Path outputFile) throws IOException {
-		StringWriter<String> writer = new StringWriter(charset, rowDelimiter);
-		writer.open(dfs, outputFile);
-		StringTokenizer lineTokenizer = new StringTokenizer(inputData, rowDelimiter);
-		while (lineTokenizer.hasMoreTokens()){
-			writer.write(lineTokenizer.nextToken());
-		}
-		writer.close();
-		FSDataInputStream inStream = dfs.open(outputFile);
-		byte[] buffer = new byte[inputData.getBytes(charset).length];
-		readFully(inStream, buffer);
-		inStream.close();
-		String outputData = new String(buffer, charset);
-		Assert.assertEquals(inputData, outputData);
+	private void testRowDelimiter(String rowDelimiter) throws IOException {
+		String[] inputData = new String[] {"A", "B", "C", "D", "E"};
 
-	}
+		Path outputPath = new Path(TEMPORARY_FOLDER.newFile().getAbsolutePath());
 
-	private void readFully(InputStream in, byte[] buffer) throws IOException {
-		int pos = 0;
-		int remaining = buffer.length;
-
-		while (remaining > 0) {
-			int read = in.read(buffer, pos, remaining);
-			if (read == -1) {
-				throw new EOFException();
+		StringWriter<String> writer = new StringWriter(CHARSET_NAME, rowDelimiter);
+		try {
+			writer.open(dfs, outputPath);
+			for (String input: inputData) {
+				writer.write(input);
 			}
+		}
+		finally {
+			writer.close();
+		}
 
-			pos += read;
-			remaining -= read;
+		try (FSDataInputStream inStream = dfs.open(outputPath)) {
+			String expectedOutput = String.join(rowDelimiter, inputData);
+			byte[] buffer = new byte[expectedOutput.getBytes(CHARSET_NAME).length];
+			readFully(inStream, buffer);
+
+			String outputData = new String(buffer, CHARSET_NAME);
+			assertEquals(expectedOutput, outputData);
 		}
 	}
+
+	private void readFully(InputStream in, byte[] buffer) throws IOException {
+		IOUtils.readFully(in, buffer, 0, buffer.length);
+	}
 }
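
For reference, a minimal sketch of how the two-argument constructor exercised above might be used outside the test harness. The `open`, `write`, and `close` calls mirror those in StringWriterTest#testRowDelimiter from this commit; the file-system handle `fs` and target `path` are placeholders supplied by the surrounding job, not part of the patch.

    // Sketch only: `fs` is a Hadoop FileSystem and `path` a target Path provided
    // by the surrounding job. The StringWriter calls follow the test above.
    StringWriter<String> writer = new StringWriter<>(StandardCharsets.UTF_8.name(), "##");
    try {
        writer.open(fs, path);   // open the output stream on the given file system
        writer.write("A");       // each write appends the record followed by "##"
        writer.write("B");
    } finally {
        writer.close();          // flush and release the underlying stream
    }

With no delimiter argument the writer falls back to DEFAULT_ROW_DELIMITER ("\n"), so existing single-argument and no-argument usages keep their previous newline-terminated output.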