You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2018/10/29 06:56:00 UTC

[jira] [Resolved] (SPARK-25723) spark sql External DataSource question

     [ https://issues.apache.org/jira/browse/SPARK-25723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-25723.
----------------------------------
    Resolution: Invalid

> spark sql External DataSource question
> --------------------------------------
>
>                 Key: SPARK-25723
>                 URL: https://issues.apache.org/jira/browse/SPARK-25723
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 2.3.2
>         Environment: local mode
>            Reporter: huanghuai
>            Priority: Minor
>         Attachments: QQ图片20181015182502.jpg
>
>
> {code:java}
> public class MyDatasourceRelation extends BaseRelation implements PrunedFilteredScan {
> Map<String, String> parameters;
> SparkSession sparkSession;
> CombinedReportHelper helper;
> public MyDatasourceRelation() {
> }
> public MyDatasourceRelation (SQLContext sqlContext,Map<String, String> parameters) {
> this.parameters = parameters;
> this.sparkSession = sqlContext.sparkSession();
> this.helper = new CombinedReportHelper(parameters); //don't care 
> this.helper.setRowsPerPage(1);
> }
> @Override
> public SQLContext sqlContext() {
> return this.sparkSession.sqlContext();
> }
> @Override
> public StructType schema() {
> StructType structType = transformSchema(helper.getFields(), helper.getFirst());
> //helper.close();
> System.out.println("get schema: "+structType);
> return structType;
> }
> @Override
> public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {
> System.out.println("build scan:");
> int totalRow = helper.getTotalRow();
> Partition[] partitions = getPartitions(totalRow, parameters);
> System.out.println("get partition:"+partitions.length+" total row:"+totalRow);
> return new SmartbixDatasourceRDD(sparkSession.sparkContext(), partitions, parameters);
> }
> private Partition[] getPartitions(int totalRow, Map<String, String> parameters) {
> int step = 1000000;
> int numOfPartitions = (totalRow + step - 1) / step;
> Partition[] partitions = new Partition[numOfPartitions];
> for (int i = 0; i < numOfPartitions; i++) {
> int start = i * step + 1;
> partitions[i] = new MyPartition(null, i, start, start + step);
> }
> return partitions;
> }
> }
> {code}
>  
>  
>  
> ---------- above is my code,some useless information are removed -----------------------
>  
>  
> trait PrunedFilteredScan
> { def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] }
>  
> if i implement this trait, i find requiredColumns param is different everytime,Why are the order different????
> {color:#ff0000}you can use spark.read.jdbc  and connect to your local mysql DB, and debug at {color}
> {color:#ff0000}org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation#buildScan(scala:130){color}
> to show this param;
> attachement is my screenshot 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org