You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by po...@gmx.com on 2022/07/05 21:09:54 UTC

How can I convert a DataSet into a Table?


My code is:



_package flinkTest2;_



_import org.apache.flink.api.java.DataSet;  
import org.apache.flink.api.java.ExecutionEnvironment;  
import org.apache.flink.api.java.tuple.Tuple2;  
import org.apache.flink.table.api.EnvironmentSettings;  
import org.apache.flink.table.api.Table;  
import org.apache.flink.table.api.TableEnvironment;  
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;_



_public class flinkTest2 {  
    public static void main(String[] args) throws Exception {_

_    _

_    ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();  
      
    // read a CSV file with five fields, taking only two of them  
    DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("c:/CSV/file")  
                               .includeFields("10010")  // take the first and the fourth field  
                               .types(String.class, Double.class);  
      
    //register and create table  
    EnvironmentSettings settings = EnvironmentSettings  
            .newInstance()  
            //.inStreamingMode()  
            .inBatchMode()  
            .build();_

_     TableEnvironment tEnv = TableEnvironment.create(settings);_





_    //Insert CSV content into table, define column names and read some rows
from it  
    _

_  }  
}_



What to do create table, insert DataSet csvInput into table and read some rows
from it (into text file)?

Thanks for help



Mike


Re: How can I convert a DataSet into a Table?

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
I'm afraid we have no way to do such conversion in Flink 1.15. 

But for you case, which is to read from csv file in table api. You can try as follows: 

tableEnv.createTemporaryTable(" csvInput ", TableDescriptor.forConnector("filesystem") 
.schema(schema) 
.option("path", "/path/to/file") 
.format(FormatDescriptor.forFormat("csv") 
.option("field-delimiter", "|") 
.build()) 
.build()) 

Table table1 = tEnv.from(" csvInput ").xxx 

See more in the Flink doc[1] 
[1]: [ https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#table-api | https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#table-api ] 


Best regards, 
Yuxia 


发件人: "podunk" <po...@gmx.com> 
收件人: "User" <us...@flink.apache.org> 
发送时间: 星期三, 2022年 7 月 06日 上午 5:09:54 
主题: How can I convert a DataSet into a Table? 

My code is: 
package flinkTest2; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.table.api.EnvironmentSettings; 
import org.apache.flink.table.api.Table; 
import org.apache.flink.table.api.TableEnvironment; 
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
public class flinkTest2 { 
public static void main(String[] args) throws Exception { 
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 

// read a CSV file with five fields, taking only two of them 
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("c:/CSV/file") 
.includeFields("10010") // take the first and the fourth field 
.types(String.class, Double.class); 

//register and create table 
EnvironmentSettings settings = EnvironmentSettings 
.newInstance() 
//.inStreamingMode() 
.inBatchMode() 
.build(); 
TableEnvironment tEnv = TableEnvironment.create(settings); 
//Insert CSV content into table, define column names and read some rows from it 
} 
} 
What to do create table, insert DataSet csvInput into table and read some rows from it (into text file)? 
Thanks for help 
Mike 


Re: How can I convert a DataSet into a Table?

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
What's the version of Flink you are using? 
In Flink 1.13, you can use BatchTableEnvironment#fromDataSet() to do that. But since Flink 1.14, the method has been removed. 

[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.html#fromDataSet-org.apache.flink.api.java.DataSet-java.lang.String- 

Best regards, 
Yuxia 


发件人: podunk@gmx.com 
收件人: "User" <us...@flink.apache.org> 
发送时间: 星期三, 2022年 7 月 06日 上午 5:09:54 
主题: How can I convert a DataSet into a Table? 

My code is: 
package flinkTest2; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.table.api.EnvironmentSettings; 
import org.apache.flink.table.api.Table; 
import org.apache.flink.table.api.TableEnvironment; 
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
public class flinkTest2 { 
public static void main(String[] args) throws Exception { 
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 

// read a CSV file with five fields, taking only two of them 
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("c:/CSV/file") 
.includeFields("10010") // take the first and the fourth field 
.types(String.class, Double.class); 

//register and create table 
EnvironmentSettings settings = EnvironmentSettings 
.newInstance() 
//.inStreamingMode() 
.inBatchMode() 
.build(); 
TableEnvironment tEnv = TableEnvironment.create(settings); 
//Insert CSV content into table, define column names and read some rows from it 
} 
} 
What to do create table, insert DataSet csvInput into table and read some rows from it (into text file)? 
Thanks for help 
Mike