You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/08/22 13:23:01 UTC

git commit: [FLINK-834] Added DataSet.writeAsFormattedText()

Repository: incubator-flink
Updated Branches:
  refs/heads/master 46f5c89c6 -> 6eefedc69


[FLINK-834] Added DataSet.writeAsFormattedText()


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6eefedc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6eefedc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6eefedc6

Branch: refs/heads/master
Commit: 6eefedc69ec2966d69821034d0963abb12b9e195
Parents: 46f5c89
Author: zentol <s....@web.de>
Authored: Mon Jul 28 12:27:17 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Aug 22 12:55:51 2014 +0200

----------------------------------------------------------------------
 docs/java_api_guide.md                          | 11 ++++--
 .../java/org/apache/flink/api/java/DataSet.java | 31 ++++++++++++++++
 .../api/java/functions/FormattingMapper.java    | 38 ++++++++++++++++++++
 .../flink/api/java/io/TextOutputFormat.java     |  5 +++
 4 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6eefedc6/docs/java_api_guide.md
----------------------------------------------------------------------
diff --git a/docs/java_api_guide.md b/docs/java_api_guide.md
index f180895..401cbe8 100644
--- a/docs/java_api_guide.md
+++ b/docs/java_api_guide.md
@@ -802,7 +802,8 @@ Data Sinks
 Data sinks consume DataSets and are used to store or return them. Data sink operations are described using an {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormat.java "OutputFormat" %}. Flink comes with a variety of built-in output formats that
 are encapsulated behind operations on the DataSet type:
 
-- `writeAsText()` / `TextOuputFormat` - Writes for each element as a String in a line. The String are obtained by calling the *toString()* method.
+- `writeAsText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are obtained by calling the *toString()* method of each element.
+- `writeAsFormattedText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are obtained by calling a user-defined *format()* method for each element.
 - `writeAsCsv` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the *toString()* method of the objects.
 - `print()` / `printToErr()` - Prints the *toString()* value of each element on the standard output / standard error stream.
 - `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
@@ -832,7 +833,13 @@ DataSet<Tuple3<String, Integer, Double>> values = // [...]
 values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");
 
 // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
-value.writeAsText("file:///path/to/the/result/file");
+values.writeAsText("file:///path/to/the/result/file");
+
+// this writes values as strings using a user-defined TextFormatter object
+values.writeAsFormattedText("file:///path/to/the/result/file", new TextFormatter<Tuple2<Integer, Integer>>() {
+    public String format (Tuple2<Integer, Integer> value) {
+        return value.f1 + " - " + value.f0;
+    }});
 ```
 
 Using a custom output format:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6eefedc6/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index d8450b5..d25e64b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -29,11 +29,13 @@ import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FormattingMapper;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
+import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
 import org.apache.flink.api.java.operators.AggregateOperator;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets;
@@ -822,6 +824,35 @@ public abstract class DataSet<T> {
 		return output(tof);
 	}
 	
+/**
+	 * Writes a DataSet as a text file to the specified location.<br/>
+	 * For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written.
+	 *
+	 * @param filePath The path pointing to the location the text file is written to.
+	 * @param formatter formatter that is applied on every element of the DataSet.
+	 * @return The DataSink that writes the DataSet.
+	 *
+	 * @see TextOutputFormat
+	 */
+	public DataSink<String> writeAsFormattedText(String filePath, TextFormatter<T> formatter) {
+		return this.map(new FormattingMapper<T>(formatter)).writeAsText(filePath);
+	}
+
+	/**
+	 * Writes a DataSet as a text file to the specified location.<br/>
+	 * For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written.
+	 *
+	 * @param filePath The path pointing to the location the text file is written to.
+	 * @param writeMode Controls the behavior for existing files. Options are {@code NO_OVERWRITE} and {@code OVERWRITE}.
+	 * @param formatter formatter that is applied on every element of the DataSet.
+	 * @return The DataSink that writes the DataSet.
+	 *
+	 * @see TextOutputFormat
+	 */
+	public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMode, final TextFormatter<T> formatter) {
+		return this.map(new FormattingMapper<T>(formatter)).writeAsText(filePath, writeMode);
+	}
+	
 	/**
 	 * Writes a {@link Tuple} DataSet as a CSV file to the specified location.<br/>
 	 * <b>Note: Only a Tuple DataSet can be written as a CSV file.</b><br/>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6eefedc6/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java
new file mode 100644
index 0000000..cc78773
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.io.TextOutputFormat;
+import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
+
+public class FormattingMapper<T> implements MapFunction<T, String> {
+	private static final long serialVersionUID = 1L;
+
+	private final TextFormatter<T> formatter;
+
+	public FormattingMapper(TextOutputFormat.TextFormatter<T> formatter) {
+		this.formatter = formatter;
+	}
+
+	@Override
+	public String map(T value) throws Exception {
+		return formatter.format(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6eefedc6/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
index 08d6318..741add7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.io;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.nio.charset.IllegalCharsetNameException;
 import java.nio.charset.UnsupportedCharsetException;
@@ -39,6 +40,10 @@ public class TextOutputFormat<T> extends FileOutputFormat<T> {
 
 	// --------------------------------------------------------------------------------------------
 
+	public static interface TextFormatter<IN> extends Serializable {
+		public String format(IN value);
+	}
+
 	public TextOutputFormat(Path outputPath) {
 		this(outputPath, "UTF-8");
 	}