Posted to issues@flink.apache.org by "Chesnay Schepler (JIRA)" <ji...@apache.org> on 2014/07/11 13:06:04 UTC
[jira] [Commented] (FLINK-834) Extend writeAsText with custom formatting function.
[ https://issues.apache.org/jira/browse/FLINK-834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14058649#comment-14058649 ]
Chesnay Schepler commented on FLINK-834:
----------------------------------------
I've run into a problem:
This is what the code looks like in the DataSet class:
{code:java}
public DataSink<String> writeAsFormattedText(String filePath, final TextFormatter<T> formatter) {
    return new MapOperator<T, String>(this, new MapFunction<T, String>() {
        @Override
        public String map(T value) throws Exception {
            return formatter.format(value);
        }
    }).writeAsText(filePath);
}
{code}
When I try to use it like this:
{code:java}
final TextFormatter<Tuple2<String, String>> format = new TextFormatter<Tuple2<String, String>>() {
    @Override
    public String format(Tuple2<String, String> value) {
        return value.f1 + " " + value.f0;
    }
};
data.writeAsFormattedText("/tmp/output", format);
{code}
I get this exception, which I can't quite make sense of:
{code:java}
Exception in thread "main" eu.stratosphere.api.common.NonSerializableUserCodeException: User-defined object eu.stratosphere.api.java.DataSet$2@6261d064 (eu.stratosphere.api.java.DataSet$2) contains non-serializable field this$0 = eu.stratosphere.api.java.operators.DataSource@1038160a
at eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper.<init>(UserCodeObjectWrapper.java:78)
at eu.stratosphere.api.common.operators.base.MapOperatorBase.<init>(MapOperatorBase.java:37)
at eu.stratosphere.api.java.operators.MapOperator.translateToDataFlow(MapOperator.java:51)
at eu.stratosphere.api.java.operators.MapOperator.translateToDataFlow(MapOperator.java:34)
at eu.stratosphere.api.java.operators.OperatorTranslation.translateSingleInputOperator(OperatorTranslation.java:119)
at eu.stratosphere.api.java.operators.OperatorTranslation.translate(OperatorTranslation.java:85)
at eu.stratosphere.api.java.operators.OperatorTranslation.translate(OperatorTranslation.java:61)
at eu.stratosphere.api.java.operators.OperatorTranslation.translateToPlan(OperatorTranslation.java:49)
at eu.stratosphere.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:626)
at eu.stratosphere.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
at eu.stratosphere.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:516)
at NewClass.main(NewClass.java:60)
{code}
But when I write this directly into the plan, it works:
{code:java}
final TextFormatter<Tuple2<String, String>> format = new TextFormatter<Tuple2<String, String>>() {
    @Override
    public String format(Tuple2<String, String> value) {
        return value.f1 + " " + value.f0;
    }
};
data.map(new MapFunction<Tuple2<String, String>, String>() {
    @Override
    public String map(Tuple2<String, String> value) throws Exception {
        return format.format(value);
    }
}).writeAsText("/tmp/output");
{code}
This has become a game of spot-the-difference, and I don't see it.
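One difference the exception message itself points at is the `this$0` field. In Java that name is the hidden reference an anonymous class keeps to its enclosing instance when it is created inside an instance method. In the failing case that would be the DataSet, while the working variant is created in the static `main` and has no enclosing instance. The following is only a minimal sketch of that effect using plain `java.io` serialization. `CaptureSketch`, `Holder`, `Formatter`, and `isSerializable` are hypothetical names for illustration and are not part of the Stratosphere/Flink API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureSketch {

    /** Hypothetical stand-in for a serializable user function such as MapFunction. */
    interface Formatter extends Serializable {
        String format(String value);
    }

    /** Hypothetical stand-in for DataSet: deliberately NOT Serializable. */
    static class Holder {
        private final String suffix = "!";

        // Created inside an instance method: the anonymous class holds a hidden
        // this$0 reference to the enclosing Holder (it reads `suffix` through it).
        Formatter fromInstanceMethod() {
            return new Formatter() {
                @Override
                public String format(String value) {
                    return value + suffix;
                }
            };
        }

        // Created in a static context: there is no enclosing instance to capture.
        static Formatter fromStaticContext() {
            return new Formatter() {
                @Override
                public String format(String value) {
                    return value + "!";
                }
            };
        }
    }

    /** Returns true if Java serialization can write the object, false otherwise. */
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when the object graph reaches a non-serializable object,
            // here the Holder reachable through this$0.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new Holder().fromInstanceMethod()));
        System.out.println(isSerializable(Holder.fromStaticContext()));
    }
}
```

If this is indeed the cause, moving the function into a static nested class (or a static helper method) that only receives the formatter would avoid dragging the enclosing object into the serialized user code.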
> Extend writeAsText with custom formatting function.
> ---------------------------------------------------
>
> Key: FLINK-834
> URL: https://issues.apache.org/jira/browse/FLINK-834
> Project: Flink
> Issue Type: Improvement
> Reporter: GitHub Import
> Assignee: Chesnay Schepler
> Labels: github-import, starter
> Fix For: pre-apache
>
>
> Currently, `writeAsText` uses the `toString()` method of data types to serialize the output as text. Alternatively, we have a CSV format that writes Tuple DataSets by using the `toString()` methods of the individual fields. Since Tuple's `toString()` method cannot be adapted without extending the class, it is not easy to define a custom output format for data sets that include Tuples.
> I think it would be good to have a way to explicitly format a text output.
> We could add a formatting function that returns a String for an input element, such as
> ```
> DataSet<Tuple2<String, MyPojo>> myDS;
> myDS.writeAsFormattedText("hdfs:///myOutPath",
>     new TextFormatter<Tuple2<String, MyPojo>>() {
>         @Override
>         public String format(Tuple2<String, MyPojo> input) {
>             return input.f0 + " -> " +
>                 input.f1.getWhatEver() + " and " +
>                 input.f1.getSomethingElse();
>         }
>     });
> ```
> Internally, we would use the default TextOutputFormat, preceded by a Map that does the formatting.
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/834
> Created by: [fhueske|https://github.com/fhueske]
> Labels: enhancement, java api, simple-issue, user satisfaction,
> Milestone: Release 0.6 (unplanned)
> Created at: Mon May 19 14:39:51 CEST 2014
> State: open