You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/08/22 13:23:01 UTC
git commit: [FLINK-834] Added DataSet.writeAsFormattedText()
Repository: incubator-flink
Updated Branches:
refs/heads/master 46f5c89c6 -> 6eefedc69
[FLINK-834] Added DataSet.writeAsFormattedText()
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6eefedc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6eefedc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6eefedc6
Branch: refs/heads/master
Commit: 6eefedc69ec2966d69821034d0963abb12b9e195
Parents: 46f5c89
Author: zentol <s....@web.de>
Authored: Mon Jul 28 12:27:17 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Aug 22 12:55:51 2014 +0200
----------------------------------------------------------------------
docs/java_api_guide.md | 11 ++++--
.../java/org/apache/flink/api/java/DataSet.java | 31 ++++++++++++++++
.../api/java/functions/FormattingMapper.java | 38 ++++++++++++++++++++
.../flink/api/java/io/TextOutputFormat.java | 5 +++
4 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6eefedc6/docs/java_api_guide.md
----------------------------------------------------------------------
diff --git a/docs/java_api_guide.md b/docs/java_api_guide.md
index f180895..401cbe8 100644
--- a/docs/java_api_guide.md
+++ b/docs/java_api_guide.md
@@ -802,7 +802,8 @@ Data Sinks
Data sinks consume DataSets and are used to store or return them. Data sink operations are described using an {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java "OutputFormat" %}. Flink comes with a variety of built-in output formats that
are encapsulated behind operations on the DataSet type:
-- `writeAsText()` / `TextOuputFormat` - Writes for each element as a String in a line. The String are obtained by calling the *toString()* method.
+- `writeAsText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are obtained by calling the *toString()* method of each element.
+- `writeAsFormattedText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are obtained by calling a user-defined *format()* method for each element.
- `writeAsCsv` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the *toString()* method of the objects.
- `print()` / `printToErr()` - Prints the *toString()* value of each element on the standard out / standard error stream.
- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
@@ -832,7 +833,13 @@ DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");
// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
-value.writeAsText("file:///path/to/the/result/file");
+values.writeAsText("file:///path/to/the/result/file");
+
+// this writes values as strings using a user-defined TextFormatter object
+values.writeAsFormattedText("file:///path/to/the/result/file", new TextFormatter<Tuple3<String, Integer, Double>>() {
+  public String format(Tuple3<String, Integer, Double> value) {
+    return value.f1 + " - " + value.f0;
+  }
+});
```
Using a custom output format:
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6eefedc6/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index d8450b5..d25e64b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -29,11 +29,13 @@ import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FormattingMapper;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
+import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets;
@@ -822,6 +824,35 @@ public abstract class DataSet<T> {
return output(tof);
}
+	/**
+	 * Writes a DataSet as a text file to the specified location.<br/>
+	 * For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written
+	 * as one line of text.
+	 *
+	 * @param filePath The path pointing to the location the text file is written to.
+	 * @param formatter The formatter that is applied to every element of the DataSet; must not be null.
+	 * @return The DataSink that writes the DataSet.
+	 * @throws NullPointerException if {@code formatter} is null.
+	 *
+	 * @see TextOutputFormat
+	 */
+	public DataSink<String> writeAsFormattedText(String filePath, TextFormatter<T> formatter) {
+		// Reject a null formatter at the call site instead of letting FormattingMapper.map()
+		// fail with an NPE once the job processes its first element.
+		if (formatter == null) {
+			throw new NullPointerException("The TextFormatter must not be null.");
+		}
+		return this.map(new FormattingMapper<T>(formatter)).writeAsText(filePath);
+	}
+
+	/**
+	 * Writes a DataSet as a text file to the specified location.<br/>
+	 * For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written
+	 * as one line of text.
+	 *
+	 * @param filePath The path pointing to the location the text file is written to.
+	 * @param writeMode Controls the behavior for existing files. Options are NO_OVERWRITE and OVERWRITE.
+	 * @param formatter The formatter that is applied to every element of the DataSet; must not be null.
+	 * @return The DataSink that writes the DataSet.
+	 * @throws NullPointerException if {@code formatter} is null.
+	 *
+	 * @see TextOutputFormat
+	 */
+	public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMode, TextFormatter<T> formatter) {
+		// Reject a null formatter at the call site instead of letting FormattingMapper.map()
+		// fail with an NPE once the job processes its first element.
+		if (formatter == null) {
+			throw new NullPointerException("The TextFormatter must not be null.");
+		}
+		return this.map(new FormattingMapper<T>(formatter)).writeAsText(filePath, writeMode);
+	}
+
/**
* Writes a {@link Tuple} DataSet as a CSV file to the specified location.<br/>
* <b>Note: Only a Tuple DataSet can written as a CSV file.</b><br/>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6eefedc6/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java
new file mode 100644
index 0000000..cc78773
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.io.TextOutputFormat;
+import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
+
+/**
+ * A {@link MapFunction} that turns every element into a String by delegating to a
+ * user-defined {@link TextOutputFormat.TextFormatter}.
+ *
+ * @param <T> The type of the elements that are formatted.
+ */
+public class FormattingMapper<T> implements MapFunction<T, String> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The user-defined formatter. It is kept as a field, so it is serialized together with
+	 * this function; TextFormatter extends Serializable for that reason.
+	 */
+	private final TextFormatter<T> formatter;
+
+	/**
+	 * Creates a mapper that formats elements with the given formatter.
+	 *
+	 * @param formatter The formatter applied to every element; must not be null.
+	 * @throws NullPointerException if {@code formatter} is null.
+	 */
+	public FormattingMapper(TextOutputFormat.TextFormatter<T> formatter) {
+		// Fail fast on construction rather than on the first call to map().
+		if (formatter == null) {
+			throw new NullPointerException("The TextFormatter must not be null.");
+		}
+		this.formatter = formatter;
+	}
+
+	/**
+	 * Returns the result of {@link TextFormatter#format(Object)} for the given element.
+	 *
+	 * @param value The element to format.
+	 * @return The String produced by the formatter.
+	 */
+	@Override
+	public String map(T value) throws Exception {
+		return formatter.format(value);
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6eefedc6/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
index 08d6318..741add7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.java.io;
import java.io.IOException;
+import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
@@ -39,6 +40,10 @@ public class TextOutputFormat<T> extends FileOutputFormat<T> {
// --------------------------------------------------------------------------------------------
+	/**
+	 * User-defined conversion of a single element into the String that is written for it.
+	 * Extends {@link Serializable} so that instances can be held as fields of serializable
+	 * user functions.
+	 *
+	 * @param <IN> The type of the elements to format.
+	 */
+	public interface TextFormatter<IN> extends Serializable {
+
+		/**
+		 * Formats the given element.
+		 *
+		 * @param value The element to format.
+		 * @return The String representation of {@code value}.
+		 */
+		String format(IN value);
+	}
+
public TextOutputFormat(Path outputPath) {
this(outputPath, "UTF-8");
}