Posted to user@flink.apache.org by chrisr123 <ch...@gmail.com> on 2018/05/27 16:55:49 UTC
Writing Table API results to a csv file
I'm using Flink 1.4.0
I'm trying to save the results of a Table API query to a CSV file, but I'm
getting an error.
Here are the details:
My Input file looks like this:
id,species,color,weight,name
311,canine,golden,75,dog1
312,canine,brown,22,dog2
313,feline,gray,8,cat1
I run a query on this to select canines only, and I want to save this to a
csv file:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

String inputPath = "location-of-source-file";
CsvTableSource petsTableSource = CsvTableSource.builder()
    .path(inputPath)
    .ignoreFirstLine()
    .fieldDelimiter(",")
    .field("id", Types.INT())
    .field("species", Types.STRING())
    .field("color", Types.STRING())
    .field("weight", Types.DOUBLE())
    .field("name", Types.STRING())
    .build();

// Register our table source
tableEnv.registerTableSource("pets", petsTableSource);
Table pets = tableEnv.scan("pets");

Table counts = pets
    .groupBy("species")
    .select("species, species.count as count")
    .filter("species === 'canine'");

DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
result.print();

// Write results to file
TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets", ",");
counts.writeToSink(sink);
When I run this, the result.print() call produces the expected output:
canine,2
but no results are written to the file, and I see the error below.
How can I save the results I'm seeing in stdout to a CSV file?
Thanks!
2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor
(TypeExtractor.java:1873) - class org.apache.flink.types.Row does not
contain a getter for field fields
2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor
(TypeExtractor.java:1876) - class org.apache.flink.types.Row does not
contain a setter for field fields
2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor
(TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid
POJO type because not all fields are valid POJO fields.
2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor
(TypeExtractor.java:1873) - class org.apache.flink.types.Row does not
contain a getter for field fields
2018-05-27 12:49:44,412 INFO [main] typeutils.TypeExtractor
(TypeExtractor.java:1876) - class org.apache.flink.types.Row does not
contain a setter for field fields
2018-05-27 12:49:44,412 INFO [main] typeutils.TypeExtractor
(TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid
POJO type because not all fields are valid POJO fields.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Writing Table API results to a csv file
Posted by Chris Ruegger <ch...@gmail.com>.
Fabian, Jörn:
Yes, that was indeed it.
When I added env.execute("MyApp"), it worked.
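For the archive, the tail of my program now ends like this (a fragment, using the same variables as in the snippet above; the only new line is the execute() call):

```java
// Write results to file. writeToSink() only appends a sink operator
// to the plan; it does not trigger execution by itself.
TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets", ",");
counts.writeToSink(sink);

// This line was missing: explicitly trigger execution of the plan.
env.execute("MyApp");
```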
Thank you for your help.
-Chris
--
----------------------------------------------------------------------------------------------------------------------------------------
Simplicity is the ultimate sophistication
--Leonardo DaVinci
www.rueggerconsultingllc.com
Re: Writing Table API results to a csv file
Posted by Fabian Hueske <fh...@gmail.com>.
Hi,
Jörn is probably right.
In contrast to print(), which immediately triggers an execution,
writeToSink() just appends a sink operator and requires to explicitly
trigger the execution.
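To illustrate the idea outside of Flink, here is a plain-Java sketch (a toy, not Flink code): sinks are only recorded in a plan, and nothing is written until execute() is called.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of lazy plan execution: addSink() only records an operator,
// and the recorded plan runs only when execute() is called, analogous to
// writeToSink() vs. ExecutionEnvironment.execute() in the batch API.
public class LazyPlanDemo {

    static class Plan {
        final List<String> sinkPaths = new ArrayList<>();
        final List<String> filesWritten = new ArrayList<>();

        // Analogous to counts.writeToSink(sink): records the sink, writes nothing.
        void addSink(String path) {
            sinkPaths.add(path);
        }

        // Analogous to env.execute(): runs the recorded plan.
        void execute() {
            for (String path : sinkPaths) {
                filesWritten.add(path); // stand-in for actually writing the file
            }
        }
    }

    public static void main(String[] args) {
        Plan plan = new Plan();
        plan.addSink("/home/hadoop/output/pets");
        System.out.println("files after addSink: " + plan.filesWritten.size()); // prints 0
        plan.execute();
        System.out.println("files after execute: " + plan.filesWritten.size()); // prints 1
    }
}
```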
The INFO messages of the TypeExtractor are "just" telling you that Row cannot be used as a POJO type, but that's fine here.
Best, Fabian
Re: Writing Table API results to a csv file
Posted by Jörn Franke <jo...@gmail.com>.
Do you have the complete source?
I am missing an env.execute() at the end