You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/08 15:15:47 UTC

[flink] 01/02: StringWriter support custom row delimiter

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eb4b3b75a40623dc83705277e469383e81acb675
Author: wangxiaowei <wa...@360.cn>
AuthorDate: Thu May 30 16:43:40 2019 +0800

    StringWriter support custom row delimiter
---
 .../streaming/connectors/fs/StringWriter.java      | 28 ++++++-
 .../connectors/fs/StreamWriterBaseComparator.java  |  3 +-
 .../streaming/connectors/fs/StringWriterTest.java  | 96 ++++++++++++++++++++++
 3 files changed, 124 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index 84c961e..ddea224 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -41,12 +41,18 @@ public class StringWriter<T> extends StreamWriterBase<T> {
 
 	private transient Charset charset;
 
+	private final String rowDelimiter;
+
+	private static final String DEFAULT_ROW_DELIMITER = "\n";
+
+	private  byte[] rowDelimiterBytes;
+
 	/**
 	 * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
 	 * strings to bytes.
 	 */
 	public StringWriter() {
-		this("UTF-8");
+		this("UTF-8", DEFAULT_ROW_DELIMITER);
 	}
 
 	/**
@@ -56,12 +62,25 @@ public class StringWriter<T> extends StreamWriterBase<T> {
 	 * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
 	 */
 	public StringWriter(String charsetName) {
+		this(charsetName, DEFAULT_ROW_DELIMITER);
+	}
+
+	/**
+	 * Creates a new {@code StringWriter} that uses the given charset to convert strings to bytes
+	 * and writes the given row delimiter after each element.
+	 *
+	 * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
+	 * @param rowDelimiter String written after each element to delimit rows; may contain more than one character (e.g. {@code "\r\n"})
+	 */
+	public StringWriter(String charsetName, String rowDelimiter) {
 		this.charsetName = charsetName;
+		this.rowDelimiter = rowDelimiter;
 	}
 
 	protected StringWriter(StringWriter<T> other) {
 		super(other);
 		this.charsetName = other.charsetName;
+		this.rowDelimiter = other.rowDelimiter;
 	}
 
 	@Override
@@ -70,6 +89,7 @@ public class StringWriter<T> extends StreamWriterBase<T> {
 
 		try {
 			this.charset = Charset.forName(charsetName);
+			this.rowDelimiterBytes = rowDelimiter.getBytes(charset);
 		}
 		catch (IllegalCharsetNameException e) {
 			throw new IOException("The charset " + charsetName + " is not valid.", e);
@@ -83,7 +103,7 @@ public class StringWriter<T> extends StreamWriterBase<T> {
 	public void write(T element) throws IOException {
 		FSDataOutputStream outputStream = getStream();
 		outputStream.write(element.toString().getBytes(charset));
-		outputStream.write('\n');
+		outputStream.write(rowDelimiterBytes);
 	}
 
 	@Override
@@ -94,4 +114,8 @@ public class StringWriter<T> extends StreamWriterBase<T> {
 	String getCharsetName() {
 		return charsetName;
 	}
+
+	public String getRowDelimiter() {
+		return rowDelimiter;
+	}
 }
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
index 9472c29..1e083ae 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
@@ -55,6 +55,7 @@ public class StreamWriterBaseComparator {
 		StringWriter<T> writer1,
 		StringWriter<T> writer2) {
 		return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) &&
-			Objects.equals(writer1.getCharsetName(), writer2.getCharsetName());
+			Objects.equals(writer1.getCharsetName(), writer2.getCharsetName()) &&
+			Objects.equals(writer1.getRowDelimiter(), writer2.getRowDelimiter());
 	}
 }
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
index 7009d94..0574070 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
@@ -18,18 +18,57 @@
 
 package org.apache.flink.streaming.connectors.fs;
 
+import org.apache.flink.util.NetUtils;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.StringTokenizer;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+
 /**
  * Tests for {@link StringWriter}.
  */
 public class StringWriterTest {
 
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+	private static org.apache.hadoop.fs.FileSystem dfs;
+
+	private static String outputDir;
+
+	@Before
+	public void createHDFS() throws IOException {
+		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+
+		File dataDir = tempFolder.newFolder();
+
+		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+		hdfsCluster = builder.build();
+
+		dfs = hdfsCluster.getFileSystem();
+
+		outputDir = "hdfs://"
+			+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort());
+	}
+
 	@Test
 	public void testDuplicate() {
 		StringWriter<String> writer = new StringWriter(StandardCharsets.UTF_16.name());
@@ -41,5 +80,62 @@ public class StringWriterTest {
 		writer.setSyncOnFlush(false);
 		assertFalse(StreamWriterBaseComparator.equals(writer, other));
 		assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>()));
+
+	}
+
+	@Test
+	public void testMultiRowdelimiters() throws IOException {
+		String rowDelimiter1 = "\n";
+		String testDat1 = "A" + rowDelimiter1 + "B" + rowDelimiter1 + "C" + rowDelimiter1 + "D" + rowDelimiter1 + "E";
+		Path testFile1 = new Path(outputDir + "/test01");
+		testRowdelimiter(rowDelimiter1, testDat1, StandardCharsets.UTF_8.name(), testFile1);
+
+		String rowDelimiter2 = "\r\n";
+		String testDat2 = "A" + rowDelimiter2 + "B" + rowDelimiter2 + "C" + rowDelimiter2 + "D" + rowDelimiter2 + "E";
+		Path testFile2 = new Path(outputDir + "/test02");
+		testRowdelimiter(rowDelimiter2, testDat2, StandardCharsets.UTF_8.name(), testFile2);
+
+		String rowDelimiter3 = "*";
+		String testDat3 = "A" + rowDelimiter3 + "B" + rowDelimiter3 + "C" + rowDelimiter3 + "D" + rowDelimiter3 + "E";
+		Path testFile3 = new Path(outputDir + "/test03");
+		testRowdelimiter(rowDelimiter3, testDat3, StandardCharsets.UTF_8.name(), testFile3);
+
+		String rowDelimiter4 = "##";
+		String testDat4 = "A" + rowDelimiter4 + "B" + rowDelimiter4 + "C" + rowDelimiter4 + "D" + rowDelimiter4 + "E";
+		Path testFile4 = new Path(outputDir + "/test04");
+		testRowdelimiter(rowDelimiter4, testDat4, StandardCharsets.UTF_8.name(), testFile4);
+
+	}
+
+	private void testRowdelimiter(String rowDelimiter, String inputData, String charset, Path outputFile) throws IOException {
+		StringWriter<String> writer = new StringWriter(charset, rowDelimiter);
+		writer.open(dfs, outputFile);
+		StringTokenizer lineTokenizer = new StringTokenizer(inputData, rowDelimiter);
+		while (lineTokenizer.hasMoreTokens()){
+			writer.write(lineTokenizer.nextToken());
+		}
+		writer.close();
+		FSDataInputStream inStream = dfs.open(outputFile);
+		byte[] buffer = new byte[inputData.getBytes(charset).length];
+		readFully(inStream, buffer);
+		inStream.close();
+		String outputData = new String(buffer, charset);
+		Assert.assertEquals(inputData, outputData);
+
+	}
+
+	private void readFully(InputStream in, byte[] buffer) throws IOException {
+		int pos = 0;
+		int remaining = buffer.length;
+
+		while (remaining > 0) {
+			int read = in.read(buffer, pos, remaining);
+			if (read == -1) {
+				throw new EOFException();
+			}
+
+			pos += read;
+			remaining -= read;
+		}
 	}
 }