Posted to dev@calcite.apache.org by "herman.yu@teeupdata.com" <he...@teeupdata.com> on 2016/11/15 15:45:57 UTC

Spark with Calcite JDBC and Druid adapter

Hi everyone,

When accessing Druid through Calcite JDBC and Druid adapter from Spark, I have been experiencing strange results. 

Druid data schema is defined as:

    {
      "type": "custom",
      "name": "xxx",
      "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory",
      "operand": {
        "url": "http://<host>:8082",
        "coordinatorUrl": "http://<host>:8081"
      },
      "tables": [
        {
          "name": "yyy",
          "factory": "org.apache.calcite.adapter.druid.DruidTableFactory",
          "operand": {
            "dataSource": "<DruidDataSource>t",
            "interval": "2016-11-08T00:00:00.000Z/2016-12-31T00:00:00.000Z",
            "timestampColumn": "timestamp",
            "dimensions": [
              "commentorId",
              "hashTagId",
              "value"
            ],
            "metrics": [
              {
                "type": "count",
                "name": "count"
              }
            ]
          }
        }
      ]
    }

With a JDBC client, queries like these work fine:
1. select * from yyy where hashTagId='hashTag_01'
2. select badcount.hashTagId, badcount.bad, totalcount.total, badcount.bad/totalcount.total*100 as bad_pct
     from
       (select hashTagId, cast(count(*) as double) as bad from yyy where value='bad' group by hashTagId) as badcount
     join
       (select hashTagId, cast(count(*) as double) as total from yyy group by hashTagId) as totalcount
     on
       (badcount.hashTagId=totalcount.hashTagId)

However, in Spark 2.0 the behavior is strange:
1. df_yyy = spark.read.format("jdbc").option("url", "jdbc:calcite:model=<path to schema json>;caseSensitive=false")…
2. df_yyy.show(): works fine, returns all records
3. df_yyy.filter($"hashTagId" === "hashTag_01").count(): returns the correct number of records
4. df_yyy.filter($"hashTagId" === "hashTag_01").show(): returns an empty result set
5. df_yyy.join(<another MySQL-based data frame>, <condition>).show(): returns an empty result set (every join returns an empty result set)

I suspect there are conflicts between how Spark parses SQL and how Calcite JDBC does. Are there special properties to set in the JDBC connection string to make it work with Spark? Is there a Calcite JDBC log file that I can dig through? I did some googling and don't see similar usage of Spark, Calcite and Druid together. Is this the right way to access Druid from Spark? (Maybe this question is better suited to the Spark or Druid community…)

Thanks.
Herman.


Re: Spark with Calcite JDBC and Druid adapter

Posted by Julian Hyde <jh...@apache.org>.
I see that in one place hashTagId is quoted, and in another it is not. So, whoever is generating the SQL (Spark?) is not being consistent, which is a worry.

In the default lexical convention, unquoted columns are converted to upper case. But your column is mixed case. So you need to fix Spark to generate appropriate quoting, or use lex=JAVA in Calcite so that unquoted columns stay the same case.
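
To make the case-folding rule concrete, here is a toy model in Java. It is not Calcite's code: the class, the enum, and the methods fold and resolve are made up for illustration, and the column list is copied from the table definition earlier in the thread. It only assumes the behaviour described above, namely that unquoted identifiers are upper-cased under the default convention and that quoted identifiers keep their case.

```java
import java.util.List;
import java.util.Locale;
import java.util.Optional;

class IdentifierFolding {
    /** Toy stand-in for the unquoted-identifier casing policy (not a Calcite class). */
    enum UnquotedCasing { TO_UPPER, UNCHANGED }

    /** Returns the name that would be looked up for an identifier in this model. */
    static String fold(String name, boolean quoted, UnquotedCasing casing) {
        if (quoted || casing == UnquotedCasing.UNCHANGED) {
            return name; // quoted identifiers keep their case in this model
        }
        return name.toUpperCase(Locale.ROOT);
    }

    /** Finds the first column matching the folded name, if any. */
    static Optional<String> resolve(String folded, List<String> columns, boolean caseSensitive) {
        return columns.stream()
                .filter(c -> caseSensitive ? c.equals(folded) : c.equalsIgnoreCase(folded))
                .findFirst();
    }

    public static void main(String[] args) {
        // Column names taken from the Druid table definition in this thread.
        List<String> columns = List.of("timestamp", "commentorId", "hashTagId", "value", "count");

        String unquoted = fold("hashTagId", false, UnquotedCasing.TO_UPPER);
        String quoted = fold("hashTagId", true, UnquotedCasing.TO_UPPER);
        String unchanged = fold("hashTagId", false, UnquotedCasing.UNCHANGED);

        // Print each folded name and whether a case-sensitive lookup finds a column.
        System.out.println(unquoted + " found: " + resolve(unquoted, columns, true).isPresent());
        System.out.println(quoted + " found: " + resolve(quoted, columns, true).isPresent());
        System.out.println(unchanged + " found: " + resolve(unchanged, columns, true).isPresent());
    }
}
```

In this model an unquoted hashTagId is looked up as HASHTAGID and misses the mixed-case column in a case-sensitive lookup, while the quoted form and the unchanged-casing form both find it. A case-insensitive lookup finds the column in all three cases.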

I’m glad to see that “value” and “count” are quoted. They are SQL reserved words, so they have to be. But if you could change the column names to something else it will make your life easier.
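
A note on the connect string: lex=JAVA is a bundle of settings and, as far as I know, it also switches the identifier quote character to a back-tick, which would not suit SQL that uses double quotes. The individual lexical properties (quoting, unquotedCasing, quotedCasing, caseSensitive) can also be set one by one. The Java sketch below only assembles such a URL. The helper names, the model path, and the particular combination of values are assumptions for illustration, not a tested recommendation for Spark.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class CalciteUrlSketch {
    /** Joins ordered properties into "jdbc:calcite:key=value;key=value". */
    static String calciteUrl(Map<String, String> props) {
        return props.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(";", "jdbc:calcite:", ""));
    }

    /**
     * Hypothetical property set: keep double-quote quoting and stop
     * changing the case of identifiers. The values are an assumption to try,
     * not a verified fix.
     */
    static Map<String, String> lexicalProps(String modelPath) {
        Map<String, String> props = new LinkedHashMap<>(); // keeps insertion order
        props.put("model", modelPath);
        props.put("quoting", "DOUBLE_QUOTE");
        props.put("unquotedCasing", "UNCHANGED");
        props.put("quotedCasing", "UNCHANGED");
        props.put("caseSensitive", "true");
        return props;
    }

    public static void main(String[] args) {
        // "/path/to/model.json" is a placeholder for the real model file.
        System.out.println(calciteUrl(lexicalProps("/path/to/model.json")));
    }
}
```

The resulting string would be passed as the "url" option of the JDBC reader in place of the caseSensitive=false URL.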

Julian

> On Nov 15, 2016, at 10:33 AM, herman.yu@teeupdata.com wrote:
> 
> I played around with the Calcite JDBC settings, especially the lexical ones. Some settings return an empty set when there is a filter or join (the default with caseSensitive=false); others fail at the parsing phase (e.g. lex=JAVA):
> 
> java.sql.SQLException: Error while preparing statement [SELECT "timestamp","commentorId","hashTagId","value","count" FROM yyy WHERE (hashTagId IS NOT NULL) AND (hashTagId = 'hashTag_01')]
> 	at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
> 	at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
> 	at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement_(CalciteConnectionImpl.java:204)
> 	at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:186)
> 	at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:87)
> 	at org.apache.calcite.avatica.AvaticaConnection.prepareStatement(AvaticaConnection.java:264)
> 
> With the default setting, after removing "(hashTagId IS NOT NULL) AND" from the WHERE clause, the correct result set is returned. So it does seem to me that this is a Calcite configuration issue. Does anybody have experience using Calcite JDBC with Spark?
> 
> thanks
> Herman.


Re: Spark with Calcite JDBC and Druid adapter

Posted by "herman.yu@teeupdata.com" <he...@teeupdata.com>.
I played around with the Calcite JDBC settings, especially the lexical ones. Some settings return an empty set when there is a filter or join (the default with caseSensitive=false); others fail at the parsing phase (e.g. lex=JAVA):

java.sql.SQLException: Error while preparing statement [SELECT "timestamp","commentorId","hashTagId","value","count" FROM yyy WHERE (hashTagId IS NOT NULL) AND (hashTagId = 'hashTag_01')]
	at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
	at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
	at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement_(CalciteConnectionImpl.java:204)
	at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:186)
	at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:87)
	at org.apache.calcite.avatica.AvaticaConnection.prepareStatement(AvaticaConnection.java:264)

With the default setting, after removing "(hashTagId IS NOT NULL) AND" from the WHERE clause, the correct result set is returned. So it does seem to me that this is a Calcite configuration issue. Does anybody have experience using Calcite JDBC with Spark?

thanks
Herman.

