Posted to dev@flink.apache.org by Anirvan BASU <an...@inria.fr> on 2014/11/25 16:55:17 UTC

Program crashes trying to read JSON file

Hello all, 

We are using Flink 0.7 and trying to read a large JSON file, reading some fields into a Flink (3-tuple based) DataSet, then performing some operations.

We encountered the following runtime error: 

[QUOTE] 
Error: The main method caused an error. 
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. 
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404) 
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307) 
at org.apache.flink.client.program.Client.run(Client.java:244) 
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347) 
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334) 
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001) 
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025) 
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations. 
at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186) 
at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109) 
at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61) 
at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72) 
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389) 
... 6 more 
[/QUOTE] 



The code snippet that could have caused this error (i.e. the part that we edited) is the following:

[CODE]

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...

    public static void main(String[] args) throws Exception {

        if(!parseParameters(args)) {
            return;
        }

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataSet<String> text = getTextDataSet(env);

        DataSet<Tuple3<Integer, String, String>> counts =
                // split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event)
                text.flatMap(new SelectDataFlatMap())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(2)
                .sum(2);

        // emit result
        if(fileOutput) {
            counts.writeAsCsv(outputPath, "\n", " ");
        } else {
            counts.print();
        }

        // execute program
        env.execute("Weblogs Programme");
    }

...

    public static class SelectDataFlatMap extends
            JSONParseFlatMap<String, Tuple3<Integer, String, String>> {

        @Override
        public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
                throws Exception {
            try {
                out.collect(new Tuple3<Integer, String, String>(
                        getInt(value, "timestamp"),
                        getString(value, "uuid"),
                        getString(value, "event")));
            } catch (JSONException e) {
                System.err.println("Field not found");
            }
        }
    }

[/CODE]



[QUOTE] 
Error: The main method caused an error. 
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. 
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404) 
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307) 
at org.apache.flink.client.program.Client.run(Client.java:244) 
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347) 
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334) 
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001) 
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025) 
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations. 
at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186) 
at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109) 
at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61) 
at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72) 
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389) 
... 6 more 
[/QUOTE] 


The JSON file is of the following nature, with a 2-level hierarchy for one field: 
[JSON] 
{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}} 
{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}} 
[/JSON] 



Thanks in advance for helping us to understand where we are going wrong. 

Anirvan 

Re: Program crashes trying to read JSON file

Posted by Felix Neutatz <ne...@googlemail.com>.
Hi Anirvan,
I don't know whether this fits your goals, but you can try a Hadoop Input
Format like
https://github.com/alexholmes/json-mapreduce#an-inputformat-to-work-with-splittable-multi-line-json

Best regards,
Felix
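
For illustration, a rough (untested) sketch of how a Hadoop InputFormat can be plugged into a Flink 0.7 batch job through the Hadoop compatibility layer. The input path is a placeholder, and the plain TextInputFormat below stands in for the splittable multi-line JSON InputFormat from the project linked above (take its exact class name from that README):

[CODE]
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class HadoopJsonInputSketch {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // wrap a Hadoop InputFormat so that Flink can use it as a data source;
        // a splittable multi-line JSON InputFormat would replace TextInputFormat here
        JobConf conf = new JobConf();
        HadoopInputFormat<LongWritable, Text> hadoopIF =
                new HadoopInputFormat<LongWritable, Text>(
                        new TextInputFormat(), LongWritable.class, Text.class, conf);
        TextInputFormat.addInputPath(conf, new Path("hdfs:///path/to/weblogs"));

        // each Text value holds one record, ready for a parsing flatMap
        DataSet<Tuple2<LongWritable, Text>> records = env.createInput(hadoopIF);
        records.print();

        env.execute("JSON via Hadoop InputFormat (sketch)");
    }
}
[/CODE]
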
On 26.11.2014 09:01, "Anirvan BASU" <an...@inria.fr> wrote:

> Ciao Stefano !
>
> Thanks for this early morning information, very helpful.
> Yes, for outputting the data we are using WriteAsCSV which is stable over
> different versions of Flink.
>
> Our current concern is "reading" a JSON file into a dataset.
> As you can see, we have a simple 2-level JSON hierarchy that can be easily
> mapped to a fixed-column CSV.
> But the place we are stuck at currently is in reading the file correctly
> into a tuple-based dataset in memory.
> Once this is achieved, the rest will be fairly simple dataset
> transformations.
>
> As you can see from the pasted code, we used functions developed from the
> stream connector for our purposes. (Thanks to Gyula and Marton for that
> information)
>
> If reading a JSON file using functions already developed is not possible
> then we will have to develop some custom functions on hardcore string
> operations to do the same.
> That would be like reinventing the wheel ... :-((
>
> Any advice in this regard will be highly appreciated.
>
> Thanks in advance to all,
> Anirvan
>
> ----- Original Message -----
>
> > From: "Stefano Bortoli" <s....@gmail.com>
> > To: "user" <us...@flink.incubator.apache.org>
> > Sent: Wednesday, November 26, 2014 8:37:59 AM
> > Subject: Re: Program crashes trying to read JSON file
>
> > You can output your results in different ways. If all you need is to
> > write a file, I normally use the writeAsText method (however, there are
> > also writeAsCsv and writeAsFormattedText), or write according to your
> > custom FileOutputFormat.
>
> > datasetToPrint.writeAsText("/path/to/file/with/permission",
> > WriteMode.OVERWRITE);
>
> > Keep in mind that this will output your tuple dataset. Therefore, if you
> > want to shape your output differently, further processing may be necessary.
>
> > saluti,
> > Stefano
>
> > 2014-11-25 22:04 GMT+01:00 Anirvan BASU < anirvan.basu@inria.fr > :
> >
> > > Thanks to Aljoscha and Stefano for pointing out the flaw.
> > >
> > > We corrected the issue as follows:
> > >
> > > [CODE]
> > >
> > > import org.apache.flink.api.java.tuple.Tuple4;
> > > import org.apache.flink.util.Collector;
> > > import org.apache.flink.api.java.DataSet;
> > > import org.apache.flink.api.java.ExecutionEnvironment;
> > > import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> > > import org.apache.sling.commons.json.JSONException;
> > > ...
> > >
> > >     public static void main(String[] args) throws Exception {
> > >
> > >         if(!parseParameters(args)) {
> > >             return;
> > >         }
> > >
> > >         // set up the execution environment
> > >         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> > >
> > >         // get input data
> > >         DataSet<String> text = getTextDataSet(env);
> > >
> > >         DataSet<Tuple4<Integer, String, String, Integer>> counts =
> > >                 // split up the lines in pairs (4-tuples) containing: (timestamp,uuid,event,count)
> > >                 text.flatMap(new SelectDataFlatMap())
> > >                 // group by the tuple field "1" (an event - string) and sum up tuple field "3" (integer - value 1)
> > >                 .groupBy(1)
> > >                 .sum(3);
> > >
> > >         // emit result
> > >         if(fileOutput) {
> > >             counts.writeAsCsv(outputPath, "\n", " ");
> > >         } else {
> > >             counts.print();
> > >         }
> > >
> > >         // execute program
> > >         env.execute("Weblogs Programme");
> > >     }
> > >
> > > ...
> > >
> > >     public static class SelectDataFlatMap extends
> > >             JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {
> > >
> > >         private static final long serialVersionUID = 1L;
> > >
> > >         @Override
> > >         public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record)
> > >                 throws Exception {
> > >             try {
> > >                 record.collect(new Tuple4<Integer, String, String, Integer>(
> > >                         getInt(value, "timestamp"),
> > >                         getString(value, "uuid"),
> > >                         getString(value, "event"),
> > >                         1));
> > >             } catch (JSONException e) {
> > >                 System.err.println("Field not found");
> > >             }
> > >         }
> > >     }
> > >
> > > [/CODE]
> > >
> > > However, this time the issue was different.
> > > The programme executed correctly till status FINISHED.
> > > However, there was no output :-((
> > > i.e. For each Task Manager, an empty file is written.
> > >
> > > When we checked further about the input text file that is read using
> > > env.readTextFile() we find that instead of a text string (full text dataset)
> > > only a small string is written!
> > > Something as :
> > > org.apache.flink.api.java.operators.DataSource@6bd8b476
> > >
> > > Worse still ! this string value sometimes remains the same over multiple runs
> > > of the programme ....
> > > Is this natural ? Is this just the handle to the file or the dataset ?
> > > Is the Collector() working correctly also ?
> > >
> > > Note :
> > > The actual JSON file (i.e. the text file that should be read) is of the
> > > following nature, with a 2-level hierarchy for one field:
> > > [JSON]
> > > {timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
> > > {timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
> > > [/JSON]
> > >
> > > So now again, we are confused if we are doing it correctly :-((
> > >
> > > Thanks in advance for helping us to understand where we are going wrong.
> > > Anirvan
> > >
> > > > From: "Stefano Bortoli" < s.bortoli@gmail.com >
> > > > To: "user" < user@flink.incubator.apache.org >
> > > > Cc: dev@flink.incubator.apache.org
> > > > Sent: Tuesday, November 25, 2014 5:05:34 PM
> > > > Subject: Re: Program crashes trying to read JSON file
> > > >
> > > > Very quickly, it seems you are trying to sum on Strings
> > > >
> > > > Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
> > > > The type java.lang.String has currently not supported for built-in sum
> > > > aggregations.
> > > >
> > > > Check your tuple types and be sure that you are not summing on strings.
> > > >
> > > > 2014-11-25 16:55 GMT+01:00 Anirvan BASU < anirvan.basu@inria.fr > :
> > > >
> > > > > Hello all,
> > > > >
> > > > > We are using Flink 0.7 and trying to read a large JSON file, reading some
> > > > > fields into a Flink (3-tuple based) DataSet, then performing some
> > > > > operations.
> > > > >
> > > > > We encountered the following runtime error:
> > > > >
> > > > > [QUOTE]
> > > > > Error: The main method caused an error.
> > > > > org.apache.flink.client.program.ProgramInvocationException: The main
> > > > > method caused an error.
> > > > >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
> > > > >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
> > > > >     at org.apache.flink.client.program.Client.run(Client.java:244)
> > > > >     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> > > > >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> > > > >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> > > > >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> > > > > Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
> > > > > The type java.lang.String has currently not supported for built-in sum
> > > > > aggregations.
> > > > >     at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
> > > > >     at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
> > > > >     at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
> > > > >     at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
> > > > >     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
> > > > >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >     at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
> > > > >     ... 6 more
> > > > > [/QUOTE]
> > > > >
> > > > > The code snippet that could have caused this error (i.e. the part that
> > > > > we edited) is the following:
> > > > >
> > > > > [CODE]
> > > > >
> > > > > import org.apache.flink.api.java.tuple.Tuple3;
> > > > > import org.apache.flink.util.Collector;
> > > > > import org.apache.flink.api.java.DataSet;
> > > > > import org.apache.flink.api.java.ExecutionEnvironment;
> > > > > import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> > > > > import org.apache.sling.commons.json.JSONException;
> > > > > ...
> > > > >
> > > > >     public static void main(String[] args) throws Exception {
> > > > >
> > > > >         if(!parseParameters(args)) {
> > > > >             return;
> > > > >         }
> > > > >
> > > > >         // set up the execution environment
> > > > >         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> > > > >
> > > > >         // get input data
> > > > >         DataSet<String> text = getTextDataSet(env);
> > > > >
> > > > >         DataSet<Tuple3<Integer, String, String>> counts =
> > > > >                 // split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event)
> > > > >                 text.flatMap(new SelectDataFlatMap())
> > > > >                 // group by the tuple field "0" and sum up tuple field "1"
> > > > >                 .groupBy(2)
> > > > >                 .sum(2);
> > > > >
> > > > >         // emit result
> > > > >         if(fileOutput) {
> > > > >             counts.writeAsCsv(outputPath, "\n", " ");
> > > > >         } else {
> > > > >             counts.print();
> > > > >         }
> > > > >
> > > > >         // execute program
> > > > >         env.execute("Weblogs Programme");
> > > > >     }
> > > > >
> > > > > ...
> > > > >
> > > > >     public static class SelectDataFlatMap extends
> > > > >             JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
> > > > >
> > > > >         @Override
> > > > >         public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
> > > > >                 throws Exception {
> > > > >             try {
> > > > >                 out.collect(new Tuple3<Integer, String, String>(
> > > > >                         getInt(value, "timestamp"),
> > > > >                         getString(value, "uuid"),
> > > > >                         getString(value, "event")));
> > > > >             } catch (JSONException e) {
> > > > >                 System.err.println("Field not found");
> > > > >             }
> > > > >         }
> > > > >     }
> > > > >
> > > > > [/CODE]
> > > > >
> > > > > [QUOTE]
> > > > > Error: The main method caused an error.
> > > > > org.apache.flink.client.program.ProgramInvocationException: The main
> > > > > method caused an error.
> > > > >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
> > > > >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
> > > > >     at org.apache.flink.client.program.Client.run(Client.java:244)
> > > > >     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> > > > >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> > > > >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> > > > >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> > > > > Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
> > > > > The type java.lang.String has currently not supported for built-in sum
> > > > > aggregations.
> > > > >     at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
> > > > >     at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
> > > > >     at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
> > > > >     at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
> > > > >     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
> > > > >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >     at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
> > > > >     ... 6 more
> > > > > [/QUOTE]
> > > > >
> > > > > The JSON file is of the following nature, with a 2-level hierarchy for
> > > > > one field:
> > > > > [JSON]
> > > > > {timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
> > > > > {timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
> > > > > [/JSON]
> > > > >
> > > > > Thanks in advance for helping us to understand where we are going wrong.
> > > > >
> > > > > Anirvan

Re: Program crashes trying to read JSON file

Posted by Stephan Ewen <se...@apache.org>.
There must be something simple mixed up somewhere. The string output from
your previous mail suggests that the output is the name of the data reading
operator object. Not sure how you achieved that, but that is pretty mixed
up.

My gut feeling is that you call "toString()" on a DataSet somewhere and use
that as actual data. Something like

DataSet<String> text = env.fromElements(env.readTextFile(...).toString())
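
By contrast, the fix is simply to use the DataSet returned by readTextFile directly; a minimal sketch (the path is a placeholder):

[CODE]
// readTextFile already yields a DataSet<String> with one input line per element;
// calling toString() on the DataSet only prints the operator's name, e.g.
// org.apache.flink.api.java.operators.DataSource@6bd8b476
DataSet<String> text = env.readTextFile("file:///path/to/weblogs");
[/CODE]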

Reading JSON entries as strings (assuming they contain no linebreaks) and
extracting fields is one of the "hello world" examples, there should be no
strange surprises. I would have a look at examples like WordCount and check
where you use different types of operations.

Btw: I would look into the Jackson library for JSON parsing and field
extraction. Very easy to use, good performance.
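
As a minimal sketch of that suggestion (assuming the input holds one valid JSON object per line, with quoted keys and values - unlike the raw sample earlier in this thread - and reusing the Tuple4 layout from the corrected code):

[CODE]
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonSelectDataFlatMap
        implements FlatMapFunction<String, Tuple4<Integer, String, String, Integer>> {

    // ObjectMapper is thread-safe once configured and costly to create,
    // so a single shared instance is fine
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void flatMap(String line, Collector<Tuple4<Integer, String, String, Integer>> out)
            throws Exception {
        try {
            JsonNode root = MAPPER.readTree(line);
            JsonNode payload = root.get("payload");
            out.collect(new Tuple4<Integer, String, String, Integer>(
                    root.get("timestamp").asInt(),
                    payload.get("uuid").asText(),
                    payload.get("event").asText(),
                    1));
        } catch (Exception e) {
            // skip malformed records instead of failing the whole job
            System.err.println("Skipping malformed record: " + line);
        }
    }
}
[/CODE]

Counting events would then be text.flatMap(new JacksonSelectDataFlatMap()).groupBy(2).sum(3), i.e. grouping on the event field (index 2) and summing the integer count field (index 3).
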
On 26.11.2014 09:01, "Anirvan BASU" <an...@inria.fr> wrote:

> Ciao Stefano !
>
> Thanks for this early morning information, very helpful.
> Yes, for outputting the data we are using WriteAsCSV which is stable over
> different versions of Flink.
>
> Our current concern is "reading" a JSON file into a dataset.
> As you can see, we have a simple 2-level JSON hierarchy that can be easily
> mapped to a fixed-column CSV.
> But the place we are stuck at currently is in reading the file correctly
> into a tuple-based dataset in memory.
> Once this is achieved, the rest will be fairly simple dataset
> transformations.
>
> As you can see from the pasted code, we used functions developed from the
> stream connector for our purposes. (Thanks to Gyula and Marton for that
> information)
>
> If reading a JSON file using functions already developed is not possible
> then we will have to develop some custom functions on hardcore string
> operations to do the same.
> That would be like reinventing the wheel ... :-((
>
> Any advice in this regard will be highly appreciated.
>
> Thanks in advance to all,
> Anirvan
>
> ------------------------------
>
> From: "Stefano Bortoli" <s....@gmail.com>
> To: "user" <us...@flink.incubator.apache.org>
> Sent: Wednesday, November 26, 2014 8:37:59 AM
> Subject: Re: Program crashes trying to read JSON file
>
> You can output your results in different ways. If all you need is to write
> a file, I normally use the writeAsText method (however, there are also
> writeAsCsv and writeAsFormattedText), or write according to your custom
> FileOutputFormat.
>
> datasetToPrint.writeAsText("/path/to/file/with/permission",
> WriteMode.OVERWRITE);
>
> Keep in mind that this will output your tuple dataset. Therefore, if you
> want to shape your output differently, further processing may be necessary.
>
> saluti,
> Stefano
>
> 2014-11-25 22:04 GMT+01:00 Anirvan BASU <an...@inria.fr>:
>
>> Thanks to Aljoscha and Stefano for pointing out the flaw.
>>
>> We corrected the issue as follows:
>>
>> [CODE]
>>
>> import org.apache.flink.api.java.tuple.Tuple4;
>> import org.apache.flink.util.Collector;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
>> import org.apache.sling.commons.json.JSONException;
>> ...
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         if(!parseParameters(args)) {
>>             return;
>>         }
>>
>>         // set up the execution environment
>>         final ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>
>>         // get input data
>>         DataSet<String> text = getTextDataSet(env);
>>
>>         DataSet<Tuple4<Integer, String, String, Integer>> counts =
>>                 // split up the lines in pairs (4-tuples) containing:
>> (timestamp,uuid,event,count)
>>                 text.flatMap(new SelectDataFlatMap())
>>                 // group by the tuple field "1" (an event - string) and
>> sum up tuple field "3" (integer - value 1)
>>                 .groupBy(1)
>>                 .sum(3);
>>
>>
>>         // emit result
>>         if(fileOutput) {
>>             counts.writeAsCsv(outputPath, "\n", " ");
>>         } else {
>>             counts.print();
>>         }
>>
>>         // execute program
>>         env.execute("Weblogs Programme");
>>     }
>>
>> ...
>>
>>     public static class SelectDataFlatMap extends
>>     JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>>
>> {
>>
>>         private static final long serialVersionUID = 1L;
>>
>>         @Override
>>         public void flatMap(String value, Collector<Tuple4<Integer,
>> String, String, Integer>> record)
>>                 throws Exception {
>>             try {
>>                 record.collect(new Tuple4<Integer, String, String,
>> Integer>(
>>                         getInt(value, "timestamp"),
>>                         getString(value, "uuid"),
>>                         getString(value, "event"),
>>                         1));
>>             } catch (JSONException e) {
>>                 System.err.println("Field not found");
>>             }
>>         }
>>     }
>>
>>
>> [/CODE]
>>
>> However, this time the issue was different.
>> The programme executed correctly till status FINISHED.
>> However, there was no output :-((
>> i.e. For each Task Manager, an empty file is written.
>>
>> When we checked further about the input text file that is read using
>> env.readTextFile() we find that instead of a text string (full text
>> dataset) only a small string is written!
>> Something as :
>> org.apache.flink.api.java.operators.DataSource@6bd8b476
>>
>> Worse still ! this string value sometimes remains the same over multiple
>> runs of the programme ....
>> Is this natural ? Is this just the handle to the file or the dataset ?
>> Is the Collector() working correctly also ?
>>
>>
>> Note :
>> The actual JSON file (i.e. the text file that should be read) is of the
>> following nature, with a 2-level hierarchy for one field:
>> [JSON]
>> {timestamp: 1397731764     payload: {product:
>> Younited     uuid:
>> 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
>>  platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
>>  type: can-usage-v1     event: General,Login,Success}}
>> {timestamp: 1397731765     payload: {product:
>> Younited     uuid:
>> e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
>>  platform: native     version: 7b4b767060b62537b63c5d10d911870a14d2b84e
>>  type: can-usage-v1     event: General,App,Opened}}
>> [/JSON]
>>
>>
>> So now again, we are confused if we are doing it correctly :-((
>>
>> Thanks in advance for helping us to understand where we are going wrong.
>> Anirvan
>>
>> ------------------------------
>>
>> From: "Stefano Bortoli" <s....@gmail.com>
>> To: "user" <us...@flink.incubator.apache.org>
>> Cc: dev@flink.incubator.apache.org
>> Sent: Tuesday, November 25, 2014 5:05:34 PM
>> Subject: Re: Program crashes trying to read JSON file
>>
>>
>> Very quickly, it seems you are trying to sum on Strings
>>
>> Caused by: org.apache.flink.api.java.
>> aggregation.UnsupportedAggregationTypeException: The type
>> java.lang.String has currently not supported for built-in sum aggregations.
>>
>> Check your tuple types and be sure that you are not summing on strings.
>>
>>
>> 2014-11-25 16:55 GMT+01:00 Anirvan BASU <an...@inria.fr>:
>>
>>> Hello all,
>>>
>>> We are using Flink 0.7 and trying to read a large JSON file, reading
>>> some fields into a flink  (3-tuple based) dataset, then performing some
>>> operations.
>>>
>>> We encountered the following runtime error:
>>>
>>> [QUOTE]
>>> Error: The main method caused an error.
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>>>     at org.apache.flink.client.program.Client.run(Client.java:244)
>>>     at
>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>     at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>> Caused by:
>>> org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
>>> The type java.lang.String has currently not supported for built-in sum
>>> aggregations.
>>>     at
>>> org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
>>>     at
>>> org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
>>>     at
>>> org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
>>>     at
>>> org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
>>>     at
>>> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>>>     ... 6 more
>>> [/QUOTE]
>>>
>>>
>>>
>>> The code snippet that could have caused this error (i.e. that we edited)
>>> is the following
>>>
>>> [CODE]
>>>
>>> import org.apache.flink.api.java.tuple.Tuple3;
>>> import org.apache.flink.util.Collector;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
>>> import org.apache.sling.commons.json.JSONException;
>>> ...
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         if(!parseParameters(args)) {
>>>             return;
>>>         }
>>>
>>>         // set up the execution environment
>>>         final ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         // get input data
>>>         DataSet<String> text = getTextDataSet(env);
>>>
>>>         DataSet<Tuple3<Integer, String, String>> counts =
>>>                 // split up the lines in pairs (3-tuples) containing:
>>> (timestamp,uuid,event)
>>>                 text.flatMap(new SelectDataFlatMap())
>>>                 // group by the tuple field "0" and sum up tuple field
>>> "1"
>>>                 .groupBy(2)
>>>                 .sum(2);
>>>
>>>         // emit result
>>>         if(fileOutput) {
>>>             counts.writeAsCsv(outputPath, "\n", " ");
>>>         } else {
>>>             counts.print();
>>>         }
>>>
>>>         // execute program
>>>         env.execute("Weblogs Programme");
>>>     }
>>>
>>> ...
>>>
>>>     public static class SelectDataFlatMap extends
>>>     JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
>>>
>>>         @Override
>>>         public void flatMap(String value, Collector<Tuple3<Integer,
>>> String, String>> out)
>>>                 throws Exception {
>>>             try {
>>>                 out.collect(new Tuple3<Integer, String, String>(
>>>                         getInt(value, "timestamp"),
>>>                         getString(value, "uuid"),
>>>                         getString(value, "event")));
>>>             } catch (JSONException e) {
>>>                 System.err.println("Field not found");
>>>             }
>>>         }
>>>     }
>>>
>>> [/CODE]
>>>
>>>
>>>
>>> [QUOTE]
>>> Error: The main method caused an error.
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>>>     at org.apache.flink.client.program.Client.run(Client.java:244)
>>>     at
>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>     at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>> Caused by:
>>> org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
>>> The type java.lang.String has currently not supported for built-in sum
>>> aggregations.
>>>     at
>>> org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
>>>     at
>>> org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
>>>     at
>>> org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
>>>     at
>>> org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
>>>     at
>>> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>>>     ... 6 more
>>> [/QUOTE]
>>>
>>>
>>> The JSON file is of the following nature, with a 2-level hierarchy for
>>> one field:
>>> [JSON]
>>> {timestamp: 1397731764     payload: {product:
>>> Younited     uuid:
>>> 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
>>>  platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
>>>  type: can-usage-v1     event: General,Login,Success}}
>>> {timestamp: 1397731765     payload: {product:
>>> Younited     uuid:
>>> e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
>>>  platform: native     version: 7b4b767060b62537b63c5d10d911870a14d2b84e
>>>  type: can-usage-v1     event: General,App,Opened}}
>>> [/JSON]
>>>
>>>
>>>
>>> Thanks in advance for helping us to understand where we are going wrong.
>>>
>>> Anirvan
>>>
>>
>>
>>
>
>

Re: Program crashes trying to read JSON file

Posted by Anirvan BASU <an...@inria.fr>.
Dear Fabian, 

Thanks for your meticulous analysis - awesome! German quality, of course!!!

yes indeed, you are right about the String outputs that would be produced. 
I understand that a custom parse logic in a Map function is definitely necessary. 

I was trying to use the implementation used here : https://github.com/apache/incubator-flink/blob/release-0.7.0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java#L52-73 
It was suggested to me by Marton and Gyula, and I adapted it for my simple case of non-Streaming ... 

Question: would there be any difference in output, between the streaming and non-streaming cases, from the functions getInt() and getString()? These are the functions that actually do the extraction, aren't they?

However, if the above does not really work, then I will need to revert to some "hardcore" sub-string extraction specific to my case :-(( 

Thanks so much for your advice and support ! 
Anirvan 


----- Original Message -----


From: "Fabian Hueske" <fh...@apache.org> 
To: user@flink.incubator.apache.org 
Sent: Wednesday, November 26, 2014 9:34:57 AM 
Subject: Re: Program crashes trying to read JSON file 

Hi Anirvan, 

The CSVInputFormat works with two delimiters, a record delimiter and a field delimiter. The input data is split at the record delimiter into records. Each record is split along the field delimiters into several fields. The number and type of fields must be constant. There is no logic to handle nested data. 
Looking at your data, I assume that the newline ('\n') character is the record delimiter. However, due to the nested structure of your data, I don't see a valid field delimiter. If you have this input: 

{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}} 

Splitting along the field delimiter tab ('\t') would result in 7 String fields: 

- {timestamp: 1397731764 
- payload: {product: Younited 
- uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd 
- platform: native 
- version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 
- type: can-usage-v1 
- event: General,Login,Success}} 

I guess this is not exactly what you want (curly braces, field names, etc.). 

The best way to handle JSON data right now is to have the records delimited by a character such as newline, read the data line-wise with the TextInputFormat, and use custom parse logic in a MapFunction. There you can also use JSON libraries (make sure they are thread-safe!). 

Best, Fabian 
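
A minimal sketch of the approach Fabian describes for the records shown in this thread, assuming one record per line and using a regular expression instead of a JSON library (the records are not valid JSON); the path, regex, and class names are illustrative assumptions:

[CODE]
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class WeblogParseSketch {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple4<Integer, String, String, Integer>> counts = env
                .readTextFile("file:///path/to/weblogs")  // one record per line
                .flatMap(new ParseLine())
                .groupBy(2)   // group by the event field
                .sum(3);      // sum the count field

        counts.print();
        env.execute("Weblog parsing (sketch)");
    }

    public static class ParseLine
            implements FlatMapFunction<String, Tuple4<Integer, String, String, Integer>> {

        // matches "key: value", where the value runs until whitespace or a closing brace
        private static final Pattern FIELD = Pattern.compile("(\\w+):\\s*([^\\s}]+)");

        @Override
        public void flatMap(String line, Collector<Tuple4<Integer, String, String, Integer>> out) {
            Integer timestamp = null;
            String uuid = null;
            String event = null;
            Matcher m = FIELD.matcher(line);
            while (m.find()) {
                String key = m.group(1);
                String val = m.group(2);
                if ("timestamp".equals(key)) {
                    timestamp = Integer.parseInt(val);
                } else if ("uuid".equals(key)) {
                    uuid = val;
                } else if ("event".equals(key)) {
                    event = val;
                }
            }
            if (timestamp != null && uuid != null && event != null) {
                out.collect(new Tuple4<Integer, String, String, Integer>(timestamp, uuid, event, 1));
            } else {
                System.err.println("Field not found in: " + line);
            }
        }
    }
}
[/CODE]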

2014-11-26 9:01 GMT+01:00 Anirvan BASU < anirvan.basu@inria.fr > : 


Ciao Stefano ! 

Thanks for this early morning information, very helpful. 
Yes, for outputting the data we are using WriteAsCSV which is stable over different versions of Flink. 

Our current concern is "reading" a JSON file into a dataset. 
As you can see, we have a simple 2-level JSON hierarchy that can be easily mapped to a fixed-column CSV. 
But the place we are stuck at currently is in reading the file correctly into a tuple-based dataset in memory. 
Once this is achieved, the rest will be fairly simple dataset transformations. 

As you can see from the pasted code, we used functions developed from the stream connector for our purposes. (Thanks to Gyula and Marton for that information) 

If reading a JSON file using functions already developed is not possible then we will have to develop some custom functions on hardcore string operations to do the same. 
That would be like reinventing the wheel ... :-(( 

Any advice in this regard will be highly appreciated. 

Thanks in advance to all, 
Anirvan 



From: "Stefano Bortoli" < s.bortoli@gmail.com > 
To: "user" < user@flink.incubator.apache.org > 
Sent: Wednesday, November 26, 2014 8:37:59 AM 

Subject: Re: Program crashes trying to read JSON file 

You can output your results in different ways. If all you need is to write a file, I normally use the writeAsText method (however, there are also writeAsCsv and writeAsFormattedText), or write according to your custom FileOutputFormat. 

datasetToPrint.writeAsText("/path/to/file/with/permission", WriteMode.OVERWRITE); 

Keep in mind that this will output your tuple dataset. Therefore, if you want to shape your output differently, further processing may be necessary. 

saluti, 
Stefano 

2014-11-25 22:04 GMT+01:00 Anirvan BASU < anirvan.basu@inria.fr > : 


Thanks to Aljoscha and Stefano for pointing out the flaw. 

We corrected the issue as follows: 

[CODE] 

import org.apache.flink.api.java.tuple.Tuple4; 
import org.apache.flink.util.Collector; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap; 
import org.apache.sling.commons.json.JSONException; 
... 

    public static void main(String[] args) throws Exception { 

        if (!parseParameters(args)) { 
            return; 
        } 

        // set up the execution environment 
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 

        // get input data 
        DataSet<String> text = getTextDataSet(env); 

        DataSet<Tuple4<Integer, String, String, Integer>> counts = 
                // split up the lines into 4-tuples containing: (timestamp, uuid, event, count) 
                text.flatMap(new SelectDataFlatMap()) 
                // group by tuple field "1" (the uuid String) and sum up tuple field "3" (the Integer count, value 1) 
                .groupBy(1) 
                .sum(3); 

        // emit result 
        if (fileOutput) { 
            counts.writeAsCsv(outputPath, "\n", " "); 
        } else { 
            counts.print(); 
        } 

        // execute program 
        env.execute("Weblogs Programme"); 
    } 

... 

    public static class SelectDataFlatMap extends 
            JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> { 

        private static final long serialVersionUID = 1L; 

        @Override 
        public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record) 
                throws Exception { 
            try { 
                record.collect(new Tuple4<Integer, String, String, Integer>( 
                        getInt(value, "timestamp"), 
                        getString(value, "uuid"), 
                        getString(value, "event"), 
                        1)); 
            } catch (JSONException e) { 
                System.err.println("Field not found"); 
            } 
        } 
    } 


[/CODE] 

However, this time the issue was different. 
The programme executed correctly till status FINISHED. 
However, there was no output :-(( 
i.e., for each Task Manager, an empty file is written. 

When we checked the input text file that is read using env.readTextFile(), we found that instead of a text string (the full text dataset), only a small string is written! 
Something like: 
org.apache.flink.api.java.operators.DataSource@6bd8b476 

Worse still, this string value sometimes remains the same over multiple runs of the programme ... 
Is this natural? Is this just the handle to the file or the dataset? 
Is the Collector() working correctly also? 
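
For reference, that small string looks like the default Java toString() of the DataSet handle rather than the data itself; a minimal sketch of the difference (assuming the 0.7-era Java API): 

[CODE] 
DataSet<String> text = env.readTextFile(inputPath); 

// Printing the handle only shows the Java object reference, e.g. 
// org.apache.flink.api.java.operators.DataSource@6bd8b476 
System.out.println(text); 

// print() registers a sink; the actual records appear once the job runs 
text.print(); 
env.execute("Print example"); 
[/CODE] 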


Note : 
The actual JSON file (i.e. the text file that should be read) is of the following nature, with a 2-level hierarchy for one field: 
[JSON] 
{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}} 
{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}} 
[/JSON] 


So now again, we are confused about whether we are doing it correctly :-(( 

Thanks in advance for helping us to understand where we are going wrong. 
Anirvan 



From: "Stefano Bortoli" < s.bortoli@gmail.com > 
To: "user" < user@flink.incubator.apache.org > 
Cc: dev@flink.incubator.apache.org 
Sent: Tuesday, November 25, 2014 5:05:34 PM 
Subject: Re: Program crashes trying to read JSON file 


Very quickly, it seems you are trying to sum on Strings 

Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations. 

Check your tuple types and be sure that you are not summing on strings. 
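
Since sum() needs a numeric field, a common pattern is to append an Integer count of 1 and aggregate that. A minimal sketch (the DataSet name "events" is illustrative): 

[CODE] 
import org.apache.flink.api.common.functions.MapFunction; 
import org.apache.flink.api.java.tuple.Tuple2; 

DataSet<Tuple2<String, Integer>> counts = events 
        .map(new MapFunction<String, Tuple2<String, Integer>>() { 
            @Override 
            public Tuple2<String, Integer> map(String event) { 
                return new Tuple2<String, Integer>(event, 1); 
            } 
        }) 
        .groupBy(0)   // group on the String key 
        .sum(1);      // sum the Integer count 
[/CODE] 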


2014-11-25 16:55 GMT+01:00 Anirvan BASU < anirvan.basu@inria.fr > : 


Hello all, 

We are using Flink 0.7 and trying to read a large JSON file, reading some fields into a flink (3-tuple based) dataset, then performing some operations. 

We encountered the following runtime error: 

[QUOTE] 
Error: The main method caused an error. 
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. 
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404) 
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307) 
at org.apache.flink.client.program.Client.run(Client.java:244) 
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347) 
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334) 
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001) 
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025) 
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations. 
at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186) 
at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109) 
at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61) 
at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72) 
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389) 
... 6 more 
[/QUOTE] 



The code snippet that could have caused this error (i.e., the part that we edited) is the following: 

[CODE] 

import org.apache.flink.api.java.tuple.Tuple3; 
import org.apache.flink.util.Collector; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap; 
import org.apache.sling.commons.json.JSONException; 
... 

    public static void main(String[] args) throws Exception { 

        if(!parseParameters(args)) { 
            return; 
        } 

        // set up the execution environment 
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 

        // get input data 
        DataSet<String> text = getTextDataSet(env); 

        DataSet<Tuple3<Integer, String, String>> counts = 
                // split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event) 
                text.flatMap(new SelectDataFlatMap()) 
                // group by the tuple field "0" and sum up tuple field "1" 
                .groupBy(2) 
                .sum(2); 

        // emit result 
        if(fileOutput) { 
            counts.writeAsCsv(outputPath, "\n", " "); 
        } else { 
            counts.print(); 
        } 

        // execute program 
        env.execute("Weblogs Programme"); 
    } 

... 

    public static class SelectDataFlatMap extends 
            JSONParseFlatMap<String, Tuple3<Integer, String, String>> { 

        @Override 
        public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out) 
                throws Exception { 
            try { 
                out.collect(new Tuple3<Integer, String, String>( 
                        getInt(value, "timestamp"), 
                        getString(value, "uuid"), 
                        getString(value, "event"))); 
            } catch (JSONException e) { 
                System.err.println("Field not found"); 
            } 
        } 
    } 

[/CODE] 



[QUOTE] 
Error: The main method caused an error. 
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. 
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404) 
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307) 
at org.apache.flink.client.program.Client.run(Client.java:244) 
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347) 
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334) 
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001) 
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025) 
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations. 
at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186) 
at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109) 
at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61) 
at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72) 
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389) 
... 6 more 
[/QUOTE] 


The JSON file is of the following nature, with a 2-level hierarchy for one field: 
[JSON] 
{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}} 
{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}} 
[/JSON] 



Thanks in advance for helping us to understand where we are going wrong. 

Anirvan 








Re: Program crashes trying to read JSON file

Posted by Fabian Hueske <fh...@apache.org>.
Hi Anirvan,

The CSVInputFormat works with two delimiters, a record delimiter and a
field delimiter. The input data is split at the record delimiter into
records. Each record is split along the field delimiters into several
fields. The number and type of fields must be constant. There is no logic
to handle nested data.
Looking at your data, I assume that the newline ('\n') character is the
record delimiter. However, due to the nested structure of your data, I
don't see a valid field delimiter. If you have this input:

{timestamp: 1397731764     payload: {product: Younited
    uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
    platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
    type: can-usage-v1     event: General,Login,Success}}

Splitting along the field delimiter tab ('\t') would result
in 7 String fields:

- {timestamp: 1397731764
- payload: {product: Younited
- uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
- platform: native
- version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
- type: can-usage-v1
- event: General,Login,Success}}

I guess this is not exactly what you want (curly braces, field names, etc.).

The best way to handle JSON data right now is to have the records delimited
by a character such as newline, read the data line-wise with the
TextInputFormat, and use custom parse logic in a MapFunction. There you can
also use JSON libraries (make sure they are thread-safe!).

Best, Fabian

2014-11-26 9:01 GMT+01:00 Anirvan BASU <an...@inria.fr>:

> Ciao Stefano !
>
> Thanks for this early morning information, very helpful.
> Yes, for outputting the data we are using WriteAsCSV which is stable over
> different versions of Flink.
>
> Our current concern is "reading" a JSON file into a dataset.
> As you can see, we have a simple 2-level JSON hierarchy that can be easily
> mapped to a fixed-column CSV.
> But the place we are stuck at currently is in reading the file correctly
> into a tuple-based dataset in memory.
> Once this is achieved, the rest will be fairly simple dataset
> transformations.
>
> As you can see from the pasted code, we used functions developed from the
> stream connector for our purposes. (Thanks to Gyula and Marton for that
> information)
>
> If reading a JSON file using functions already developed is not possible
> then we will have to develop some custom functions on hardcore string
> operations to do the same.
> That would be like reinventing the wheel ... :-((
>
> Any advice in this regard will be highly appreciated.
>
> Thanks in advance to all,
> Anirvan
>
> ------------------------------
>
> *From: *"Stefano Bortoli" <s....@gmail.com>
> *To: *"user" <us...@flink.incubator.apache.org>
> *Sent: *Wednesday, November 26, 2014 8:37:59 AM
>
> *Subject: *Re: Program crashes trying to read JSON file
>
> You can output your results in different ways. If all you need is to write
> a file, I normally use the writeAsText method (however, there is the
> writeAsCSV, writeAsFormattedText. Of write according to your custom
> FileOutputFormat.
>
> datasetToPrint.writeAsText("/path/to/file/with/permission",
> WriteMode.OVERWRITE);
>
> Keep in mind that this will output your tuple dataset. Therefore, if you
> want to shape your output differently, It may be necessary to have further
> processing.
>
> saluti,
> Stefano
>
> 2014-11-25 22:04 GMT+01:00 Anirvan BASU <an...@inria.fr>:
>
>> Thanks to Aljoscha and Stefano for pointing out the flaw.
>>
>> We corrected the issue as follows:
>>
>> [CODE]
>>
>> import org.apache.flink.api.java.tuple.Tuple4;
>> import org.apache.flink.util.Collector;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
>> import org.apache.sling.commons.json.JSONException;
>> ...
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         if(!parseParameters(args)) {
>>             return;
>>         }
>>
>>         // set up the execution environment
>>         final ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>
>>         // get input data
>>         DataSet<String> text = getTextDataSet(env);
>>
>>         DataSet<Tuple4<Integer, String, String, Integer>> counts =
>>                 // split up the lines in pairs (4-tuples) containing:
>> (timestamp,uuid,event,count)
>>                 text.flatMap(new SelectDataFlatMap())
>>                 // group by the tuple field "1" (an event - string) and
>> sum up tuple field "3" (integer - value 1)
>>                 .groupBy(1)
>>                 .sum(3);
>>
>>
>>         // emit result
>>         if(fileOutput) {
>>             counts.writeAsCsv(outputPath, "\n", " ");
>>         } else {
>>             counts.print();
>>         }
>>
>>         // execute program
>>         env.execute("Weblogs Programme");
>>     }
>>
>> ...
>>
>>     public static class SelectDataFlatMap extends
>>     JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>>
>> {
>>
>>         private static final long serialVersionUID = 1L;
>>
>>         @Override
>>         public void flatMap(String value, Collector<Tuple4<Integer,
>> String, String, Integer>> record)
>>                 throws Exception {
>>             try {
>>                 record.collect(new Tuple4<Integer, String, String,
>> Integer>(
>>                         getInt(value, "timestamp"),
>>                         getString(value, "uuid"),
>>                         getString(value, "event"),
>>                         1));
>>             } catch (JSONException e) {
>>                 System.err.println("Field not found");
>>             }
>>         }
>>     }
>>
>>
>> [/CODE]
>>
>> However, this time the issue was different.
>> The programme executed correctly till status FINISHED.
>> However, there was no output :-((
>> i.e. For each Task Manager, an empty file is written.
>>
>> When we checked further about the input text file that is read using
>> env.readTextFile() we find that instead of a text string (full text
>> dataset) only a small string is written!
>> Something as :
>> org.apache.flink.api.java.operators.DataSource@6bd8b476
>>
>> Worse still ! this string value sometimes remains the same over multiple
>> runs of the programme ....
>> Is this natural ? Is this just the handle to the file or the dataset ?
>> Is the Collector() working correctly also ?
>>
>>
>> Note :
>> The actual JSON file (i.e. the text file that should be read) is of the
>> following nature, with a 2-level hierarchy for one field:
>> [JSON]
>> {timestamp: 1397731764     payload: {product:
>> Younited     uuid:
>> 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
>>  platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
>>  type: can-usage-v1     event: General,Login,Success}}
>> {timestamp: 1397731765     payload: {product:
>> Younited     uuid:
>> e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
>>  platform: native     version: 7b4b767060b62537b63c5d10d911870a14d2b84e
>>  type: can-usage-v1     event: General,App,Opened}}
>> [/JSON]
>>
>>
>> So now again, we are confused if we are doing it correctly :-((
>>
>> Thanks in advance for helping us to understand where we are going wrong.
>> Anirvan
>>
>> ------------------------------
>>
>> *From: *"Stefano Bortoli" <s....@gmail.com>
>> *To: *"user" <us...@flink.incubator.apache.org>
>> *Cc: *dev@flink.incubator.apache.org
>> *Sent: *Tuesday, November 25, 2014 5:05:34 PM
>> *Subject: *Re: Program crashes trying to read JSON file
>>
>>
>> Very quickly, it seems you are trying to sum on Strings
>>
>> Caused by: org.apache.flink.api.java.
>> aggregation.UnsupportedAggregationTypeException: The type
>> java.lang.String has currently not supported for built-in sum aggregations.
>>
>> Check your tuple types and be sure that you are not summing on strings.
>>
>>
>> 2014-11-25 16:55 GMT+01:00 Anirvan BASU <an...@inria.fr>:
>>
>>> Hello all,
>>>
>>> We are using Flink 0.7 and trying to read a large JSON file, reading
>>> some fields into a flink  (3-tuple based) dataset, then performing some
>>> operations.
>>>
>>> We encountered the following runtime error:
>>>
>>> [QUOTE]
>>> Error: The main method caused an error.
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>>>     at org.apache.flink.client.program.Client.run(Client.java:244)
>>>     at
>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>     at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>> Caused by:
>>> org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
>>> The type java.lang.String has currently not supported for built-in sum
>>> aggregations.
>>>     at
>>> org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
>>>     at
>>> org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
>>>     at
>>> org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
>>>     at
>>> org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
>>>     at
>>> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>>>     ... 6 more
>>> [/QUOTE]
>>>
>>>
>>>
>>> The code snippet that could have caused this error (i.e. that we edited)
>>> is the following
>>>
>>> [CODE]
>>>
>>> import org.apache.flink.api.java.tuple.Tuple3;
>>> import org.apache.flink.util.Collector;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
>>> import org.apache.sling.commons.json.JSONException;
>>> ...
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         if(!parseParameters(args)) {
>>>             return;
>>>         }
>>>
>>>         // set up the execution environment
>>>         final ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         // get input data
>>>         DataSet<String> text = getTextDataSet(env);
>>>
>>>         DataSet<Tuple3<Integer, String, String>> counts =
>>>                 // split up the lines in pairs (3-tuples) containing:
>>> (timestamp,uuid,event)
>>>                 text.flatMap(new SelectDataFlatMap())
>>>                 // group by the tuple field "0" and sum up tuple field
>>> "1"
>>>                 .groupBy(2)
>>>                 .sum(2);
>>>
>>>         // emit result
>>>         if(fileOutput) {
>>>             counts.writeAsCsv(outputPath, "\n", " ");
>>>         } else {
>>>             counts.print();
>>>         }
>>>
>>>         // execute program
>>>         env.execute("Weblogs Programme");
>>>     }
>>>
>>> ...
>>>
>>>     public static class SelectDataFlatMap extends
>>>     JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
>>>
>>>         @Override
>>>         public void flatMap(String value, Collector<Tuple3<Integer,
>>> String, String>> out)
>>>                 throws Exception {
>>>             try {
>>>                 out.collect(new Tuple3<Integer, String, String>(
>>>                         getInt(value, "timestamp"),
>>>                         getString(value, "uuid"),
>>>                         getString(value, "event")));
>>>             } catch (JSONException e) {
>>>                 System.err.println("Field not found");
>>>             }
>>>         }
>>>     }
>>>
>>> [/CODE]
>>>
>>>
>>>
>>> [QUOTE]
>>> Error: The main method caused an error.
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>>>     at org.apache.flink.client.program.Client.run(Client.java:244)
>>>     at
>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>>>     at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>>> Caused by:
>>> org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
>>> The type java.lang.String has currently not supported for built-in sum
>>> aggregations.
>>>     at
>>> org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
>>>     at
>>> org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
>>>     at
>>> org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
>>>     at
>>> org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
>>>     at
>>> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>>>     ... 6 more
>>> [/QUOTE]
>>>
>>>
>>> The JSON file is of the following nature, with a 2-level hierarchy for
>>> one field:
>>> [JSON]
>>> {timestamp: 1397731764     payload: {product:
>>> Younited     uuid:
>>> 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
>>>  platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
>>>  type: can-usage-v1     event: General,Login,Success}}
>>> {timestamp: 1397731765     payload: {product:
>>> Younited     uuid:
>>> e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
>>>  platform: native     version: 7b4b767060b62537b63c5d10d911870a14d2b84e
>>>  type: can-usage-v1     event: General,App,Opened}}
>>> [/JSON]
>>>
>>>
>>>
>>> Thanks in advance for helping us to understand where we are going wrong.
>>>
>>> Anirvan
>>>
>>
>>
>>
>
>

Re: Program crashes trying to read JSON file

Posted by Anirvan BASU <an...@inria.fr>.
Ciao Stefano ! 

Thanks for this early morning information, very helpful. 
Yes, for outputting the data we are using WriteAsCSV which is stable over different versions of Flink. 

Our current concern is "reading" a JSON file into a dataset. 
As you can see, we have a simple 2-level JSON hierarchy that can be easily mapped to a fixed-column CSV. 
But the place we are stuck at currently is in reading the file correctly into a tuple-based dataset in memory. 
Once this is achieved, the rest will be fairly simple dataset transformations. 

As you can see from the pasted code, we used functions developed from the stream connector for our purposes. (Thanks to Gyula and Marton for that information) 

If reading a JSON file using functions already developed is not possible then we will have to develop some custom functions on hardcore string operations to do the same. 
That would be like reinventing the wheel ... :-(( 

Any advice in this regard will be highly appreciated. 

Thanks in advance to all, 
Anirvan 

----- Original Message -----

> From: "Stefano Bortoli" <s....@gmail.com>
> To: "user" <us...@flink.incubator.apache.org>
> Sent: Wednesday, November 26, 2014 8:37:59 AM
> Subject: Re: Program crashes trying to read JSON file

> You can output your results in different ways. If all you need is to write a
> file, I normally use the writeAsText method (however, there is the
> writeAsCSV, writeAsFormattedText. Of write according to your custom
> FileOutputFormat.

> datasetToPrint.writeAsText("/path/to/file/with/permission",
> WriteMode.OVERWRITE);

> Keep in mind that this will output your tuple dataset. Therefore, if you want
> to shape your output differently, It may be necessary to have further
> processing.

> saluti,
> Stefano

> 2014-11-25 22:04 GMT+01:00 Anirvan BASU < anirvan.basu@inria.fr > :

> > Thanks to Aljoscha and Stefano for pointing out the flaw.
> 

> > We corrected the issue as follows:
> 

> > [CODE]
> 

> > import org.apache.flink.api.java.tuple. Tuple4 ;
> 
> > import org.apache.flink.util.Collector;
> 
> > import org.apache.flink.api.java.DataSet;
> 
> > import org.apache.flink.api.java.ExecutionEnvironment;
> 
> > import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> 
> > import org.apache.sling.commons.json.JSONException;
> 
> > ...
> 

> > public static void main(String[] args) throws Exception {
> 

> > if(!parseParameters(args)) {
> 
> > return;
> 
> > }
> 

> > // set up the execution environment
> 
> > final ExecutionEnvironment env =
> > ExecutionEnvironment.getExecutionEnvironment();
> 

> > // get input data
> 
> > DataSet<String> text = getTextDataSet(env);
> 

> > DataSet<Tuple4<Integer, String, String, Integer >> counts =
> 
> > // split up the lines in pairs (4-tuples) containing:
> > (timestamp,uuid,event,
> > count )
> 
> > text.flatMap(new SelectDataFlatMap())
> 
> > // group by the tuple field "1" (an event - string) and sum up tuple field
> > "3" (integer - value 1)
> 
> > . groupBy(1)
> 
> > . sum(3 );
> 

> > // emit result
> 
> > if(fileOutput) {
> 
> > counts.writeAsCsv(outputPath, "\n", " ");
> 
> > } else {
> 
> > counts.print();
> 
> > }
> 

> > // execute program
> 
> > env.execute("Weblogs Programme");
> 
> > }
> 

> > ...
> 

> > public static class SelectDataFlatMap extends
> 
> > JSONParseFlatMap<String, Tuple4 <Integer, String, String, Integer>> {
> 

> > private static final long serialVersionUID = 1L;
> 

> > @Override
> 
> > public void flatMap(String value, Collector<Tuple4<Integer, String, String,
> > Integer>> record)
> 
> > throws Exception {
> 
> > try {
> 
> > record.collect(new Tuple4<Integer, String, String, Integer>(
> 
> > getInt(value, "timestamp"),
> 
> > getString(value, "uuid"),
> 
> > getString(value, "event"),
> 
> > 1));
> 
> > } catch (JSONException e) {
> 
> > System.err.println("Field not found");
> 
> > }
> 
> > }
> 
> > }
> 

> > [/CODE]
> 

> > However, this time the issue was different.
> 
> > The programme executed correctly till status FINISHED.
> 
> > However, there was no output :-((
> 
> > i.e. For each Task Manager, an empty file is written.
> 

> > When we checked further about the input text file that is read using
> > env.readTextFile() we find that instead of a text string (full text
> > dataset)
> > only a small string is written!
> 
> > Something as :
> 
> > org.apache.flink.api.java.operators.DataSource@6bd8b476
> 

> > Worse still ! this string value sometimes remains the same over multiple
> > runs
> > of the programme ....
> 
> > Is this natural ? Is this just the handle to the file or the dataset ?
> 
> > Is the Collector() working correctly also ?
> 

> > Note :
> 
> > The actual JSON file (i.e. the text file that should be read) is of the
> > following nature, with a 2-level hierarchy for one field:
> 
> > [JSON]
> 
> > {timestamp: 1397731764 payload: {product: Younited uuid:
> > 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform:
> > native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1
> > event: General,Login,Success}}
> 
> > {timestamp: 1397731765 payload: {product: Younited uuid:
> > e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform:
> > native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1
> > event: General,App,Opened}}
> 
> > [/JSON]
> 

> > So now again, we are confused if we are doing it correctly :-((
> 

> > Thanks in advance for helping us to understand where we are going wrong.
> 
> > Anirvan
> 

> > > From: "Stefano Bortoli" < s.bortoli@gmail.com >
> > 
> 
> > > To: "user" < user@flink.incubator.apache.org >
> > 
> 
> > > Cc: dev@flink.incubator.apache.org
> > 
> 
> > > Sent: Tuesday, November 25, 2014 5:05:34 PM
> > 
> 
> > > Subject: Re: Program crashes trying to read JSON file
> > 
> 

> > > Very quickly, it seems you are trying to sum on Strings
> > 
> 

> > > Caused by: org.apache.flink.api.java.
> > 
> 
> > > aggregation.UnsupportedAggregationTypeException: The type
> > > java.lang.String
> > > has currently not supported for built-in sum aggregations.
> > 
> 

> > > Check your tuple types and be sure that you are not summing on strings.
> > 
> 

> > > 2014-11-25 16:55 GMT+01:00 Anirvan BASU < anirvan.basu@inria.fr > :
> > 
> 

> > > > Hello all,
> > > 
> > 
> 

> > > > We are using Flink 0.7 and trying to read a large JSON file, reading
> > > > some
> > > > fields into a flink (3-tuple based) dataset, then performing some
> > > > operations.
> > > 
> > 
> 

> > > > We encountered the following runtime error:
> > > 
> > 
> 

> > > > [QUOTE]
> > > 
> > 
> 
> > > > Error: The main method caused an error.
> > > 
> > 
> 
> > > > org.apache.flink.client.program.ProgramInvocationException: The main
> > > > method
> > > > caused an error.
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
> > > 
> > 
> 
> > > > at org.apache.flink.client.program.Client.run(Client.java:244)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> > > 
> > 
> 
> > > > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> > > 
> > 
> 
> > > > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> > > 
> > 
> 
> > > > Caused by:
> > > > org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
> > > > The type java.lang.String has currently not supported for built-in sum
> > > > aggregations.
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
> > > 
> > 
> 
> > > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > 
> > 
> 
> > > > at
> > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > 
> > 
> 
> > > > at
> > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > 
> > 
> 
> > > > at java.lang.reflect.Method.invoke(Method.java:606)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
> > > 
> > 
> 
> > > > ... 6 more
> > > 
> > 
> 
> > > > [/QUOTE]
> > > 
> > 
> 

> > > > The code snippet that could have caused this error (i.e. that we
> > > > edited)
> > > > is
> > > > the following
> > > 
> > 
> 

> > > > [CODE]
> > > 
> > 
> 

> > > > import org.apache.flink.api.java.tuple.Tuple3;
> > > 
> > 
> 
> > > > import org.apache.flink.util.Collector;
> > > 
> > 
> 
> > > > import org.apache.flink.api.java.DataSet;
> > > 
> > 
> 
> > > > import org.apache.flink.api.java.ExecutionEnvironment;
> > > 
> > 
> 
> > > > import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> > > 
> > 
> 
> > > > import org.apache.sling.commons.json.JSONException;
> > > 
> > 
> 
> > > > ...
> > > 
> > 
> 

> > > > public static void main(String[] args) throws Exception {
> > > 
> > 
> 

> > > > if(!parseParameters(args)) {
> > > 
> > 
> 
> > > > return;
> > > 
> > 
> 
> > > > }
> > > 
> > 
> 

> > > > // set up the execution environment
> > > 
> > 
> 
> > > > final ExecutionEnvironment env =
> > > > ExecutionEnvironment.getExecutionEnvironment();
> > > 
> > 
> 

> > > > // get input data
> > > 
> > 
> 
> > > > DataSet<String> text = getTextDataSet(env);
> > > 
> > 
> 

> > > > DataSet<Tuple3<Integer, String, String>> counts =
> > > 
> > 
> 
> > > > // split up the lines in pairs (3-tuples) containing:
> > > > (timestamp,uuid,event)
> > > 
> > 
> 
> > > > text.flatMap(new SelectDataFlatMap() )
> > > 
> > 
> 
> > > > // group by the tuple field "0" and sum up tuple field "1"
> > > 
> > 
> 
> > > > .groupBy(2)
> > > 
> > 
> 
> > > > .sum(2);
> > > 
> > 
> 

> > > > // emit result
> > > 
> > 
> 
> > > > if(fileOutput) {
> > > 
> > 
> 
> > > > counts.writeAsCsv(outputPath, "\n", " ");
> > > 
> > 
> 
> > > > } else {
> > > 
> > 
> 
> > > > counts.print();
> > > 
> > 
> 
> > > > }
> > > 
> > 
> 

> > > > // execute program
> > > 
> > 
> 
> > > > env.execute("Weblogs Programme");
> > > 
> > 
> 
> > > > }
> > > 
> > 
> 

> > > > ...
> > > 
> > 
> 

> > > > public static class SelectDataFlatMap extends
> > > 
> > 
> 
> > > > JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
> > > 
> > 
> 

> > > > @Override
> > > 
> > 
> 
> > > > public void flatMap(String value, Collector<Tuple3<Integer, String,
> > > > String>>
> > > > out)
> > > 
> > 
> 
> > > > throws Exception {
> > > 
> > 
> 
> > > > try {
> > > 
> > 
> 
> > > > out.collect(new Tuple3<Integer, String, String>(
> > > 
> > 
> 
> > > > getInt(value, "timestamp"),
> > > 
> > 
> 
> > > > getString(value, "uuid"),
> > > 
> > 
> 
> > > > getString(value, "event")));
> > > 
> > 
> 
> > > > } catch (JSONException e) {
> > > 
> > 
> 
> > > > System.err.println("Field not found");
> > > 
> > 
> 
> > > > }
> > > 
> > 
> 
> > > > }
> > > 
> > 
> 
> > > > }
> > > 
> > 
> 

> > > > [/CODE]
> > > 
> > 
> 

> > > > [QUOTE]
> > > 
> > 
> 
> > > > Error: The main method caused an error.
> > > 
> > 
> 
> > > > org.apache.flink.client.program.ProgramInvocationException: The main
> > > > method
> > > > caused an error.
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
> > > 
> > 
> 
> > > > at org.apache.flink.client.program.Client.run(Client.java:244)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> > > 
> > 
> 
> > > > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> > > 
> > 
> 
> > > > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> > > 
> > 
> 
> > > > Caused by:
> > > > org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
> > > > The type java.lang.String has currently not supported for built-in sum
> > > > aggregations.
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
> > > 
> > 
> 
> > > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > 
> > 
> 
> > > > at
> > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > 
> > 
> 
> > > > at
> > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > 
> > 
> 
> > > > at java.lang.reflect.Method.invoke(Method.java:606)
> > > 
> > 
> 
> > > > at
> > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
> > > 
> > 
> 
> > > > ... 6 more
> > > 
> > 
> 
> > > > [/QUOTE]
> > > 
> > 
> 

> > > > The JSON file is of the following nature, with a 2-level hierarchy for
> > > > one
> > > > field:
> > > 
> > 
> 
> > > > [JSON]
> > > 
> > 
> 
> > > > {timestamp: 1397731764 payload: {product: Younited uuid:
> > > > 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
> > > > platform:
> > > > native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type:
> > > > can-usage-v1
> > > > event: General,Login,Success}}
> > > 
> > 
> 
> > > > {timestamp: 1397731765 payload: {product: Younited uuid:
> > > > e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
> > > > platform:
> > > > native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type:
> > > > can-usage-v1
> > > > event: General,App,Opened}}
> > > 
> > 
> 
> > > > [/JSON]
> > > 
> > 
> 

> > > > Thanks in advance for helping us to understand where we are going
> > > > wrong.
> > > 
> > 
> 

> > > > Anirvan
> > > 
> > 
> 

Re: Program crashes trying to read JSON file

Posted by Stefano Bortoli <s....@gmail.com>.
You can output your results in different ways. If all you need is to write
a file, I normally use the writeAsText method (however, there are also
writeAsCsv and writeAsFormattedText), or write according to your custom
FileOutputFormat.

datasetToPrint.writeAsText("/path/to/file/with/permission",
WriteMode.OVERWRITE);

Keep in mind that this will output your tuple dataset. Therefore, if you
want to shape your output differently, it may be necessary to do some
further processing.

saluti,
Stefano

2014-11-25 22:04 GMT+01:00 Anirvan BASU <an...@inria.fr>:

> Thanks to Aljoscha and Stefano for pointing out the flaw.
>
> We corrected the issue as follows:
>
> [CODE]
>
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> import org.apache.sling.commons.json.JSONException;
> ...
>
>     public static void main(String[] args) throws Exception {
>
>         if(!parseParameters(args)) {
>             return;
>         }
>
>         // set up the execution environment
>         final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
>         // get input data
>         DataSet<String> text = getTextDataSet(env);
>
>         DataSet<Tuple4<Integer, String, String, *Integer*>> counts =
>                 // split up the lines into 4-tuples containing:
> (timestamp,uuid,event,*count*)
>                 text.flatMap(new SelectDataFlatMap())
>                 // group by tuple field "1" (the uuid string) and
> sum up tuple field "3" (the constant 1)
>                 .*groupBy(1)*
>                 .*sum(3)*;
>
>
>         // emit result
>         if(fileOutput) {
>             counts.writeAsCsv(outputPath, "\n", " ");
>         } else {
>             counts.print();
>         }
>
>         // execute program
>         env.execute("Weblogs Programme");
>     }
>
> ...
>
>     public static class SelectDataFlatMap extends
>     JSONParseFlatMap<String, *Tuple4*<Integer, String, String, Integer>> {
>
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public void flatMap(String value, Collector<Tuple4<Integer,
> String, String, Integer>> record)
>                 throws Exception {
>             try {
>                 record.collect(new Tuple4<Integer, String, String,
> Integer>(
>                         getInt(value, "timestamp"),
>                         getString(value, "uuid"),
>                         getString(value, "event"),
>                         1));
>             } catch (JSONException e) {
>                 System.err.println("Field not found");
>             }
>         }
>     }
>
>
> [/CODE]
>
> However, this time the issue was different.
> The programme executed correctly till status FINISHED.
> However, there was no output :-((
> i.e. For each Task Manager, an empty file is written.
>
> When we checked further about the input text file that is read using
> env.readTextFile() we find that instead of a text string (full text
> dataset) only a small string is written!
> Something as :
> org.apache.flink.api.java.operators.DataSource@6bd8b476
>
> Worse still ! this string value sometimes remains the same over multiple
> runs of the programme ....
> Is this natural ? Is this just the handle to the file or the dataset ?
> Is the Collector() working correctly also ?
>
>
> Note :
> The actual JSON file (i.e. the text file that should be read) is of the
> following nature, with a 2-level hierarchy for one field:
> [JSON]
> {timestamp: 1397731764 <callto:1397731764>     payload: {product:
> Younited     uuid:
> 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
>  platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
>  type: can-usage-v1     event: General,Login,Success}}
> {timestamp: 1397731765 <callto:1397731765>     payload: {product:
> Younited     uuid:
> e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
>  platform: native     version: 7b4b767060b62537b63c5d10d911870a14d2b84e
>  type: can-usage-v1     event: General,App,Opened}}
> [/JSON]
>
>
> So now again, we are confused if we are doing it correctly :-((
>
> Thanks in advance for helping us to understand where we are going wrong.
> Anirvan
>

Re: Program crashes trying to read JSON file

Posted by Anirvan BASU <an...@inria.fr>.
Thanks to Aljoscha and Stefano for pointing out the flaw. 

We corrected the issue as follows: 

[CODE] 

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...

public static void main(String[] args) throws Exception {

    if (!parseParameters(args)) {
        return;
    }

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataSet<String> text = getTextDataSet(env);

    DataSet<Tuple4<Integer, String, String, Integer>> counts =
            // split up the lines into 4-tuples containing: (timestamp,uuid,event,count)
            text.flatMap(new SelectDataFlatMap())
            // group by tuple field "1" (the uuid string) and sum up tuple field "3" (the constant 1)
            .groupBy(1)
            .sum(3);

    // emit result
    if (fileOutput) {
        counts.writeAsCsv(outputPath, "\n", " ");
    } else {
        counts.print();
    }

    // execute program
    env.execute("Weblogs Programme");
}

...

public static class SelectDataFlatMap extends
        JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {

    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record)
            throws Exception {
        try {
            record.collect(new Tuple4<Integer, String, String, Integer>(
                    getInt(value, "timestamp"),
                    getString(value, "uuid"),
                    getString(value, "event"),
                    1));
        } catch (JSONException e) {
            System.err.println("Field not found");
        }
    }
}

[/CODE] 

However, this time the issue was different. 
The programme executed correctly, reaching status FINISHED, but there was no output :-(( 
i.e. an empty file is written for each Task Manager. 

When we checked the input text file that is read using env.readTextFile(), we found that instead of the full text of the dataset, only a small string is written, something like: 
org.apache.flink.api.java.operators.DataSource@6bd8b476 

Worse still, this string value sometimes remains the same over multiple runs of the programme. 
Is this normal? Is this just a handle to the file or to the dataset? 
And is the Collector() working correctly? 
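
To illustrate what we mean, a rough sketch (the path is a placeholder): 

DataSet<String> text = env.readTextFile("/path/to/input"); // placeholder path
System.out.println(text); // prints something like ...DataSource@6bd8b476 -- presumably just the operator handle?
text.print();             // a sink that should emit the actual records once the job runs
env.execute();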

Note: 
The actual JSON file (i.e. the text file that should be read) is of the following nature, with a 2-level hierarchy for one field: 
[JSON] 
{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}} 
{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}} 
[/JSON] 

So now, again, we are confused about whether we are doing it correctly :-(( 

Thanks in advance for helping us to understand where we are going wrong. 
Anirvan 

----- Original Message -----

> From: "Stefano Bortoli" <s....@gmail.com>
> To: "user" <us...@flink.incubator.apache.org>
> Cc: dev@flink.incubator.apache.org
> Sent: Tuesday, November 25, 2014 5:05:34 PM
> Subject: Re: Program crashes trying to read JSON file

> Very quickly, it seems you are trying to sum on Strings

> Caused by: org.apache.flink.api.java.
> aggregation.UnsupportedAggregationTypeException: The type java.lang.String
> has currently not supported for built-in sum aggregations.

> Check your tuple types and be sure that you are not summing on strings.

Re: Program crashes trying to read JSON file

Posted by Stefano Bortoli <s....@gmail.com>.
Very quickly: it seems you are trying to sum on Strings.

Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
The type java.lang.String has currently not supported for built-in sum aggregations.

Check your tuple types and be sure that you are not summing on strings.
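
For example (a sketch against the Tuple3<Integer, String, String> from the
original post, where index 2 is the String event field):

counts.groupBy(2).sum(2); // sum over a String field -> UnsupportedAggregationTypeException
counts.groupBy(2).sum(0); // summing the Integer timestamp at index 0 would at least type-check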


Re: Program crashes trying to read JSON file

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
the problem seems to originate from this statement:
text.flatMap(new SelectDataFlatMap())
   // group by the tuple field "0" and sum up tuple field "1"
   .groupBy(2)
   .sum(2);

The indices in these operations are 0-based. What do you expect the
result of this operation to be? The aggregate operations only work
on number data types right now.
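
As a minimal, self-contained sketch (the class name and sample data are
invented for illustration): with (key, 1) pairs, the grouping key is the
String at index 0 and the summed field is the Integer at index 1:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class GroupSumSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> pairs = env.fromElements(
                new Tuple2<String, Integer>("Login", 1),
                new Tuple2<String, Integer>("Login", 1),
                new Tuple2<String, Integer>("Opened", 1));
        pairs.groupBy(0) // group on the String key at index 0
             .sum(1)     // sum the Integer field at index 1; sum(0) would fail as above
             .print();
        env.execute("group-sum sketch");
    }
}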

Regards,
Aljoscha
