Posted to issues@flink.apache.org by "Chesnay Schepler (JIRA)" <ji...@apache.org> on 2014/07/11 13:06:04 UTC

[jira] [Commented] (FLINK-834) Extend writeAsText with custom formatting function.

    [ https://issues.apache.org/jira/browse/FLINK-834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14058649#comment-14058649 ] 

Chesnay Schepler commented on FLINK-834:
----------------------------------------

I've run into a problem:

This is what the code looks like in the DataSet class:
{code:java}
public DataSink<String> writeAsFormattedText(String filePath, final TextFormatter<T> formatter) {
    return new MapOperator<T, String>(this, new MapFunction<T, String>() {
        @Override
        public String map(T value) throws Exception {
            return formatter.format(value);
        }
    }).writeAsText(filePath);
}
{code}
When I try to use it like this:
{code:java}
final TextFormatter<Tuple2<String, String>> format = new TextFormatter<Tuple2<String, String>>() {
    @Override
    public String format(Tuple2<String, String> value) {
        return value.f1 + " " + value.f0;
    }
};

data.writeAsFormattedText("/tmp/output", format);
{code}
I get this exception, which I can't quite make sense of:
{code:java}
Exception in thread "main" eu.stratosphere.api.common.NonSerializableUserCodeException: User-defined object eu.stratosphere.api.java.DataSet$2@6261d064 (eu.stratosphere.api.java.DataSet$2) contains non-serializable field this$0 = eu.stratosphere.api.java.operators.DataSource@1038160a
	at eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper.<init>(UserCodeObjectWrapper.java:78)
	at eu.stratosphere.api.common.operators.base.MapOperatorBase.<init>(MapOperatorBase.java:37)
	at eu.stratosphere.api.java.operators.MapOperator.translateToDataFlow(MapOperator.java:51)
	at eu.stratosphere.api.java.operators.MapOperator.translateToDataFlow(MapOperator.java:34)
	at eu.stratosphere.api.java.operators.OperatorTranslation.translateSingleInputOperator(OperatorTranslation.java:119)
	at eu.stratosphere.api.java.operators.OperatorTranslation.translate(OperatorTranslation.java:85)
	at eu.stratosphere.api.java.operators.OperatorTranslation.translate(OperatorTranslation.java:61)
	at eu.stratosphere.api.java.operators.OperatorTranslation.translateToPlan(OperatorTranslation.java:49)
	at eu.stratosphere.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:626)
	at eu.stratosphere.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
	at eu.stratosphere.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:516)
	at NewClass.main(NewClass.java:60)
{code}

But when I write the map directly into the plan, it works:
{code:java}
final TextFormatter<Tuple2<String, String>> format = new TextFormatter<Tuple2<String, String>>() {
    @Override
    public String format(Tuple2<String, String> value) {
        return value.f1 + " " + value.f0;
    }
};

data.map(new MapFunction<Tuple2<String, String>, String>() {
    @Override
    public String map(Tuple2<String, String> value) throws Exception {
        return format.format(value);
    }
}).writeAsText("/tmp/output");
{code}

This has become a game of spot-the-difference, and I don't see it.
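
One lead worth checking, as a sketch under assumptions rather than a confirmed diagnosis: the `this$0` field named in the exception is the hidden reference an anonymous inner class keeps to its enclosing instance. The failing version creates the MapFunction inside an instance method of DataSet, while the working version creates it inside a static `main`. The standalone example below only illustrates that Java behavior with plain `java.io` serialization. `CaptureDemo`, `Formatter`, `Holder` and `isJavaSerializable` are hypothetical names, not Stratosphere classes, and the inner class reads a field of `Holder` so that the capture is guaranteed on any compiler.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CaptureDemo {

    /** Hypothetical stand-in for a serializable user function such as a formatter. */
    interface Formatter<T> extends Serializable {
        String format(T value);
    }

    /** Hypothetical stand-in for a class like DataSet: deliberately NOT Serializable. */
    static class Holder {
        final String separator;

        Holder(String separator) {
            this.separator = separator;
        }

        // Created in an instance method and reading an instance field: the
        // anonymous class holds a hidden reference (this$0) to this Holder.
        Formatter<String> fromInstanceMethod() {
            return new Formatter<String>() {
                @Override
                public String format(String value) {
                    return value + separator;
                }
            };
        }

        // Created in a static method: there is no enclosing instance to capture.
        static Formatter<String> fromStaticMethod() {
            return new Formatter<String>() {
                @Override
                public String format(String value) {
                    return value + " ";
                }
            };
        }
    }

    /** Returns true if standard Java serialization can write the object. */
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // Raised when the object graph reaches a non-serializable object,
            // here the Holder referenced through this$0.
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Fails: the function drags the non-serializable Holder along.
        System.out.println(isJavaSerializable(new Holder(" ").fromInstanceMethod())); // false
        // Works: nothing but the function itself is serialized.
        System.out.println(isJavaSerializable(Holder.fromStaticMethod())); // true
    }
}
```

If that is what happens here, building the MapFunction in a static helper method or a static nested class that takes the formatter as an argument should avoid dragging the DataSet along. I have not tried that against the actual code yet.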

> Extend writeAsText with custom formatting function.
> ---------------------------------------------------
>
>                 Key: FLINK-834
>                 URL: https://issues.apache.org/jira/browse/FLINK-834
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: GitHub Import
>            Assignee: Chesnay Schepler
>              Labels: github-import, starter
>             Fix For: pre-apache
>
>
> Currently, `writeAsText` uses the `toString()` method of data types to serialize the output as text. Alternatively, we have a CSV format that writes Tuple DataSets by using the `toString()` methods of the individual fields. Since Tuple's `toString()` method cannot be adapted without extending the class, it is not easy to define a custom output format for data sets that include Tuples.
> I think it would be good to have a way to explicitly format a text output. 
> We could add a formatting function that returns a String for an input element, such as
> ```
> DataSet<Tuple2<String, MyPojo>> myDS;
> myDS.writeAsFormattedText("hdfs:///myOutPath", 
>   new TextFormatter<Tuple2<String, MyPojo>>() {
>     @Override
>     public String format(Tuple2<String, MyPojo> input) {
>       return input.f0+" -> "+
>              input.f1.getWhatEver()+"  and "+
>              input.f1.getSomethingElse();
>     }
>   });
> ```
> Internally, we would use the default TextOutputFormat but with a previous Map for formatting.
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/834
> Created by: [fhueske|https://github.com/fhueske]
> Labels: enhancement, java api, simple-issue, user satisfaction, 
> Milestone: Release 0.6 (unplanned)
> Created at: Mon May 19 14:39:51 CEST 2014
> State: open


