You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2018/10/29 06:56:00 UTC
[jira] [Commented] (SPARK-25723) spark sql External DataSource
question
[ https://issues.apache.org/jira/browse/SPARK-25723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16666802#comment-16666802 ]
Hyukjin Kwon commented on SPARK-25723:
--------------------------------------
[~zhiyin1233], Please provide more information and reopen.
> spark sql External DataSource question
> --------------------------------------
>
> Key: SPARK-25723
> URL: https://issues.apache.org/jira/browse/SPARK-25723
> Project: Spark
> Issue Type: Question
> Components: SQL
> Affects Versions: 2.3.2
> Environment: local mode
> Reporter: huanghuai
> Priority: Minor
> Attachments: QQ图片20181015182502.jpg
>
>
> {code:java}
> public class MyDatasourceRelation extends BaseRelation implements PrunedFilteredScan {
> Map<String, String> parameters;
> SparkSession sparkSession;
> CombinedReportHelper helper;
> public MyDatasourceRelation() {
> }
> public MyDatasourceRelation (SQLContext sqlContext,Map<String, String> parameters) {
> this.parameters = parameters;
> this.sparkSession = sqlContext.sparkSession();
> this.helper = new CombinedReportHelper(parameters); //don't care
> this.helper.setRowsPerPage(1);
> }
> @Override
> public SQLContext sqlContext() {
> return this.sparkSession.sqlContext();
> }
> @Override
> public StructType schema() {
> StructType structType = transformSchema(helper.getFields(), helper.getFirst());
> //helper.close();
> System.out.println("get schema: "+structType);
> return structType;
> }
> @Override
> public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {
> System.out.println("build scan:");
> int totalRow = helper.getTotalRow();
> Partition[] partitions = getPartitions(totalRow, parameters);
> System.out.println("get partition:"+partitions.length+" total row:"+totalRow);
> return new SmartbixDatasourceRDD(sparkSession.sparkContext(), partitions, parameters);
> }
> private Partition[] getPartitions(int totalRow, Map<String, String> parameters) {
> int step = 1000000;
> int numOfPartitions = (totalRow + step - 1) / step;
> Partition[] partitions = new Partition[numOfPartitions];
> for (int i = 0; i < numOfPartitions; i++) {
> int start = i * step + 1;
> partitions[i] = new MyPartition(null, i, start, start + step);
> }
> return partitions;
> }
> }
> {code}
>
>
>
> ---------- above is my code,some useless information are removed -----------------------
>
>
> trait PrunedFilteredScan
> { def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] }
>
> if i implement this trait, i find requiredColumns param is different everytime,Why are the order different????
> {color:#ff0000}you can use spark.read.jdbc and connect to your local mysql DB, and debug at {color}
> {color:#ff0000}org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation#buildScan(scala:130){color}
> to show this param;
> attachement is my screenshot
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org