You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/08 15:15:47 UTC
[flink] 01/02: StringWriter supports custom row delimiter
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit eb4b3b75a40623dc83705277e469383e81acb675
Author: wangxiaowei <wa...@360.cn>
AuthorDate: Thu May 30 16:43:40 2019 +0800
StringWriter supports custom row delimiter
---
.../streaming/connectors/fs/StringWriter.java | 28 ++++++-
.../connectors/fs/StreamWriterBaseComparator.java | 3 +-
.../streaming/connectors/fs/StringWriterTest.java | 96 ++++++++++++++++++++++
3 files changed, 124 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index 84c961e..ddea224 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -41,12 +41,18 @@ public class StringWriter<T> extends StreamWriterBase<T> {
private transient Charset charset;
+ private final String rowDelimiter;
+
+ private static final String DEFAULT_ROW_DELIMITER = "\n";
+
+ private byte[] rowDelimiterBytes;
+
/**
* Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
* strings to bytes.
*/
public StringWriter() {
- this("UTF-8");
+ this("UTF-8", DEFAULT_ROW_DELIMITER);
}
/**
@@ -56,12 +62,25 @@ public class StringWriter<T> extends StreamWriterBase<T> {
* @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
*/
public StringWriter(String charsetName) {
+ this(charsetName, DEFAULT_ROW_DELIMITER);
+ }
+
+ /**
+ * Creates a new {@code StringWriter} that uses the given charset and row delimiter to convert
+ * strings to bytes.
+ *
+ * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
+ * @param rowDelimiter Parameter that specifies which character to use for delimiting rows
+ */
+ public StringWriter(String charsetName, String rowDelimiter) {
this.charsetName = charsetName;
+ this.rowDelimiter = rowDelimiter;
}
protected StringWriter(StringWriter<T> other) {
super(other);
this.charsetName = other.charsetName;
+ this.rowDelimiter = other.rowDelimiter;
}
@Override
@@ -70,6 +89,7 @@ public class StringWriter<T> extends StreamWriterBase<T> {
try {
this.charset = Charset.forName(charsetName);
+ this.rowDelimiterBytes = rowDelimiter.getBytes(charset);
}
catch (IllegalCharsetNameException e) {
throw new IOException("The charset " + charsetName + " is not valid.", e);
@@ -83,7 +103,7 @@ public class StringWriter<T> extends StreamWriterBase<T> {
public void write(T element) throws IOException {
FSDataOutputStream outputStream = getStream();
outputStream.write(element.toString().getBytes(charset));
- outputStream.write('\n');
+ outputStream.write(rowDelimiterBytes);
}
@Override
@@ -94,4 +114,8 @@ public class StringWriter<T> extends StreamWriterBase<T> {
String getCharsetName() {
return charsetName;
}
+
+ public String getRowDelimiter() {
+ return rowDelimiter;
+ }
}
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
index 9472c29..1e083ae 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
@@ -55,6 +55,7 @@ public class StreamWriterBaseComparator {
StringWriter<T> writer1,
StringWriter<T> writer2) {
return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) &&
- Objects.equals(writer1.getCharsetName(), writer2.getCharsetName());
+ Objects.equals(writer1.getCharsetName(), writer2.getCharsetName()) &&
+ Objects.equals(writer1.getRowDelimiter(), writer2.getRowDelimiter());
}
}
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
index 7009d94..0574070 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
@@ -18,18 +18,57 @@
package org.apache.flink.streaming.connectors.fs;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.util.StringTokenizer;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+
/**
* Tests for {@link StringWriter}.
*/
public class StringWriterTest {
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static MiniDFSCluster hdfsCluster;
+ private static org.apache.hadoop.fs.FileSystem dfs;
+
+ private static String outputDir;
+
+ @Before
+ public void createHDFS() throws IOException {
+ org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+
+ File dataDir = tempFolder.newFolder();
+
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+ hdfsCluster = builder.build();
+
+ dfs = hdfsCluster.getFileSystem();
+
+ outputDir = "hdfs://"
+ + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort());
+ }
+
@Test
public void testDuplicate() {
StringWriter<String> writer = new StringWriter(StandardCharsets.UTF_16.name());
@@ -41,5 +80,62 @@ public class StringWriterTest {
writer.setSyncOnFlush(false);
assertFalse(StreamWriterBaseComparator.equals(writer, other));
assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>()));
+
+ }
+
+ @Test
+ public void testMultiRowdelimiters() throws IOException {
+ String rowDelimiter1 = "\n";
+ String testDat1 = "A" + rowDelimiter1 + "B" + rowDelimiter1 + "C" + rowDelimiter1 + "D" + rowDelimiter1 + "E";
+ Path testFile1 = new Path(outputDir + "/test01");
+ testRowdelimiter(rowDelimiter1, testDat1, StandardCharsets.UTF_8.name(), testFile1);
+
+ String rowDelimiter2 = "\r\n";
+ String testDat2 = "A" + rowDelimiter2 + "B" + rowDelimiter2 + "C" + rowDelimiter2 + "D" + rowDelimiter2 + "E";
+ Path testFile2 = new Path(outputDir + "/test02");
+ testRowdelimiter(rowDelimiter2, testDat2, StandardCharsets.UTF_8.name(), testFile2);
+
+ String rowDelimiter3 = "*";
+ String testDat3 = "A" + rowDelimiter3 + "B" + rowDelimiter3 + "C" + rowDelimiter3 + "D" + rowDelimiter3 + "E";
+ Path testFile3 = new Path(outputDir + "/test03");
+ testRowdelimiter(rowDelimiter3, testDat3, StandardCharsets.UTF_8.name(), testFile3);
+
+ String rowDelimiter4 = "##";
+ String testDat4 = "A" + rowDelimiter4 + "B" + rowDelimiter4 + "C" + rowDelimiter4 + "D" + rowDelimiter4 + "E";
+ Path testFile4 = new Path(outputDir + "/test04");
+ testRowdelimiter(rowDelimiter4, testDat4, StandardCharsets.UTF_8.name(), testFile4);
+
+ }
+
+ private void testRowdelimiter(String rowDelimiter, String inputData, String charset, Path outputFile) throws IOException {
+ StringWriter<String> writer = new StringWriter(charset, rowDelimiter);
+ writer.open(dfs, outputFile);
+ StringTokenizer lineTokenizer = new StringTokenizer(inputData, rowDelimiter);
+ while (lineTokenizer.hasMoreTokens()){
+ writer.write(lineTokenizer.nextToken());
+ }
+ writer.close();
+ FSDataInputStream inStream = dfs.open(outputFile);
+ byte[] buffer = new byte[inputData.getBytes(charset).length];
+ readFully(inStream, buffer);
+ inStream.close();
+ String outputData = new String(buffer, charset);
+ Assert.assertEquals(inputData, outputData);
+
+ }
+
+ private void readFully(InputStream in, byte[] buffer) throws IOException {
+ int pos = 0;
+ int remaining = buffer.length;
+
+ while (remaining > 0) {
+ int read = in.read(buffer, pos, remaining);
+ if (read == -1) {
+ throw new EOFException();
+ }
+
+ pos += read;
+ remaining -= read;
+ }
}
}